Skip to content

Commit

Permalink
Port to gpiod and gpiodevice
Browse files Browse the repository at this point in the history
  • Loading branch information
Gadgetoid committed Nov 6, 2023
1 parent 0165e39 commit 77d1e34
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 71 deletions.
3 changes: 2 additions & 1 deletion examples/all.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@


# Configure the PMS5003 for Enviro+
pms5003 = PMS5003(device="/dev/ttyAMA0", baudrate=9600, pin_enable=22, pin_reset=27)
# PIN15 and PIN13 are enable and reset for Raspberry Pi 5
pms5003 = PMS5003(device="/dev/ttyAMA0", baudrate=9600, pin_enable="PIN15", pin_reset="PIN13")

try:
while True:
Expand Down
3 changes: 2 additions & 1 deletion examples/specific.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
)

# Configure the PMS5003 for Enviro+
pms5003 = PMS5003(device="/dev/ttyAMA0", baudrate=9600, pin_enable=22, pin_reset=27)
# PIN15 and PIN13 are enable and reset for Raspberry Pi 5
pms5003 = PMS5003(device="/dev/ttyAMA0", baudrate=9600, pin_enable="PIN15", pin_reset="PIN13")

try:
while True:
Expand Down
37 changes: 26 additions & 11 deletions pms5003/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import struct
import time

import RPi.GPIO as GPIO
import gpiod
import gpiodevice
import serial
from gpiod.line import Direction, Value

__version__ = "0.0.5"


PMS5003_SOF = bytearray(b"\x42\x4d")

OUTL = gpiod.LineSettings(direction=Direction.OUTPUT, output_value=Value.INACTIVE)
OUTH = gpiod.LineSettings(direction=Direction.OUTPUT, output_value=Value.ACTIVE)
PLATFORMS = {

This comment has been minimized.

Copy link
@kevinvkell

kevinvkell Jan 7, 2024

This breaks my Raspberry Pi 2 because it can't find an entry in the dict. Some people were able to fix the problem by adding their own entry: https://forums.pimoroni.com/t/issue-with-enviro-with-rpi-zero-display-and-particles-not-working/23365/21

"Radxa ROCK 5B": {"enable": ("PIN_15", OUTH), "reset": ("PIN_13", OUTL)},
"Raspberry Pi 5": {"enable": ("PIN15", OUTH), "reset": ("PIN13", OUTL)},
"Raspberry Pi 4": {"enable": ("GPIO22", OUTH), "reset": ("GPIO27", OUTL)}
}


class ChecksumMismatchError(RuntimeError):
pass
Expand Down Expand Up @@ -86,33 +96,38 @@ def __str__(self):


class PMS5003:
def __init__(self, device="/dev/ttyAMA0", baudrate=9600, pin_enable=22, pin_reset=27):
def __init__(self, device="/dev/ttyAMA0", baudrate=9600, pin_enable=None, pin_reset=None):
self._serial = None
self._device = device
self._baudrate = baudrate
self._pin_enable = pin_enable
self._pin_reset = pin_reset

if pin_enable is not None and pin_reset is not None:
gpiodevice.friendly_errors = True
self._pin_enable = gpiodevice.get_pin(pin_enable, "PMS5003_en", OUTH)
self._pin_reset = gpiodevice.get_pin(pin_reset, "PMS5003_rst", OUTL)
else:
self._pin_enable, self._pin_reset = gpiodevice.get_pins_for_platform(PLATFORMS)

self.setup()

def setup(self):
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(self._pin_enable, GPIO.OUT, initial=GPIO.HIGH)
GPIO.setup(self._pin_reset, GPIO.OUT, initial=GPIO.HIGH)

if self._serial is not None:
self._serial.close()

self._serial = serial.Serial(self._device, baudrate=self._baudrate, timeout=4)

self.reset()

def set_pin(self, pin, state):
lines, offset = pin
lines.set_value(offset, Value.ACTIVE if state else Value.INACTIVE)

