PIMORONI Pico Enviro+ with measurements on the screen.

PIMORONI Pico Enviro+ Notes

Environmental Monitoring – No Screen

Saves a CSV file. Use Thonny to retrieve data.

import machine
import utime
import time
from machine import Pin, ADC, UART
from pimoroni import RGBLED, Button
from breakout_bme68x import BreakoutBME68X, STATUS_HEATER_STABLE
from pimoroni_i2c import PimoroniI2C
from breakout_ltr559 import BreakoutLTR559
# from pms5003 import PMS5003

"""
This basic example shows how to read from all the sensors on Enviro+.
Prints results to the REPL and saves data as a CSV
"""
# Handle to the real-time clock; it is only read here (for timestamps),
# this script never sets it.
rtc = machine.RTC()

# Setup buttons (A starts logging, B stops it; X and Y are unused in this script)
button_a = Button(12, invert=True)
button_b = Button(13, invert=True)
button_x = Button(14, invert=True)
button_y = Button(15, invert=True)


# change this to adjust temperature compensation:
# the value is subtracted from the raw BME688 temperature
TEMPERATURE_OFFSET = 3

# set up the LED; blue while idle
led = RGBLED(6, 7, 10, invert=True)
led.set_rgb(0, 0, 255)

# set up the Pico's I2C
PINS_BREAKOUT_GARDEN = {"sda": 4, "scl": 5}
i2c = PimoroniI2C(**PINS_BREAKOUT_GARDEN)

# set up BME688 and LTR559 sensors
bme = BreakoutBME68X(i2c, address=0x77)
ltr = BreakoutLTR559(i2c)

# setup analog channel for microphone (use the named pin rather than
# repeating the literal so the two cannot drift apart)
MIC_PIN = 26
mic = ADC(Pin(MIC_PIN))

# Button a state: True while a logging session is running
a = False

# Main loop: wait for button A, then log one CSV row per second until
# button B is pressed.
while True:
    if button_a.is_pressed:
        a = True
        led.set_rgb(0, 255, 0)
        print("Button A pressed!")

        # Name the log file after the RTC time at which logging started
        timestamp = rtc.datetime()
        fname = "%04d%02d%02d-%02d%02d%02d.csv" % (timestamp[0:3] + timestamp[4:7])

        # The context manager closes the file even if a sensor read raises
        with open(fname, "w") as log_file:
            log_file.write("Time,Temperature (°C),Humidity (%), Pressure (hPa), Gas, Lux, Mic")

            # Run until button B is pressed
            while not button_b.is_pressed:
                # read BME688
                temperature, pressure, humidity, gas, status, _, _ = bme.read()
                heater_stable = bool(status & STATUS_HEATER_STABLE)

                # correct temperature and humidity using an offset
                corrected_temperature = temperature - TEMPERATURE_OFFSET
                dewpoint = temperature - ((100 - humidity) / 5)
                corrected_humidity = 100 - (5 * (corrected_temperature - dewpoint))

                # read LTR559; the result is only indexed after the None check
                # below, otherwise a missing reading would raise TypeError
                ltr_reading = ltr.get_reading()

                # read mic
                mic_reading = mic.read_u16()

                if heater_stable and ltr_reading is not None:
                    lux = ltr_reading[BreakoutLTR559.LUX]

                    timestamp = rtc.datetime()
                    print(timestamp)
                    timestring = "%04d-%02d-%02d %02d:%02d:%02d" % (timestamp[0:3] + timestamp[4:7])
                    led.set_rgb(0, 255, 0)

                    print(f"""\n
                    Time = {timestring}
                    Temperature = {corrected_temperature} °C
                    Humidity = {corrected_humidity} %
                    Pressure = {pressure/100} hPa
                    Gas = {gas}
                    Lux = {lux}
                    Mic = {mic_reading}
                     """)
                    csvline = f"""\n{timestring},{corrected_temperature},{corrected_humidity},{pressure/100},{gas},{lux},{mic_reading}"""
                    log_file.write(csvline)
                    # Push the row out now so a power cut loses at most one reading
                    log_file.flush()
                else:
                    # light up the LED red if there's a problem with the BME688 or LTR559 sensor readings
                    led.set_rgb(255, 0, 0)

                time.sleep(1.0)

        # Button B has been pressed; the log file is already closed
        print("Button B pressed")
        led.set_rgb(0, 0, 255)
        a = False


