Skip to content

Commit

Permalink
Updates to BRBINFO_4_1 after issues discovered during TE2 (#35040)
Browse files Browse the repository at this point in the history
* Updates to BRBINFO_4_1 after issues discovered during TE2

* Update test step 3 text

* Restyled by autopep8

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed Sep 25, 2024
1 parent d719560 commit 2038130
Showing 1 changed file with 57 additions and 39 deletions.
96 changes: 57 additions & 39 deletions src/python_testing/TC_BRBINFO_4_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
# limitations under the License.
#

# This test requires a TH_SERVER application. Please specify with --string-arg th_server_app_path:<path_to_app>
# TH_SERVER must support following arguments: --secured-device-port --discriminator --passcode --KVS
# This test requires a TH_ICD_SERVER application. Please specify with --string-arg th_icd_server_app_path:<path_to_app>
# TH_ICD_SERVER must support following arguments: --secured-device-port --discriminator --passcode --KVS
# E.g: python3 src/python_testing/TC_BRBINFO_4_1.py --commissioning-method on-network --qr-code MT:-24J042C00KA0648G00 \
# --string-arg th_server_app_path:out/linux-x64-lit-icd/lit-icd-app
# --string-arg th_icd_server_app_path:out/linux-x64-lit-icd/lit-icd-app

import logging
import os
Expand Down Expand Up @@ -62,7 +62,8 @@ def steps_TC_BRBINFO_4_1(self) -> list[TestStep]:
TestStep("1a", "TH reads from the ICD the A_IDLE_MODE_DURATION, A_ACTIVE_MODE_DURATION, and ACTIVE_MODE_THRESHOLD attributes"),
TestStep("1b", "Simple KeepActive command w/ subscription. ActiveChanged event received by TH contains PromisedActiveDuration"),
TestStep("2", "Sends 3x KeepActive commands w/ subscription. ActiveChanged event received ONCE and contains PromisedActiveDuration"),
TestStep("3", "KeepActive not returned after 60 minutes of offline ICD"),
TestStep("3", "TH waits for check-in from TH_ICD to confirm no additional ActiveChanged events are recieved"),
TestStep("4", "KeepActive not returned after 60 minutes of offline ICD"),
]
return steps

Expand Down Expand Up @@ -110,9 +111,10 @@ async def setup_class(self):

super().setup_class()
self.app_process = None
app = self.user_params.get("th_server_app_path", None)
self.app_process_paused = False
app = self.user_params.get("th_icd_server_app_path", None)
if not app:
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>')
asserts.fail('This test requires a TH_ICD_SERVER app. Specify app path with --string-arg th_icd_server_app_path:<path_to_app>')

self.kvs = f'kvs_{str(uuid.uuid4())}'
self.port = 5543
Expand All @@ -129,6 +131,7 @@ async def setup_class(self):
logging.info("Commissioning of ICD to fabric one (TH)")
self.icd_nodeid = 1111

self.default_controller.EnableICDRegistration(self.default_controller.GenerateICDRegistrationParameters())
await self.default_controller.CommissionOnNetwork(nodeId=self.icd_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator)

logging.info("Commissioning of ICD to fabric two (DUT)")
Expand All @@ -138,9 +141,10 @@ async def setup_class(self):
params.commissioningParameters.setupManualCode, params.commissioningParameters.setupQRCode)

def teardown_class(self):
# In case the th_server_app_path does not exist, then we failed the test
# In case the th_icd_server_app_path does not exist, then we failed the test
# and there is nothing to remove
if self.app_process is not None:
self.resume_th_icd_server(check_state=False)
logging.warning("Stopping app with SIGTERM")
self.app_process.send_signal(signal.SIGTERM.value)
self.app_process.wait()
Expand All @@ -150,6 +154,24 @@ def teardown_class(self):

super().teardown_class()

def pause_th_icd_server(self, check_state):
if check_state:
asserts.assert_false(self.app_process_paused, "ICD TH Server unexpectedly is already paused")
if self.app_process_paused:
return
# stops (halts) the ICD server process by sending a SIGTOP signal
self.app_process.send_signal(signal.SIGSTOP.value)
self.app_process_paused = True