def reset(self):
time.sleep(0.1)
GPIO.output(self._pin_reset, GPIO.LOW)
self.set_pin(self._pin_reset, False)
self._serial.flushInput()
time.sleep(0.1)
GPIO.output(self._pin_reset, GPIO.HIGH)
self.set_pin(self._pin_reset, True)

def read(self):
start = time.time()
Expand Down
80 changes: 80 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import struct
import sys

import mock
import pytest


class MockSerial:
def __init__(self, *args, **kwargs):
self.ptr = 0
self.sof = b"\x42\x4d"
self.data = self.sof
self.data += struct.pack(">H", 28)
self.data += b"\x00" * 26
checksum = struct.pack(">H", sum(bytearray(self.data)))
self.data += checksum

def read(self, length):
result = self.data[self.ptr : self.ptr + length]
self.ptr += length
if self.ptr >= len(self.data):
self.ptr = 0
return result

def flushInput(self):
pass

def close(self):
pass


class MockSerialFail(MockSerial):
def __init__(self, *args, **kwargs):
pass

def read(self, length):
return b"\x00" * length


@pytest.fixture(scope='function', autouse=False)
def pms5003():
import pms5003
yield pms5003
del sys.modules['pms5003']


@pytest.fixture(scope='function', autouse=False)
def gpiod():
sys.modules['gpiod'] = mock.Mock()
sys.modules['gpiod.line'] = mock.Mock()
yield sys.modules['gpiod']
del sys.modules['gpiod.line']
del sys.modules['gpiod']


@pytest.fixture(scope='function', autouse=False)
def gpiodevice():
gpiodevice = mock.Mock()
gpiodevice.get_pins_for_platform.return_value = [(mock.Mock(), 0), (mock.Mock(), 0)]

sys.modules['gpiodevice'] = gpiodevice
yield gpiodevice
del sys.modules['gpiodevice']


@pytest.fixture(scope='function', autouse=False)
def serial():
sys.modules['serial'] = mock.Mock()
sys.modules['serial'].Serial = MockSerial
yield sys.modules['serial']
del sys.modules['serial']


@pytest.fixture(scope='function', autouse=False)
def serial_fail():
sys.modules['serial'] = mock.Mock()
sys.modules['serial'].Serial = MockSerialFail
yield sys.modules['serial']
del sys.modules['serial']

64 changes: 6 additions & 58 deletions tests/test_setup.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,22 @@
import struct
import sys

import mock
import pytest


class MockSerialFail:
def __init__(self):
pass

def read(self, length):
return b"\x00" * length


class MockSerial:
def __init__(self):
self.ptr = 0
self.sof = b"\x42\x4d"
self.data = self.sof
self.data += struct.pack(">H", 28)
self.data += b"\x00" * 26
checksum = struct.pack(">H", sum(bytearray(self.data)))
self.data += checksum

def read(self, length):
result = self.data[self.ptr : self.ptr + length]
self.ptr += length
if self.ptr >= len(self.data):
self.ptr = 0
return result


def _mock():
sys.modules["RPi"] = mock.Mock()
sys.modules["RPi.GPIO"] = mock.Mock()
sys.modules["serial"] = mock.Mock()
def test_setup(gpiod, gpiodevice, serial, pms5003):
_ = pms5003.PMS5003()


def test_setup():
_mock()
import pms5003

sensor = pms5003.PMS5003()
del sensor


def test_double_setup():
_mock()
import pms5003

def test_double_setup(gpiod, gpiodevice, serial, pms5003):
sensor = pms5003.PMS5003()
sensor.setup()


def test_read():
_mock()
import pms5003

def test_read(gpiod, gpiodevice, serial, pms5003):
sensor = pms5003.PMS5003()
sensor._serial = MockSerial()
data = sensor.read()
data.pm_ug_per_m3(2.5)


def test_read_fail():
_mock()
import pms5003

def test_read_fail(gpiod, gpiodevice, serial_fail, pms5003):
sensor = pms5003.PMS5003()
sensor._serial = MockSerialFail()
with pytest.raises(pms5003.ReadTimeoutError):
data = sensor.read()
del data
_ = sensor.read()

0 comments on commit 77d1e34

Please sign in to comment.