Code language: Python (python)

With Screen, Network and Time Server

Save each of the files below on the Pico; main.py imports the other three.

ntptime.py

import time

# Prefer the u-prefixed module names and fall back to the standard names
# only when the import itself fails (a bare except would also hide
# unrelated errors such as KeyboardInterrupt).
try:
    import usocket as socket
except ImportError:
    import socket
try:
    import ustruct as struct
except ImportError:
    import struct

# The NTP host can be configured at runtime by doing: ntptime.host = 'myhost.org'
host = "pool.ntp.org"
# The NTP socket timeout can be configured at runtime by doing: ntptime.timeout = 2
timeout = 1


def ttime():
    """Ask the configured NTP server for the time.

    Sends a single 48-byte request to ``host`` on UDP port 123, waiting at
    most ``timeout`` seconds, and returns the server's timestamp converted
    to seconds since this platform's epoch (2000 or 1970).

    Raises:
        Exception: if ``time.gmtime(0)`` reports an epoch year other than
            2000 or 1970.
    """
    request = bytearray(48)
    request[0] = 0x1B  # first header byte of the NTP request
    server = socket.getaddrinfo(host, 123)[0][-1]

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.settimeout(timeout)
        sock.sendto(request, server)
        reply = sock.recv(48)
    finally:
        # Always release the socket, even when the request times out
        sock.close()

    # Bytes 40-43 of the reply: big-endian seconds counted from 1900
    (seconds_since_1900,) = struct.unpack("!I", reply[40:44])

    # Seconds between 1900-01-01 and the start of each supported epoch:
    # (date(epoch, 1, 1) - date(1900, 1, 1)).days * 24*60*60
    epoch_offsets = {2000: 3155673600, 1970: 2208988800}
    epoch_year = time.gmtime(0)[0]
    if epoch_year not in epoch_offsets:
        raise Exception("Unsupported epoch: {}".format(epoch_year))

    return seconds_since_1900 - epoch_offsets[epoch_year]


# There's currently no timezone support in MicroPython, so by default the RTC
# is set in UTC time. Pass an offset to store local time instead,
# e.g. settime(11) for AEDT.
def settime(utc_offset_hours=0):
    """Set the machine RTC from the time returned by ``ttime()``.

    Args:
        utc_offset_hours: hours to add to the NTP (UTC) time before it is
            written to the RTC. Defaults to 0, i.e. the RTC holds UTC.
            Fractional offsets such as 5.5 are accepted.
    """
    import machine

    t = ttime() + int(utc_offset_hours * 3600)
    tm = time.gmtime(t)
    # RTC.datetime() takes (year, month, day, weekday, hours, minutes,
    # seconds, subseconds); the weekday from gmtime() is shifted by one.
    machine.RTC().datetime((tm[0], tm[1], tm[2], tm[6] + 1, tm[3], tm[4], tm[5], 0))
Code language: Python (python)

network_manager.py

import rp2
import network
import machine
import uasyncio