def resume_th_icd_server(self, check_state):
if check_state:
asserts.assert_true(self.app_process_paused, "ICD TH Server unexpectedly is already running")
if not self.app_process_paused:
return
# resumes (continues) the ICD server process by sending a SIGCONT signal
self.app_process.send_signal(signal.SIGCONT.value)
self.app_process_paused = False

#
# BRBINFO 4.1 Test Body
#
Expand Down Expand Up @@ -232,58 +254,54 @@ async def test_TC_BRBINFO_4_1(self):

self.step("2")

stay_active_duration_ms = 1500
logging.info(f"Sending KeepActiveCommand({stay_active_duration_ms}ms)")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)

logging.info("Waiting for ActiveChanged from DUT...")
timeout_s = idle_mode_duration_s + max(active_mode_duration_ms, stay_active_duration_ms)/1000
promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s)

# wait for active time duration
sleep_time_s = max(stay_active_duration_ms, promised_active_duration_ms)/1000
time.sleep(sleep_time_s)
# ICD now should be in idle mode

# Prevent icd app from sending any check-in messages.
self.pause_th_icd_server(check_state=True)
# sends 3x keep active commands
logging.info(f"Step3 Sending first KeepActiveCommand({stay_active_duration_ms})")
stay_active_duration_ms = 2000
logging.info(f"Sending first KeepActiveCommand({stay_active_duration_ms})")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
time.sleep(0.1)
logging.info(f"Step3 Sending second KeepActiveCommand({stay_active_duration_ms})")
logging.info(f"Sending second KeepActiveCommand({stay_active_duration_ms})")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
time.sleep(0.1)
logging.info(f"Step3 Sending third KeepActiveCommand({stay_active_duration_ms})")
logging.info(f"Sending third KeepActiveCommand({stay_active_duration_ms})")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
time.sleep(0.1)
self.resume_th_icd_server(check_state=True)

logging.info("Waiting for ActiveChanged from DUT...")
promised_active_duration_ms = await self._wait_for_active_changed_event((idle_mode_duration_s + max(active_mode_duration_ms, stay_active_duration_ms))/1000)

asserts.assert_equal(self.q.qSize(), 0, "More than one event received from DUT")
asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT")

self.step("3")
await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=5000)
asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT")

self.step("4")

logging.info("TH waiting for checkin from TH_ICD...")
await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=10000)
stay_active_duration_ms = 10000
logging.info(f"Sending KeepActiveCommand({stay_active_duration_ms})")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)

# stops (halts) the ICD server process by sending a SIGTOP signal
self.app_process.send_signal(signal.SIGSTOP.value)
self.pause_th_icd_server(check_state=True)
# If we are seeing assertion below fail test assumption is likely incorrect.
# Test assumes after TH waits for check-in from TH_ICD it has enough time to
# call the KeepActive command and pause the app to prevent it from checking in
# after DUT recieved the KeepActive command. Should this assumption be incorrect
# we could look into using existing ICDTestEventTriggerEvent, or adding test
# event trigger that will help suppress check-ins from the TH_ICD_SERVER.
asserts.assert_equal(self.q.qsize(), 0, "")

if not self.is_ci:
logging.info("Waiting for 60 minutes")
self._timeout
time.sleep(60*60)

# resumes (continues) the ICD server process by sending a SIGCONT signal
self.app_process.send_signal(signal.SIGCONT.value)
self.resume_th_icd_server(check_state=True)

# wait for active changed event, expect no event will be sent
event_timeout = (idle_mode_duration_s + max(active_mode_duration_ms, stay_active_duration_ms))/1000
try:
promised_active_duration_ms = self.q.get(block=True, timeout=event_timeout)
finally:
asserts.assert_true(queue.Empty(), "ActiveChanged event received when not expected")
logging.info("TH waiting for first checkin from TH_ICD...")
await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=10000)
logging.info("TH waiting for second checkin from TH_ICD...")
await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=10000)
asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT")


if __name__ == "__main__":
Expand Down

0 comments on commit 2038130

Please sign in to comment.