From 1029216890a8835386dc16ec4f76ea57e48cfee0 Mon Sep 17 00:00:00 2001 From: ATmobica Date: Fri, 22 Oct 2021 14:17:36 +0000 Subject: [PATCH] Remove device_controller fixture is singleton al all Update wifi provisioning tests --- .../mbed/integration_tests/common/fixtures.py | 9 -- .../mbed/integration_tests/common/utils.py | 126 ++++++++++++++++-- .../lighting-app/test_app.py | 31 ++++- .../integration_tests/lock-app/test_app.py | 31 ++++- 4 files changed, 164 insertions(+), 33 deletions(-) diff --git a/src/test_driver/mbed/integration_tests/common/fixtures.py b/src/test_driver/mbed/integration_tests/common/fixtures.py index 13d66f6668a011..f56922f20ee6c8 100644 --- a/src/test_driver/mbed/integration_tests/common/fixtures.py +++ b/src/test_driver/mbed/integration_tests/common/fixtures.py @@ -24,8 +24,6 @@ from .serial_connection import SerialConnection from .serial_device import SerialDevice -from chip import ChipDeviceCtrl - import logging log = logging.getLogger(__name__) @@ -132,10 +130,3 @@ def device(board_allocator): device = board_allocator.allocate(name='DUT') yield device board_allocator.release(device) - - -@pytest.fixture(scope="function") -def device_controller(): - devCtrl = ChipDeviceCtrl.ChipDeviceController() - yield devCtrl - devCtrl.__del__ \ No newline at end of file diff --git a/src/test_driver/mbed/integration_tests/common/utils.py b/src/test_driver/mbed/integration_tests/common/utils.py index 455969cead66d1..b1e180abc818ed 100644 --- a/src/test_driver/mbed/integration_tests/common/utils.py +++ b/src/test_driver/mbed/integration_tests/common/utils.py @@ -18,6 +18,7 @@ import os import platform import random +import shlex from chip import exceptions @@ -29,8 +30,90 @@ import logging log = logging.getLogger(__name__) +class ParsingError(exceptions.ChipStackException): + def __init__(self, msg=None): + self.msg = "Parsing Error: " + msg -def scan_chip_ble_devices(devCtrl): + def __str__(self): + return self.msg + +def ParseEncodedString(value): + if value.find(":") < 0: + raise ParsingError( + "Value should be encoded in encoding:encodedvalue format") + enc, encValue = value.split(":", 1) + if enc == "str": + return encValue.encode("utf-8") + b'\x00' + elif enc == "hex": + return bytes.fromhex(encValue) + raise ParsingError("Only str and hex encoding is supported") + + +def ParseValueWithType(value, type): + if type == 'int': + return int(value) + elif type == 'str': + return value + elif type == 'bytes': + return ParseEncodedString(value) + elif type == 'bool': + return (value.upper() not in ['F', 'FALSE', '0']) + else: + raise ParsingError('Cannot recognize type: {}'.format(type)) + +def FormatZCLArguments(args, command): + commandArgs = {} + for kvPair in args: + if kvPair.find("=") < 0: + raise ParsingError("Argument should in key=value format") + key, value = kvPair.split("=", 1) + valueType = command.get(key, None) + commandArgs[key] = ParseValueWithType(value, valueType) + return commandArgs + +def send_zcl_command(devCtrl, line): + """ + Send ZCL message to device: + [key=value]... + :param devCtrl: device controller instance + :param line: command line + :return: error code and command responde + """ + res = None + err = 0 + try: + args = shlex.split(line) + all_commands = devCtrl.ZCLCommandList() + if len(args) < 5: + raise exceptions.InvalidArgumentCount(5, len(args)) + + if args[0] not in all_commands: + raise exceptions.UnknownCluster(args[0]) + command = all_commands.get(args[0]).get(args[1], None) + # When command takes no arguments, (not command) is True + if command == None: + raise exceptions.UnknownCommand(args[0], args[1]) + err, res = devCtrl.ZCLSend(args[0], args[1], int( + args[2]), int(args[3]), int(args[4]), FormatZCLArguments(args[5:], command), blocking=True) + if err != 0: + log.error("Failed to receive command response: {}".format(res)) + elif res != None: + log.info("Received command status response:") + log.info(res) + else: + log.info("Success, no status code is attached with response.") + except exceptions.ChipStackException as ex: + log.error("An exception occurred during process ZCL command:") + log.error(str(ex)) + err = -1 + except Exception as ex: + log.error("An exception occurred during processing input:") + log.error(str(ex)) + err = -1 + + return (err, res) + +def scan_chip_ble_devices(devCtrl, name): """ BLE scan CHIP device BLE scanning for 10 seconds and collect the results @@ -42,32 +125,55 @@ def scan_chip_ble_devices(devCtrl): bleMgr.scan("-t 10") for device in bleMgr.peripheral_list: + if device.Name != name: + continue devIdInfo = bleMgr.get_peripheral_devIdInfo(device) if devIdInfo: - log.info("Found CHIP device {}".format(device.Name)) devices.append(devIdInfo) return devices -def run_wifi_provisioning(devCtrl, ssid, password, discriminator, pinCode, nodeId=None): + +def connect_device_over_ble(devCtrl, discriminator, pinCode, nodeId=None): """ - Run WiFi provisionning via BLE + Connect to Matter accessory device over BLE :param devCtrl: device controller instance - :param ssid: network ssid - :param password: network password :param discriminator: CHIP device discriminator :param pinCode: CHIP device pin code :param nodeId: default value of node ID - :return: node ID is provisioning successful, otherwise None + :return: node ID is provisioning successful, otherwise None """ if nodeId == None: nodeId = random.randint(1, 1000000) try: - devCtrl.SetWifiCredential(ssid, password) devCtrl.ConnectBLE(int(discriminator), int(pinCode), int(nodeId)) except exceptions.ChipStackException as ex: - log.error("WiFi provisioning failed: {}".format(str(ex))) + log.error("Connect device over BLE failed: {}".format(str(ex))) return None - return nodeId \ No newline at end of file + return nodeId + +def commissioning_wifi(devCtrl, ssid, password, nodeId): + """ + Commissioning a Wi-Fi device + :param devCtrl: device controller instance + :param ssid: network ssid + :param password: network password + :param nodeId: value of node ID + :return: error code + """ + + # Inject the credentials to the device + err, res = send_zcl_command(devCtrl, "NetworkCommissioning AddWiFiNetwork {} 0 0 ssid=str:{} credentials=str:{} breadcrumb=0 timeoutMs=1000".format(nodeId, ssid, password)) + if err != 0 and res["Status"] != 0: + log.error("Set Wi-Fi credentials failed [{}]".format(err)) + return err + + # Enable the Wi-Fi interface + err, res = send_zcl_command(devCtrl, "NetworkCommissioning EnableNetwork {} 0 0 networkID=str:{} breadcrumb=0 timeoutMs=1000".format(nodeId, ssid)) + if err != 0 and res["Status"] != 0: + log.error("Enable Wi-Fi failed [{}]".format(err)) + return err + + return err \ No newline at end of file diff --git a/src/test_driver/mbed/integration_tests/lighting-app/test_app.py b/src/test_driver/mbed/integration_tests/lighting-app/test_app.py index 051c9b3a7d2246..d0a41a3f71cec6 100644 --- a/src/test_driver/mbed/integration_tests/lighting-app/test_app.py +++ b/src/test_driver/mbed/integration_tests/lighting-app/test_app.py @@ -14,15 +14,20 @@ # limitations under the License. import pytest +import re from chip.setup_payload import SetupPayload from chip import exceptions +from chip import ChipDeviceCtrl -from common.utils import scan_chip_ble_devices, run_wifi_provisioning +from common.utils import scan_chip_ble_devices, connect_device_over_ble, commissioning_wifi import logging log = logging.getLogger(__name__) +BLE_DEVICE_NAME = "MBED-lighting" +DEVICE_NODE_ID = 1234 + @pytest.mark.smoketest def test_smoke_test(device): device.reset(duration=1) @@ -32,18 +37,24 @@ def test_smoke_test(device): assert ret != None and len(ret) > 0 -def test_wifi_provisioning(device, network, device_controller): +ef test_wifi_provisioning(device, network): network_ssid = network[0] network_pass = network[1] + + devCtrl = ChipDeviceCtrl.ChipDeviceController() ret = device.wait_for_output("SetupQRCode") assert ret != None and len(ret) > 1 - qr_code = ret[-1].split('[', 1)[1].split(']')[0] - device_details = dict(SetupPayload().ParseQrCode("VP:vendorpayload%{}".format(qr_code)).attributes) + qr_code = re.sub(r"[\[\]]", "", ret[-1].partition("SetupQRCode:")[2]).strip() + try: + device_details = dict(SetupPayload().ParseQrCode("VP:vendorpayload%{}".format(qr_code)).attributes) + except exceptions.ChipStackError as ex: + log.error(ex.msg) + assert False assert device_details != None and len(device_details) != 0 - ble_chip_device = scan_chip_ble_devices(device_controller) + ble_chip_device = scan_chip_ble_devices(devCtrl, BLE_DEVICE_NAME) assert ble_chip_device != None and len(ble_chip_device) != 0 chip_device_found = False @@ -57,5 +68,11 @@ def test_wifi_provisioning(device, network, device_controller): assert chip_device_found - ret = run_wifi_provisioning(device_controller, network_ssid, network_pass, int(device_details["Discriminator"]), int(device_details["SetUpPINCode"]), DEVICE_NODE_ID) - assert ret != None and ret == DEVICE_NODE_ID \ No newline at end of file + ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int(device_details["SetUpPINCode"]), DEVICE_NODE_ID) + assert ret != None and ret == DEVICE_NODE_ID + + ret = device.wait_for_output("Device completed Rendezvous process") + assert ret != None and len(ret) > 0 + + ret = commissioning_wifi(devCtrl, network_ssid, network_pass, DEVICE_NODE_ID) + assert ret == 0 \ No newline at end of file diff --git a/src/test_driver/mbed/integration_tests/lock-app/test_app.py b/src/test_driver/mbed/integration_tests/lock-app/test_app.py index 5bbedc6ed41bb4..0a680482d76153 100644 --- a/src/test_driver/mbed/integration_tests/lock-app/test_app.py +++ b/src/test_driver/mbed/integration_tests/lock-app/test_app.py @@ -14,15 +14,20 @@ # limitations under the License. import pytest +import re from chip.setup_payload import SetupPayload from chip import exceptions +from chip import ChipDeviceCtrl -from common.utils import scan_chip_ble_devices, run_wifi_provisioning +from common.utils import scan_chip_ble_devices, connect_device_over_ble, commissioning_wifi import logging log = logging.getLogger(__name__) +BLE_DEVICE_NAME = "MBED-lock" +DEVICE_NODE_ID = 1234 + @pytest.mark.smoketest def test_smoke_test(device): device.reset(duration=1) @@ -32,18 +37,24 @@ def test_smoke_test(device): assert ret != None and len(ret) > 0 -def test_wifi_provisioning(device, network, device_controller): +def test_wifi_provisioning(device, network): network_ssid = network[0] network_pass = network[1] + + devCtrl = ChipDeviceCtrl.ChipDeviceController() ret = device.wait_for_output("SetupQRCode") assert ret != None and len(ret) > 1 - qr_code = ret[-1].split('[', 1)[1].split(']')[0] - device_details = dict(SetupPayload().ParseQrCode("VP:vendorpayload%{}".format(qr_code)).attributes) + qr_code = re.sub(r"[\[\]]", "", ret[-1].partition("SetupQRCode:")[2]).strip() + try: + device_details = dict(SetupPayload().ParseQrCode("VP:vendorpayload%{}".format(qr_code)).attributes) + except exceptions.ChipStackError as ex: + log.error(ex.msg) + assert False assert device_details != None and len(device_details) != 0 - ble_chip_device = scan_chip_ble_devices(device_controller) + ble_chip_device = scan_chip_ble_devices(devCtrl, BLE_DEVICE_NAME) assert ble_chip_device != None and len(ble_chip_device) != 0 chip_device_found = False @@ -57,5 +68,11 @@ def test_wifi_provisioning(device, network, device_controller): assert chip_device_found - ret = run_wifi_provisioning(device_controller, network_ssid, network_pass, int(device_details["Discriminator"]), int(device_details["SetUpPINCode"]), DEVICE_NODE_ID) - assert ret != None and ret == DEVICE_NODE_ID \ No newline at end of file + ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int(device_details["SetUpPINCode"]), DEVICE_NODE_ID) + assert ret != None and ret == DEVICE_NODE_ID + + ret = device.wait_for_output("Device completed Rendezvous process") + assert ret != None and len(ret) > 0 + + ret = commissioning_wifi(devCtrl, network_ssid, network_pass, DEVICE_NODE_ID) + assert ret == 0 \ No newline at end of file