class NetworkManager:
    """Bring the WLAN up as a client (station) or as an access point.

    Progress is reported through the optional
    ``status_handler(mode_name, status, ip)`` callback, where ``status`` is
    ``None`` while still waiting, ``True`` on success and ``False`` on
    failure. Failures are passed to the optional
    ``error_handler(mode_name, msg)``; unless that handler returns a truthy
    value a ``RuntimeError`` carrying ``msg`` is raised.
    """

    # Human-readable interface names, indexed by network.STA_IF / network.AP_IF
    _ifname = ("Client", "Access Point")

    def __init__(self, country="GB", client_timeout=60, access_point_timeout=5, status_handler=None, error_handler=None):
        """Create both WLAN interfaces and remember the callbacks.

        Args:
            country: country code passed to ``rp2.country()``.
            client_timeout: seconds to wait for a client connection.
            access_point_timeout: seconds to wait in access-point mode.
            status_handler: optional ``(mode_name, status, ip)`` callback.
            error_handler: optional ``(mode_name, msg)`` callback; return a
                truthy value to suppress the ``RuntimeError``.
        """
        rp2.country(country)
        self._ap_if = network.WLAN(network.AP_IF)
        self._sta_if = network.WLAN(network.STA_IF)

        self._mode = network.STA_IF
        self._client_timeout = client_timeout
        self._access_point_timeout = access_point_timeout
        self._status_handler = status_handler
        self._error_handler = error_handler
        # Hex string of the board's unique id; doubles as the access-point password
        self.UID = ("{:02X}" * 8).format(*machine.unique_id())

    def isconnected(self):
        """Return True if either interface reports a connection."""
        return self._sta_if.isconnected() or self._ap_if.isconnected()

    def config(self, var):
        """Return config value ``var`` from whichever interface is in use.

        When the client interface is not active, ``"password"`` yields the
        UID (the access-point password set by ``access_point()``).
        """
        if self._sta_if.active():
            return self._sta_if.config(var)
        else:
            if var == "password":
                return self.UID
            return self._ap_if.config(var)

    def mode(self):
        """Return "Client", "Access Point" or None when not connected."""
        if self._sta_if.isconnected():
            return self._ifname[0]
        if self._ap_if.isconnected():
            return self._ifname[1]
        return None

    def ifaddress(self):
        """Return the IP address of the connected interface, else '0.0.0.0'."""
        if self._sta_if.isconnected():
            return self._sta_if.ifconfig()[0]
        if self._ap_if.isconnected():
            return self._ap_if.ifconfig()[0]
        return '0.0.0.0'

    def disconnect(self):
        """Disconnect whichever interfaces are currently connected."""
        if self._sta_if.isconnected():
            self._sta_if.disconnect()
        if self._ap_if.isconnected():
            self._ap_if.disconnect()

    async def wait(self, mode):
        """Poll once a second until connected, reporting a pending status."""
        while not self.isconnected():
            self._handle_status(mode, None)
            await uasyncio.sleep_ms(1000)

    def _handle_status(self, mode, status):
        """Forward a status change to the status handler, if one was given."""
        if callable(self._status_handler):
            self._status_handler(self._ifname[mode], status, self.ifaddress())

    def _handle_error(self, mode, msg):
        """Report ``msg``; raise RuntimeError unless the handler accepts it."""
        if callable(self._error_handler):
            if self._error_handler(self._ifname[mode], msg):
                return
        raise RuntimeError(msg)

    async def client(self, ssid, psk):
        """Connect to the network ``ssid`` using password ``psk``.

        Raises:
            RuntimeError: if no connection is made within the client timeout
                and the error handler does not suppress it.
        """
        if self._sta_if.isconnected():
            self._handle_status(network.STA_IF, True)
            return

        # Only one interface at a time: shut the access point down first
        self._ap_if.disconnect()
        self._ap_if.active(False)

        self._sta_if.active(True)
        self._sta_if.config(pm=0xa11140)
        self._sta_if.connect(ssid, psk)

        try:
            await uasyncio.wait_for(self.wait(network.STA_IF), self._client_timeout)
            self._handle_status(network.STA_IF, True)

        except uasyncio.TimeoutError:
            self._sta_if.active(False)
            self._handle_status(network.STA_IF, False)
            self._handle_error(network.STA_IF, "WIFI Client Failed")

    async def access_point(self):
        """Start an access point on 10.10.1.1 with the UID as password.

        Raises:
            RuntimeError: if the access point is not connected within the
                access-point timeout and the error handler does not
                suppress it.
        """
        if self._ap_if.isconnected():
            self._handle_status(network.AP_IF, True)
            return

        # Only one interface at a time: shut the client down first
        self._sta_if.disconnect()
        self._sta_if.active(False)

        self._ap_if.ifconfig(("10.10.1.1", "255.255.255.0", "10.10.1.1", "10.10.1.1"))
        self._ap_if.config(password=self.UID)
        self._ap_if.active(True)

        try:
            await uasyncio.wait_for(self.wait(network.AP_IF), self._access_point_timeout)
            self._handle_status(network.AP_IF, True)

        except uasyncio.TimeoutError:
            # Deactivate the interface that was just started (the access
            # point), mirroring what client() does for the station interface.
            self._ap_if.active(False)
            self._handle_status(network.AP_IF, False)
            self._handle_error(network.AP_IF, "WIFI Access Point Failed")

