Skip to content

Commit

Permalink
Merge pull request #69 from NebraLtd/marvinmarnold/56/support-arbitra…
Browse files Browse the repository at this point in the history
…ry-appname

feat: support arbitrary APPNAME and correct GPIOs
  • Loading branch information
shawaj authored Sep 9, 2021
2 parents 3855583 + cd45050 commit 8c9729e
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 60 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ Assuming virtualenv has been activated, execute the following command to run the
```
pip install -r test-requirements.txt
pytest
# Or test and run coverage report, with failure under 50%
PYTHONPATH=./ pytest --cov=minerconfig --cov=lib
# Or test and run coverage report
PYTHONPATH=./ pytest --cov=minerconfig --cov=lib
```

## Generating protobufs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
from lib.cputemp.advertisement import Advertisement

ADVERTISEMENT_SERVICE_UUID = "0fda92b2-44a2-4af2-84f5-fa682baa2b8d"
UNKNOWN_MAC_ADDRESS_VAL = "XXXXXX"
from gatewayconfig.logger import logger
import gatewayconfig.constants as constants

# BLE advertisement
class BluetoothConnectionAdvertisement(Advertisement):
def __init__(self, index, eth0_mac_address, advertisement_type):
def __init__(self, index, eth0_mac_address, advertisement_type, variant_details):
Advertisement.__init__(self, index, advertisement_type)
try:
mac_address_last6 = eth0_mac_address.replace(":", "")[-6:]
logger.debug("Creating advertisement with MAC %s and variant details %s" % (eth0_mac_address, variant_details))
# assumes eth0_mac_address already stripped and uppercase
friendly_mac_address = eth0_mac_address.replace(":", "")[-6:]

except FileNotFoundError:
mac_address_last6 = UNKNOWN_MAC_ADDRESS_VAL
if 'APPNAME' in variant_details:
friendly_variant = variant_details['APPNAME']
advertisement_name = "Nebra %s Hotspot %s" % (friendly_variant, friendly_mac_address)
else:
friendly_variant = variant_details['FRIENDLY']
advertisement_name = "%s %s" % (friendly_variant, friendly_mac_address)

advertisement_name = "Nebra %s Hotspot" % mac_address_last6
self.add_local_name(advertisement_name)
self.include_tx_power = True
self.service_uuids = [ADVERTISEMENT_SERVICE_UUID]
self.service_uuids = [constants.HELIUM_SERVICE_UUID]
5 changes: 4 additions & 1 deletion gatewayconfig/file_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def read_miner_keys(miner_keys_filepath):
for key in json_line.keys():
miner_keys[key] = json_line[key]
except json.JSONDecodeError:
miner_keys[key] = False
logger.error("Unable to correctly decode miner_keys. The app will continue running but may not behave as expected.")
miner_keys['pubkey'] = False
miner_keys['onboarding_key'] = False
miner_keys['animal_name'] = False

# Keyfile exists, now running.
pub_key = miner_keys['pubkey']
Expand Down
39 changes: 20 additions & 19 deletions gatewayconfig/gatewayconfig_app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import sentry_sdk
import threading
from RPi import GPIO
from gpiozero import Button, LED
try:
# checks if you have access to RPi.GPIO, which is available inside RPi
import RPi.GPIO as GPIO
except:
# In case of exception, you are executing your script outside of RPi, so import Mock.GPIO
import Mock.GPIO as GPIO

from hm_hardware_defs.variant import variant_definitions

from gatewayconfig.logger import logger
from gatewayconfig.processors.bluetooth_services_processor import BluetoothServicesProcessor
Expand All @@ -11,21 +18,16 @@
from gatewayconfig.processors.bluetooth_advertisement_processor import BluetoothAdvertisementProcessor
from gatewayconfig.gatewayconfig_shared_state import GatewayconfigSharedState
from gatewayconfig.file_loader import read_eth0_mac_address, read_wlan0_mac_address, read_miner_keys
from gatewayconfig.helpers import is_indoor_variant
import gatewayconfig.nmcli_custom as nmcli_custom

