Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add read/subscribe event to chip-repl based yamltests #24904

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions scripts/tests/chiptest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,6 @@ def _GetInDevelopmentTests() -> Set[str]:
Goal is for this set to become empty.
"""
return {
# TODO: Event not yet supported:
"Test_TC_ACL_2_10.yaml",
"Test_TC_ACL_2_7.yaml",
"Test_TC_ACL_2_8.yaml",
"Test_TC_ACL_2_9.yaml",
"TestEvents.yaml",

"TestClusterMultiFabric.yaml", # Enum mismatch
"TestGroupMessaging.yaml", # Needs group support in repl
"TestMultiAdmin.yaml", # chip-repl hang on command expeted to fail
Expand Down
199 changes: 189 additions & 10 deletions src/controller/python/chip/yaml/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import chip.yaml.format_converter as Converter
import stringcase
from chip.ChipDeviceCtrl import ChipDeviceController, discovery
from chip.clusters.Attribute import AttributeStatus, SubscriptionTransaction, TypedAttributePath, ValueDecodeFailure
from chip.clusters.Attribute import (AttributeStatus, EventReadResult, SubscriptionTransaction, TypedAttributePath,
ValueDecodeFailure)
from chip.exceptions import ChipStackError
from chip.yaml.errors import ParsingError, UnexpectedParsingError
from matter_yamltests.pseudo_clusters.clusters.delay_commands import DelayCommands
Expand Down Expand Up @@ -56,6 +57,11 @@ class _GetCommissionerNodeIdResult:
node_id: int


@dataclass
class EventResponse:
event_result_list: list[EventReadResult]


@dataclass
class _ActionResult:
status: _ActionStatus
Expand All @@ -69,6 +75,12 @@ class _AttributeSubscriptionCallbackResult:
result: _ActionResult


@dataclass
class _EventSubscriptionCallbackResult:
name: str
result: _ActionResult


@dataclass
class _ExecutionContext:
''' Objects that is commonly passed around this file that are vital to test execution.'''
Expand All @@ -78,7 +90,8 @@ class _ExecutionContext:
subscriptions: list = field(default_factory=list)
# The key is the attribute/event name, and the value is a queue of subscription callback results
# that been sent by device under test. For attribute subscription the queue is of type
# _AttributeSubscriptionCallbackResult.
# _AttributeSubscriptionCallbackResult, for event the queue is of type
# _EventSubscriptionCallbackResult.
subscription_callback_result_queue: dict = field(default_factory=dict)


Expand Down Expand Up @@ -266,6 +279,51 @@ def parse_raw_response(self, raw_resp) -> _ActionResult:
return _ActionResult(status=_ActionStatus.SUCCESS, response=return_val)


class ReadEventAction(BaseAction):
''' Read Event action to be executed.'''

def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to read event action that can execute with ChipDeviceController.

Args:
'test_step': Step containing information required to run read event action.
'cluster': Name of cluster read event action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
UnexpectedParsingError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step)
self._event_name = stringcase.pascalcase(test_step.event)
self._cluster = cluster
self._endpoint = test_step.endpoint
self._node_id = test_step.node_id
self._cluster_object = None
self._request_object = None
self._fabric_filtered = True
tehampson marked this conversation as resolved.
Show resolved Hide resolved
self._event_number_filter = test_step.event_number

self._request_object = context.data_model_lookup.get_event(self._cluster,
self._event_name)
if self._request_object is None:
raise UnexpectedParsingError(
f'ReadEvent failed to find cluster:{self._cluster} Event:{self._event_name}')

if test_step.arguments:
raise UnexpectedParsingError(
f'ReadEvent should not contain arguments. {self.label}')

def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
urgent = 0
request = [(self._endpoint, self._request_object, urgent)]
resp = asyncio.run(dev_ctrl.ReadEvent(self._node_id, events=request, eventNumberFilter=self._event_number_filter))
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)

parsed_resp = EventResponse(event_result_list=resp)
return _ActionResult(status=_ActionStatus.SUCCESS, response=parsed_resp)


class WaitForCommissioneeAction(BaseAction):
''' Wait for commissionee action to be executed.'''

Expand Down Expand Up @@ -327,6 +385,27 @@ def name(self) -> str:
return self._name


class EventChangeAccumulator:
def __init__(self, name: str, expected_event, output_queue: queue.SimpleQueue):
self._name = name
self._expected_event = expected_event
self._output_queue = output_queue

def __call__(self, event_result: EventReadResult, transaction: SubscriptionTransaction):
if (self._expected_event.cluster_id == event_result.Header.ClusterId and
self._expected_event.event_id == event_result.Header.EventId):
event_response = EventResponse(event_result_list=[event_result])
result = _ActionResult(status=_ActionStatus.SUCCESS, response=event_response)

item = _EventSubscriptionCallbackResult(self._name, result)
logging.debug(f'Got subscription report on client {self.name}')
self._output_queue.put(item)

@property
def name(self) -> str:
return self._name


class SubscribeAttributeAction(ReadAttributeAction):
'''Single subscribe attribute action to be executed.'''

Expand Down Expand Up @@ -382,6 +461,63 @@ def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
return self.parse_raw_response(raw_resp)


class SubscribeEventAction(ReadEventAction):
'''Single subscribe event action to be executed.'''

def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to subscribe event action that can execute with ChipDeviceController.

Args:
'test_step': Step containing information required to run subscribe event action.
'cluster': Name of cluster subscribe event action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
ParsingError: Raised if there is a benign error, and there is currently no
action to perform for this subscribe event.
UnexpectedParsingError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step, cluster, context)
self._context = context
if test_step.min_interval is None:
raise UnexpectedParsingError(
f'SubscribeAttribute action does not have min_interval {self.label}')
tehampson marked this conversation as resolved.
Show resolved Hide resolved
self._min_interval = test_step.min_interval

if test_step.max_interval is None:
raise UnexpectedParsingError(
f'SubscribeAttribute action does not have max_interval {self.label}')
tehampson marked this conversation as resolved.
Show resolved Hide resolved
self._max_interval = test_step.max_interval

def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
urgent = 0
request = [(self._endpoint, self._request_object, urgent)]
subscription = asyncio.run(
dev_ctrl.ReadEvent(self._node_id, events=request, eventNumberFilter=self._event_number_filter,
reportInterval=(self._min_interval, self._max_interval),
keepSubscriptions=False))
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)

self._context.subscriptions.append(subscription)
output_queue = self._context.subscription_callback_result_queue.get(self._event_name,
None)
if output_queue is None:
output_queue = queue.SimpleQueue()
self._context.subscription_callback_result_queue[self._event_name] = output_queue

while not output_queue.empty():
output_queue.get(block=False)

subscription_handler = EventChangeAccumulator(self.label, self._request_object, output_queue)

subscription.SetEventUpdateCallback(subscription_handler)

events = subscription.GetEvents()
response = EventResponse(event_result_list=events)
return _ActionResult(status=_ActionStatus.SUCCESS, response=response)


class WriteAttributeAction(BaseAction):
'''Single write attribute action to be executed.'''

Expand Down Expand Up @@ -462,9 +598,15 @@ def __init__(self, test_step, context: _ExecutionContext):
UnexpectedParsingError: Raised if the expected queue does not exist.
'''
super().__init__(test_step)
self._attribute_name = stringcase.pascalcase(test_step.attribute)
self._output_queue = context.subscription_callback_result_queue.get(self._attribute_name,
None)
if test_step.attribute is not None:
queue_name = stringcase.pascalcase(test_step.attribute)
elif test_step.event is not None:
queue_name = stringcase.pascalcase(test_step.event)
else:
raise UnexpectedParsingError(
f'WaitForReport needs to wait on either attribute or event, neither were provided')

self._output_queue = context.subscription_callback_result_queue.get(queue_name, None)
if self._output_queue is None:
raise UnexpectedParsingError(f'Could not find output queue')

Expand All @@ -477,6 +619,8 @@ def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
except queue.Empty:
return _ActionResult(status=_ActionStatus.ERROR, response=None)

if isinstance(item, _AttributeSubscriptionCallbackResult):
return item.result
return item.result


Expand Down Expand Up @@ -621,14 +765,15 @@ def _attribute_read_action_factory(self, test_step, cluster: str):
'cluster': Name of cluster read attribute action is targeting.
Returns:
ReadAttributeAction if 'test_step' is a valid read attribute to be executed.
None if we were unable to use the provided 'test_step' for a known reason that is not
fatal to test execution.
'''
try:
return ReadAttributeAction(test_step, cluster, self._context)
except ParsingError:
return None

def _event_read_action_factory(self, test_step, cluster: str):
return ReadEventAction(test_step, cluster, self._context)

def _attribute_subscribe_action_factory(self, test_step, cluster: str):
'''Creates subscribe attribute command from TestStep provided.

Expand All @@ -648,6 +793,17 @@ def _attribute_subscribe_action_factory(self, test_step, cluster: str):
# propogated.
return None

def _attribute_subscribe_event_factory(self, test_step, cluster: str):
'''Creates subscribe event command from TestStep provided.

Args:
'test_step': Step containing information required to run subscribe attribute action.
'cluster': Name of cluster write attribute action is targeting.
Returns:
SubscribeEventAction if 'test_step' is a valid subscribe attribute to be executed.
'''
return SubscribeEventAction(test_step, cluster, self._context)

def _attribute_write_action_factory(self, test_step, cluster: str):
'''Creates write attribute command TestStep.

Expand Down Expand Up @@ -712,11 +868,11 @@ def encode(self, request) -> BaseAction:
elif command == 'readAttribute':
action = self._attribute_read_action_factory(request, cluster)
elif command == 'readEvent':
# TODO need to implement _event_read_action_factory
# action = self._event_read_action_factory(request, cluster)
pass
action = self._event_read_action_factory(request, cluster)
elif command == 'subscribeAttribute':
action = self._attribute_subscribe_action_factory(request, cluster)
elif command == 'subscribeEvent':
action = self._attribute_subscribe_event_factory(request, cluster)
elif command == 'waitForReport':
action = self._wait_for_report_action_factory(request)
else:
Expand Down Expand Up @@ -779,6 +935,29 @@ def decode(self, result: _ActionResult):
}
return decoded_response

if isinstance(response, EventResponse):
if not response.event_result_list:
# This means that the event result we got back was empty, below is how we
# represent this.
decoded_response = [{}]
return decoded_response
decoded_response = []
for event in response.event_result_list:
if event.Status != chip.interaction_model.Status.Success:
error_message = stringcase.snakecase(event.Status.name).upper()
decoded_response.append({'error': error_message})
continue
cluster_id = event.Header.ClusterId
cluster_name = self._test_spec_definition.get_cluster_name(cluster_id)
event_id = event.Header.EventId
event_name = self._test_spec_definition.get_event_name(cluster_id, event_id)
event_definition = self._test_spec_definition.get_event_by_name(cluster_name, event_name)
is_fabric_scoped = bool(event_definition.is_fabric_sensitive)
decoded_event = Converter.from_data_model_to_test_definition(
self._test_spec_definition, cluster_name, event_definition.fields, event.Data, is_fabric_scoped)
decoded_response.append({'value': decoded_event})
return decoded_response

if isinstance(response, ChipStackError):
decoded_response['error'] = 'FAILURE'
return decoded_response
Expand Down