Code language: Python (python)

WIFI_CONFIG.py

Replace the placeholder values below with your Wi-Fi network name (SSID), its password (PSK) and your country code.

# Name of the Wi-Fi network; main.py passes it to NetworkManager.client()
SSID = "WIFI SSID"
# Password for that network, passed to NetworkManager.client() alongside SSID
PSK = "WIFI PASSWORD "
# Country code; main.py hands it to NetworkManager, which forwards it to rp2.country()
COUNTRY = "COUNTRY CODE"


Code language: Python (python)

main.py

17/3/2024: Updated to write one row per minute, containing the averages over that minute (and the maximum microphone reading), rather than one row every second.

import machine
import time
import ntptime
import WIFI_CONFIG
import random
from machine import Pin, ADC, UART
from picographics import PicoGraphics, DISPLAY_ENVIRO_PLUS
from pimoroni import RGBLED, Button
from breakout_bme68x import BreakoutBME68X, STATUS_HEATER_STABLE
from pimoroni_i2c import PimoroniI2C
from breakout_ltr559 import BreakoutLTR559
from network_manager import NetworkManager
import uasyncio
# from pms5003 import PMS5003

"""
This basic example shows how to read from all the sensors on Enviro+.
Prints results to the REPL and saves data as a CSV
"""

# set up wifi

def status_handler(mode, status, ip):
    """Show the Wi-Fi connection progress on the display and the REPL.

    Args:
        mode: interface name supplied by the network manager (not used here).
        status: None while connecting, truthy on success, falsy on failure.
        ip: address to show on the "IP:" line.
    """
    if status is None:
        status_text = "Connecting..."
    elif status:
        status_text = "Connection successful!"
    else:
        status_text = "Connection failed!"

    network_line = "Network: {}".format(WIFI_CONFIG.SSID)
    ip_line = "IP: {}".format(ip)

    # Start from a blank black screen with white bitmap text
    display.set_font("bitmap8")
    display.set_pen(BLACK)
    display.clear()
    display.set_pen(WHITE)

    display.text(network_line, 10, 10, scale=2)
    print(network_line)

    display.text(status_text, 10, 40, scale=2)
    print(status_text)

    print(ip_line)
    display.text(ip_line, 10, 60, scale=2)

    display.update()
    # Put back the font the rest of the program draws with
    display.set_font("sans")
    
# Time on display

def display_time():
    """Draw the idle screen: a prompt and the RTC time, then pause a second.

    Each call picks a random position and a random colour for the text,
    so the text moves and changes colour from one call to the next.
    """
    palette = [RED, GREEN, CYAN, MAGENTA, YELLOW, BLUE, WHITE]

    now = rtc.datetime()
    # Fields 0-2 are the date and 4-6 the time; field 3 is skipped
    date_and_time = now[0:3] + now[4:7]
    timestring = "%04d-%02d-%02d %02d:%02d:%02d" % date_and_time

    # Random placement and colour for this frame
    y = random.randint(25, 180)
    x = random.randint(0, 30)
    pen = random.choice(palette)

    display.set_font("bitmap8")
    display.set_pen(BLACK)
    display.clear()
    display.set_pen(pen)
    display.text("Press A to start", x, y, WIDTH, scale=2)
    display.text(timestring, x, y + 20, WIDTH, scale=2)
    display.update()

    time.sleep(1)
    
    
# Handle to the real-time clock; it is set from NTP further down
rtc = machine.RTC()