USER_BUTTON_HOLD_SECONDS = 2
INDOOR_USER_BUTTON_GPIO = 26
INDOOR_STATUS_LED_GPIO = 25

OTHER_USER_BUTTON_GPIO = 24
OTHER_STATUS_LED_GPIO = 25

class GatewayconfigApp:
def __init__(self, sentry_dsn, balena_app_name, balena_device_uuid, variant, eth0_mac_address_filepath, wlan0_mac_address_filepath,
miner_keys_filepath, diagnostics_json_url, ethernet_is_online_filepath, firmware_version):

self.variant = variant
self.variant_details = variant_definitions[variant]
self.init_sentry(sentry_dsn, balena_app_name, balena_device_uuid, variant)
self.shared_state = GatewayconfigSharedState()
self.init_nmcli()
Expand All @@ -41,7 +43,7 @@ def __init__(self, sentry_dsn, balena_app_name, balena_device_uuid, variant, eth
self.led_processor = LEDProcessor(self.status_led, self.shared_state)
self.diagnostics_processor = DiagnosticsProcessor(diagnostics_json_url, self.shared_state)
self.wifi_processor = WifiProcessor(self.shared_state)
self.bluetooth_advertisement_processor = BluetoothAdvertisementProcessor(eth0_mac_address, self.shared_state)
self.bluetooth_advertisement_processor = BluetoothAdvertisementProcessor(eth0_mac_address, self.shared_state, self.variant_details)

def start(self):
logger.debug("Starting ConfigApp")
Expand All @@ -60,7 +62,7 @@ def stop(self):
logger.debug("Stopping ConfigApp")
GPIO.cleanup()
# Quits the cputemp application
self.bluetooth_processor.quit()
self.bluetooth_services_processor.quit()

def init_sentry(self, sentry_dsn, balena_app_name, balena_device_uuid, variant):
sentry_sdk.init(sentry_dsn, environment=balena_app_name)
Expand All @@ -71,16 +73,9 @@ def init_nmcli(self):
nmcli_custom.disable_use_sudo()

def init_gpio(self):
if is_indoor_variant(self.variant):
user_button_gpio = INDOOR_USER_BUTTON_GPIO
status_led_gpio = INDOOR_STATUS_LED_GPIO
else:
user_button_gpio = OTHER_USER_BUTTON_GPIO
status_led_gpio = OTHER_STATUS_LED_GPIO

self.user_button = Button(user_button_gpio, hold_time=USER_BUTTON_HOLD_SECONDS)
self.user_button = Button(self.get_button_pin(), hold_time=USER_BUTTON_HOLD_SECONDS)
self.user_button.when_held= self.start_bluetooth_advertisement
self.status_led = LED(status_led_gpio)
self.status_led = LED(self.get_status_led_pin())

# Use daemon threads so that everything exists cleanly when the program stops
def start_threads(self):
Expand All @@ -107,4 +102,10 @@ def start_threads(self):

def start_bluetooth_advertisement(self):
logger.debug("Starting bluetooth advertisement")
self.shared_state.should_advertise_bluetooth = True
self.shared_state.should_advertise_bluetooth = True

def get_button_pin(self):
return self.variant_details['BUTTON']

def get_status_led_pin(self):
return self.variant_details['STATUS']
3 changes: 0 additions & 3 deletions gatewayconfig/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,3 @@ def string_to_dbus_byte_array(str):

def is_valid_ssid(ssid_str):
return ssid_str != "--" and ssid_str != ""

def is_indoor_variant(variant):
return (variant == "NEBHNT-IN1") or (variant == "Indoor")
4 changes: 2 additions & 2 deletions gatewayconfig/processors/bluetooth_advertisement_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
ADVERTISEMENT_OFF_SLEEP_SECONDS = 5

class BluetoothAdvertisementProcessor:
def __init__(self, eth0_mac_address, shared_state):
def __init__(self, eth0_mac_address, shared_state, variant_details):
self.shared_state = shared_state
self.connection_advertisement = BluetoothConnectionAdvertisement(ADVERTISEMENT_INDEX, eth0_mac_address, ADVERTISEMENT_TYPE)
self.connection_advertisement = BluetoothConnectionAdvertisement(ADVERTISEMENT_INDEX, eth0_mac_address, ADVERTISEMENT_TYPE, variant_details)

def run(self):
logger.debug("Running BluetoothAdvertisementProcessor")
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ sentry-sdk==1.1.0
pycairo==1.20.1
PyGObject==3.40.1
retry==0.9.2
requests==2.26.0
requests==2.26.0
hm-hardware-defs==0.1.5
3 changes: 2 additions & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
flake8==3.9.2
bandit==1.7.0
pytest==6.2.4
pytest-cov==2.12.1
pytest-cov==2.12.1
Mock.GPIO==0.1.8
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import pytest
import sys
import uuid
from io import StringIO
Expand All @@ -7,6 +6,8 @@
import dbus.mainloop.glib
from unittest.mock import patch, mock_open

from hm_hardware_defs.variant import variant_definitions

from gatewayconfig.bluetooth.advertisements.bluetooth_connection_advertisement import BluetoothConnectionAdvertisement


Expand All @@ -21,7 +22,8 @@ class TestBluetoothConnectionAdvertisement(TestCase):
maxDiff = None

def test_instantiation(self):
advertisement = BluetoothConnectionAdvertisement(100, 'A1:B2:C3:DD:E5:F6_', 'peripheral_')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(100, 'A1:B2:C3:DD:E5:F6', 'peripheral_', variant_details)
self.assertEqual(
advertisement.path,
'/org/bluez/example/advertisement100'
Expand All @@ -32,29 +34,39 @@ def test_instantiation(self):
)
self.assertEqual(
advertisement.local_name,
'Nebra DE5F6_ Hotspot'
'Nebra Outdoor Hotspot DDE5F6'
)
self.assertEqual(
advertisement.ad_type,
'peripheral_'
)

def test_name_no_friendly(self):
variant_details = { 'FRIENDLY': 'WALRUS' }
advertisement = BluetoothConnectionAdvertisement(105, 'A1:B2:C3:DD:E5:F6', 'peripheral_', variant_details)
self.assertEqual(
advertisement.local_name,
'WALRUS DDE5F6'
)

@patch("builtins.open", new_callable=mock_open, read_data='a1:B2:c3:Dd:e5:f6')
def test_get_properties(self, eth0_file_mock):
advertisement = BluetoothConnectionAdvertisement(101, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(110, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
properties = advertisement.get_properties()
expected = {
'org.bluez.LEAdvertisement1': {
'Type': 'peripheral',
'IncludeTxPower': dbus.Boolean(True),
'LocalName': dbus.String('Nebra %s Hotspot' % 'DDE5F6'),
'LocalName': dbus.String('Nebra %s Hotspot %s' % ('Outdoor', 'DDE5F6')),
'ServiceUUIDs': dbus.Array([DEFAULT_SERVICE_UUID], signature=dbus.Signature('s'))
}
}
self.assertDictEqual(properties, expected)

def test_get_properties_extended(self):
advertisement = BluetoothConnectionAdvertisement(102, 'A1:B2:C3:DD:E5:F6', "peripheral")
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(115, 'A1:B2:C3:DD:E5:F6', "peripheral", variant_details)

service_uuids = [str(uuid.uuid4())]
advertisement.service_uuids = service_uuids
Expand Down Expand Up @@ -88,12 +100,14 @@ def test_get_properties_extended(self):
self.assertDictEqual(properties, expected)

def test_get_path(self):
advertisement = BluetoothConnectionAdvertisement(103, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(120, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
path = advertisement.get_path()
self.assertIsInstance(path, dbus.ObjectPath)

def test_add_service_uuid(self):
advertisement = BluetoothConnectionAdvertisement(104, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(125, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
service_uuid = str(uuid.uuid4())
advertisement.add_service_uuid(service_uuid)
# FIXME: There is currently test environment pollution and DEFAULT_SERVICE_ID has been
Expand All @@ -104,7 +118,8 @@ def test_add_service_uuid(self):
)

def test_add_solicit_uuid(self):
advertisement = BluetoothConnectionAdvertisement(105, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(130, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
solicit_uuid = str(uuid.uuid4())
advertisement.add_solicit_uuid(solicit_uuid)
self.assertEqual(
Expand All @@ -113,7 +128,8 @@ def test_add_solicit_uuid(self):
)

def test_add_manufacturer_data(self):
advertisement = BluetoothConnectionAdvertisement(106, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(135, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
manufacturer_data = {'name': 'Nebra'}
advertisement.add_manufacturer_data(
'Nebra',
Expand All @@ -133,7 +149,8 @@ def test_add_manufacturer_data(self):
)

def test_add_service_data(self):
advertisement = BluetoothConnectionAdvertisement(107, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(140, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
service_uuid = str(uuid.uuid4())
service_data = {'key': 'value'}
advertisement.add_service_data(
Expand All @@ -153,7 +170,8 @@ def test_add_service_data(self):
)

def test_add_local_name(self):
advertisement = BluetoothConnectionAdvertisement(108, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(145, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
local_name = 'LocalHost'
advertisement.add_local_name(
local_name
Expand All @@ -165,19 +183,21 @@ def test_add_local_name(self):

@patch("builtins.open", new_callable=mock_open, read_data='a1:B2:c3:Dd:e5:f6')
def test_getall_valid_iface(self, eth0_file_mock):
advertisement = BluetoothConnectionAdvertisement(109, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(150, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
self.assertDictEqual(
advertisement.GetAll(VALID_LE_ADVERTISEMENT_IFACE),
{
'IncludeTxPower': dbus.Boolean(True),
'LocalName': dbus.String('Nebra %s Hotspot' % 'DDE5F6'),
'LocalName': dbus.String('Nebra %s Hotspot %s' % ('Outdoor', 'DDE5F6')),
'ServiceUUIDs': dbus.Array([DEFAULT_SERVICE_UUID]),
'Type': 'peripheral',
}
)

def test_getall_invalid_iface(self):
advertisement = BluetoothConnectionAdvertisement(110, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(155, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)

exception = False
exception_type = None
Expand All @@ -192,22 +212,24 @@ def test_getall_invalid_iface(self):
self.assertIsInstance(exception_type, Exception)

def test_release(self):
advertisement = BluetoothConnectionAdvertisement(111, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(160, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)

out = StringIO()
sys.stdout = out
result = advertisement.Release()
printed_output = out.getvalue().strip()
self.assertEqual(
printed_output,
'/org/bluez/example/advertisement111: Released!'
'/org/bluez/example/advertisement160: Released!'
)

# Returns nothing
self.assertEqual(result, None)

def test_register_callback(self):
advertisement = BluetoothConnectionAdvertisement(112, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(165, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)

out = StringIO()
sys.stdout = out
Expand All @@ -222,7 +244,8 @@ def test_register_callback(self):
self.assertEqual(result, None)

def test_register_ad_error_callback(self):
advertisement = BluetoothConnectionAdvertisement(113, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(170, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)

out = StringIO()
sys.stdout = out
Expand All @@ -246,7 +269,8 @@ def test_register(
mock_findadapter,
mock_getbus
):
advertisement = BluetoothConnectionAdvertisement(114, 'A1:B2:C3:DD:E5:F6', 'peripheral')
variant_details = variant_definitions['NEBHNT-OUT1']
advertisement = BluetoothConnectionAdvertisement(175, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
result = advertisement.register()

mock_dbus_interface.assert_called()
Expand All @@ -265,10 +289,11 @@ def test_unregister(
mock_findadapter,
mock_getbus
):
variant_details = variant_definitions['NEBHNT-OUT1']
out = StringIO()
sys.stdout = out

advertisement = BluetoothConnectionAdvertisement(115, 'A1:B2:C3:DD:E5:F6', 'peripheral')
advertisement = BluetoothConnectionAdvertisement(180, 'A1:B2:C3:DD:E5:F6', 'peripheral', variant_details)
result = advertisement.unregister()

mock_dbus_interface.assert_called()
Expand Down
Loading

0 comments on commit 8c9729e

Please sign in to comment.