# Setup buttons (A starts logging, B stops it, X/Y switch the backlight on/off)
button_a = Button(12, invert=True)
button_b = Button(13, invert=True)
button_x = Button(14, invert=True)
button_y = Button(15, invert=True)

# set up the display
display = PicoGraphics(display=DISPLAY_ENVIRO_PLUS)
BRIGHTNESS = 0.8
display.set_backlight(BRIGHTNESS)

# some constants we'll use for drawing
WHITE = display.create_pen(255, 255, 255)
BLACK = display.create_pen(0, 0, 0)
RED = display.create_pen(255, 0, 0)
GREEN = display.create_pen(0, 255, 0)
CYAN = display.create_pen(0, 255, 255)
MAGENTA = display.create_pen(200, 0, 200)
YELLOW = display.create_pen(200, 200, 0)
BLUE = display.create_pen(0, 0, 200)
FFT_COLOUR = display.create_pen(255, 0, 255)
GREY = display.create_pen(75, 75, 75)

WIDTH, HEIGHT = display.get_bounds()
display.set_font("sans")


# change this to adjust temperature compensation - doesn't seem to require this.
# The value is subtracted from the raw BME688 temperature.
TEMPERATURE_OFFSET = 5

# set up the LED; blue while idle
led = RGBLED(6, 7, 10, invert=True)
led.set_rgb(0, 0, 255)

# set up the Pico's I2C
PINS_BREAKOUT_GARDEN = {"sda": 4, "scl": 5}
i2c = PimoroniI2C(**PINS_BREAKOUT_GARDEN)

# set up BME688 and LTR559 sensors
bme = BreakoutBME68X(i2c, address=0x77)
ltr = BreakoutLTR559(i2c)

# setup analog channel for microphone (use the named pin rather than
# repeating the literal so the two cannot drift apart)
MIC_PIN = 26
mic = ADC(Pin(MIC_PIN))

network_manager = NetworkManager(WIFI_CONFIG.COUNTRY, status_handler=status_handler)
# connect to wifi
uasyncio.get_event_loop().run_until_complete(network_manager.client(WIFI_CONFIG.SSID, WIFI_CONFIG.PSK))

# Set the RTC from NTP. A failed lookup or timeout raises OSError; report it
# and carry on with the RTC as it is rather than stopping the logger at boot.
try:
    ntptime.settime()
except OSError as err:
    print("NTP time sync failed, RTC not updated: {}".format(err))
finally:
    # The network is only needed for the time sync
    network_manager.disconnect()


# Button a state: True while a logging session is running
a = False

# Second counter: loop passes since the last CSV row was written
secs = 0

# Main loop: show the idle clock until button A is pressed, then sample the
# sensors about once a second and write one averaged CSV row per minute
# until button B is pressed.
while True:
    display_time()
    if button_a.is_pressed:
        a = True
        led.set_rgb(0, 255, 0)
        display.set_backlight(BRIGHTNESS)
        display.set_font("sans")
        display.set_pen(RED)
        display.text("waiting for sensors", 0, 20, WIDTH, scale=1)
        display.update()

        # Accumulators for the current averaging window
        temperature_sum = 0
        humidity_sum = 0
        lux_sum = 0
        pressure_sum = 0
        gas_sum = 0
        mic_max = 0
        samples = 0  # number of valid readings added to the sums
        secs = 0     # loop passes since the last row was written

        # Name the log file after the RTC time at which logging started
        timestamp = rtc.datetime()
        fname = "%04d%02d%02d-%02d%02d%02d.csv" % (timestamp[0:3] + timestamp[4:7])

        # The context manager closes the file even if a sensor read raises
        with open(fname, "w") as log_file:
            log_file.write("Time,Temperature (°C),Humidity (%), Pressure (hPa), Gas, Lux, Mic")

            # Run until button B is pressed
            while not button_b.is_pressed:
                if button_y.is_pressed:
                    # Switch backlight off
                    display.set_backlight(0)
                elif button_x.is_pressed:
                    display.set_backlight(BRIGHTNESS)

                # read BME688
                temperature, pressure, humidity, gas, status, _, _ = bme.read()
                heater_stable = bool(status & STATUS_HEATER_STABLE)

                # correct temperature and humidity using an offset
                corrected_temperature = temperature - TEMPERATURE_OFFSET
                dewpoint = temperature - ((100 - humidity) / 5)
                corrected_humidity = 100 - (5 * (corrected_temperature - dewpoint))

                # read LTR559; the result is only indexed after the None check
                # below, otherwise a missing reading would raise TypeError
                ltr_reading = ltr.get_reading()

                # read mic and keep the loudest value of the window
                mic_reading = mic.read_u16()
                if mic_reading > mic_max:
                    mic_max = mic_reading

                # Count this pass towards the one-minute window
                secs = secs + 1

                if heater_stable and ltr_reading is not None:
                    lux = ltr_reading[BreakoutLTR559.LUX]

                    # Only valid readings go into the averages, and they are
                    # counted separately so each sum is divided by exactly
                    # the number of readings it contains.
                    temperature_sum = temperature_sum + corrected_temperature
                    pressure_sum = pressure_sum + pressure
                    humidity_sum = humidity_sum + corrected_humidity
                    gas_sum = gas_sum + gas
                    lux_sum = lux_sum + lux
                    samples = samples + 1

                    timestamp = rtc.datetime()
                    print(timestamp)
                    timestring = "%04d-%02d-%02d %02d:%02d:%02d" % (timestamp[0:3] + timestamp[4:7])
                    led.set_rgb(0, 255, 0)

                    print(f"""\n
                    Time = {timestring}
                    Temperature = {corrected_temperature} °C
                    Humidity = {corrected_humidity} %
                    Pressure = {pressure/100} hPa
                    Gas = {gas}
                    Lux = {lux}
                    Mic = {mic_reading}
                     """)
                    if secs > 59:
                        avg_temp = temperature_sum / samples
                        avg_humidity = humidity_sum / samples
                        avg_pressure = pressure_sum / samples
                        avg_lux = lux_sum / samples
                        avg_gas = gas_sum / samples

                        # Write to csv file
                        csvline = f"""\n{timestring},{avg_temp},{avg_humidity},{avg_pressure/100},{avg_gas},{avg_lux},{mic_max}"""
                        log_file.write(csvline)
                        # Push the row out now so a power cut loses at most one minute
                        log_file.flush()

                        # Start a new averaging window
                        secs = 0
                        samples = 0
                        mic_max = 0
                        temperature_sum = 0
                        humidity_sum = 0
                        lux_sum = 0
                        pressure_sum = 0
                        gas_sum = 0

                    # Display on screen
                    display.set_pen(BLACK)
                    display.clear()
                    display.set_pen(WHITE)
                    display.text(f"Temp: {corrected_temperature:.0f} °C", 0, 25, WIDTH, scale=1)
                    display.text(f"Humd: {corrected_humidity:.0f}%", 0, 55, WIDTH, scale=1)
                    display.text(f"Prs: {pressure/100:.0f}hPa", 0, 85, WIDTH, scale=1)
                    display.text(f"Gas: {gas}", 0, 115, WIDTH, scale=1)
                    display.text(f"Lux: {lux:.0f} lux", 0, 145, WIDTH, scale=1)
                    display.text(f"Mic: {mic_reading - 32000}", 0, 175, WIDTH, scale=1)
                    display.update()

                else:
                    # light up the LED red if there's a problem with the BME688 or LTR559 sensor readings
                    led.set_rgb(255, 0, 0)
                    display.set_pen(BLACK)
                    display.clear()
                    display.set_backlight(BRIGHTNESS)

                # Measure every second.
                time.sleep(1.0)

        # Button B has been pressed; the log file is already closed
        print("Button B pressed")
        led.set_rgb(0, 0, 255)
        display_time()
        a = False
        secs = 0

Code language: Python (python)

Usage

Press A to start collecting data and B to stop. While data is being collected, Y switches the screen backlight off and X switches it back on.

Retrieve the data files from the Pico using Thonny, then delete them from the Pico to save storage space.

Filed under: ,