diff --git a/src/app/MessageDef/EventDataIB.cpp b/src/app/MessageDef/EventDataIB.cpp index 11020576149ae2..8e553eeb12654f 100644 --- a/src/app/MessageDef/EventDataIB.cpp +++ b/src/app/MessageDef/EventDataIB.cpp @@ -455,10 +455,7 @@ CHIP_ERROR EventDataIB::Parser::ProcessEventTimestamp(EventHeader & aEventHeader hasEpochTimestamp = true; } - if ((hasSystemTimestamp && !hasEpochTimestamp && !hasDeltaSystemTimestamp && !hasDeltaEpochTimestamp) || - (!hasSystemTimestamp && hasEpochTimestamp && !hasDeltaSystemTimestamp && !hasDeltaEpochTimestamp) || - (!hasSystemTimestamp && !hasEpochTimestamp && hasDeltaSystemTimestamp && !hasDeltaEpochTimestamp) || - (!hasSystemTimestamp && !hasEpochTimestamp && !hasDeltaSystemTimestamp && hasDeltaEpochTimestamp)) + if (hasSystemTimestamp + hasEpochTimestamp + hasDeltaSystemTimestamp + hasDeltaEpochTimestamp == 1) { return CHIP_NO_ERROR; } diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index de7185068d53db..b7fb7557bcd2cc 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -76,11 +76,10 @@ class ReadClient : public Messaging::ExchangeDelegate * The ReadClient object MUST continue to exist after this call is completed. * * This callback will be called when receiving event data received in the Read and Subscribe interactions - * + * only one of the apData and apStatus will be non-null. * @param[in] apReadClient: The read client object that initiated the read or subscribe transaction. * @param[in] aEventHeader: The event header in report response. - * @param[in] apData: A TLVReader positioned right on the payload of the event. This will be set to null if the apStatus is - * not null. + * @param[in] apData: A TLVReader positioned right on the payload of the event. * @param[in] apStatus: Event-specific status, containing an InteractionModel::Status code as well as an optional * cluster-specific status code. */ diff --git a/src/app/reporting/Engine.cpp b/src/app/reporting/Engine.cpp index 9a5e5aed607a57..ed0a3f1115fa07 100644 --- a/src/app/reporting/Engine.cpp +++ b/src/app/reporting/Engine.cpp @@ -227,14 +227,14 @@ CHIP_ERROR Engine::BuildSingleReportDataEventReports(ReportDataMessage::Builder VerifyOrExit(clusterInfoList != nullptr, ); VerifyOrExit(apReadHandler != nullptr, err = CHIP_ERROR_INVALID_ARGUMENT); + // If the eventManager is not valid or has not been initialized, + // skip the rest of processing + VerifyOrExit(eventManager.IsValid(), ChipLogError(DataManagement, "EventManagement has not yet initialized")); EventReports = aReportDataBuilder.CreateEventReports(); SuccessOrExit(err = EventReports.GetError()); memcpy(initialEvents, eventNumberList, sizeof(initialEvents)); - // If the eventManager is not valid or has not been initialized, - // skip the rest of processing - VerifyOrExit(eventManager.IsValid(), err = CHIP_ERROR_INCORRECT_STATE); for (size_t index = 0; index < kNumPriorityLevel; index++) { diff --git a/src/controller/ReadInteraction.h b/src/controller/ReadInteraction.h index 1fee2412c3aefa..274ad18d214009 100644 --- a/src/controller/ReadInteraction.h +++ b/src/controller/ReadInteraction.h @@ -93,10 +93,11 @@ CHIP_ERROR ReadAttribute(Messaging::ExchangeManager * aExchangeMgr, const Sessio */ template CHIP_ERROR ReadEvent(Messaging::ExchangeManager * apExchangeMgr, const SessionHandle sessionHandle, EndpointId endpointId, - ClusterId clusterId, EventId eventId, typename TypedReadEventCallback::OnSuccessCallbackType onSuccessCb, typename TypedReadEventCallback::OnErrorCallbackType onErrorCb) { + ClusterId clusterId = DecodableEventTypeInfo::GetClusterId(); + EventId eventId = DecodableEventTypeInfo::GetEventId(); app::EventPathParams eventPath(endpointId, clusterId, eventId); app::ReadPrepareParams readParams(sessionHandle); app::ReadClient * readClient = nullptr; @@ -150,9 +151,7 @@ CHIP_ERROR ReadEvent(Messaging::ExchangeManager * aExchangeMgr, const SessionHan typename TypedReadEventCallback::OnSuccessCallbackType onSuccessCb, typename TypedReadEventCallback::OnErrorCallbackType onErrorCb) { - return ReadAttribute(aExchangeMgr, sessionHandle, endpointId, - EventTypeInfo::GetClusterId(), EventTypeInfo::GetEventId(), - onSuccessCb, onErrorCb); + return ReadEvent(aExchangeMgr, sessionHandle, endpointId, onSuccessCb, onErrorCb); } } // namespace Controller } // namespace chip diff --git a/src/controller/TypedReadCallback.h b/src/controller/TypedReadCallback.h index c417ddfc991954..75cf2aa6586091 100644 --- a/src/controller/TypedReadCallback.h +++ b/src/controller/TypedReadCallback.h @@ -128,12 +128,10 @@ class TypedReadEventCallback final : public app::ReadClient::Callback { CHIP_ERROR err = CHIP_NO_ERROR; DecodableEventTypeInfo value; - VerifyOrExit(aEventHeader.mPath.mClusterId == DecodableEventTypeInfo::GetClusterId() && - aEventHeader.mPath.mEventId == DecodableEventTypeInfo::GetEventId(), - CHIP_ERROR_SCHEMA_MISMATCH); + VerifyOrExit(apData != nullptr, err = CHIP_ERROR_INVALID_ARGUMENT); - err = app::DataModel::Decode(*apData, value); + err = apReadClient->DecodeEvent(aEventHeader, value, *apData); SuccessOrExit(err); mOnSuccess(aEventHeader, value); diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index acfea12d733ddc..a993c2e2d4760e 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -486,6 +486,81 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.List[typing.Union[ raise self._ChipStack.ErrorToException(res) return await future + async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[ + None, # Empty tuple, all wildcard + typing.Tuple[int], # Endpoint + # Wildcard endpoint, Cluster id present + typing.Tuple[typing.Type[ClusterObjects.Cluster]], + # Wildcard endpoint, Cluster + Event present + typing.Tuple[typing.Type[ClusterObjects.ClusterEventDescriptor]], + # Wildcard event id + typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], + # Concrete path + typing.Tuple[int, typing.Type[ClusterObjects.ClusterEventDescriptor]] + ]], reportInterval: typing.Tuple[int, int] = None): + ''' + Read a list of events from a target node + + nodeId: Target's Node ID + events: A list of tuples of varying types depending on the type of read being requested: + (endpoint, Clusters.ClusterA.EventA): Endpoint = specific, Cluster = specific, Event = specific + (endpoint, Clusters.ClusterA): Endpoint = specific, Cluster = specific, Event = * + (Clusters.ClusterA.EventA): Endpoint = *, Cluster = specific, Event = specific + endpoint: Endpoint = specific, Cluster = *, Event = * + Clusters.ClusterA: Endpoint = *, Cluster = specific, Event = * + '*' or (): Endpoint = *, Cluster = *, Event = * + + The cluster and events specified above are to be selected from the generated cluster objects. + + e.g. + ReadEvent(1, [ 1 ] ) -- case 4 above. + ReadEvent(1, [ Clusters.Basic ] ) -- case 5 above. + ReadEvent(1, [ (1, Clusters.Basic.Events.Location ] ) -- case 1 above. + + reportInterval: A tuple of two int-s for (MinIntervalFloor, MaxIntervalCeiling). Used by establishing subscriptions. + When not provided, a read request will be sent. + ''' + + eventLoop = asyncio.get_running_loop() + future = eventLoop.create_future() + + device = self.GetConnectedDeviceSync(nodeid) + eves = [] + for v in events: + endpoint = None + cluster = None + event = None + if v in [('*'), ()]: + # Wildcard + pass + elif type(v) is not tuple: + print(type(v)) + if type(v) is int: + endpoint = v + elif issubclass(v, ClusterObjects.Cluster): + cluster = v + elif issubclass(v, ClusterObjects.ClusterEventDescriptor): + event = v + else: + raise ValueError("Unsupported Event Path") + else: + # endpoint + (cluster) event / endpoint + cluster + endpoint = v[0] + if issubclass(v[1], ClusterObjects.Cluster): + cluster = v[1] + elif issubclass(v[1], ClusterAttribute.ClusterEventDescriptor): + event = v[1] + else: + raise ValueError("Unsupported Attribute Path") + eves.append(ClusterAttribute.EventPath( + EndpointId=endpoint, Cluster=cluster, Event=event)) + res = self._ChipStack.Call( + lambda: ClusterAttribute.ReadEvents(future, eventLoop, device, self, eves, ClusterAttribute.SubscriptionParameters(reportInterval[0], reportInterval[1]) if reportInterval else None)) + if res != 0: + raise self._ChipStack.ErrorToException(res) + outcome = await future + return await future + def ZCLSend(self, cluster, command, nodeid, endpoint, groupid, args, blocking=False): req = None try: @@ -507,7 +582,8 @@ def ZCLReadAttribute(self, cluster, attribute, nodeid, endpoint, groupid, blocki except: raise UnknownAttribute(cluster, attribute) - result = asyncio.run(self.ReadAttribute(nodeid, [(endpoint, req)])) + result = asyncio.run(self.ReadAttribute( + nodeid, [(endpoint, req)]))['Attributes'] path = ClusterAttribute.AttributePath( EndpointId=endpoint, Attribute=req) return im.AttributeReadResult(path=im.AttributePath(nodeId=nodeid, endpointId=path.EndpointId, clusterId=path.ClusterId, attributeId=path.AttributeId), status=0, value=result[path].Data.value) diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index dac73b3b5fae08..4f34e195598159 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -19,13 +19,13 @@ import ctypes from dataclasses import dataclass from typing import Tuple, Type, Union, List, Any, Callable -from ctypes import CFUNCTYPE, c_char_p, c_size_t, c_void_p, c_uint32, c_uint16, py_object, c_uint64 +from ctypes import CFUNCTYPE, c_char_p, c_size_t, c_void_p, c_uint64, c_uint32, c_uint16, c_uint8, py_object, c_uint64 -from .ClusterObjects import Cluster, ClusterAttributeDescriptor +from .ClusterObjects import Cluster, ClusterAttributeDescriptor, ClusterEventDescriptor import chip.exceptions import chip.interaction_model import chip.tlv - +from enum import Enum, unique import inspect import sys import logging @@ -33,6 +33,19 @@ import builtins +@unique +class EventTimestampType(Enum): + SYSTEM = 1 + EPOCH = 2 + + +@unique +class EventPriority(Enum): + DEBUG = 1 + INFO = 2 + CRITICAL = 3 + + @dataclass class AttributePath: EndpointId: int = None @@ -65,17 +78,76 @@ def __hash__(self): return str(self).__hash__() +@dataclass +class EventPath: + EndpointId: int = None + ClusterId: int = None + EventId: int = None + + def __init__(self, EndpointId: int = None, Cluster=None, Event=None, ClusterId=None, EventId=None): + self.EndpointId = EndpointId + if Cluster is not None: + # Wildcard read for a specific cluster + if (Event is not None) or (ClusterId is not None) or (EventId is not None): + raise Warning( + "Event, ClusterId and AttributeId is ignored when Cluster is specified") + self.ClusterId = Cluster.id + return + if Event is not None: + if (ClusterId is not None) or (EventId is not None): + raise Warning( + "ClusterId and EventId is ignored when Event is specified") + self.ClusterId = Event.cluster_id + self.EventId = Event.event_id + return + self.ClusterId = ClusterId + self.EventId = EventId + + def __str__(self) -> str: + return f"{self.EndpointId}/{self.EventId}/{self.EventId}" + + def __hash__(self): + return str(self).__hash__() + + @dataclass class AttributePathWithListIndex(AttributePath): ListIndex: int = None +@dataclass +class EventHeader: + EndpointId: int = None + Event: ClusterEventDescriptor = None + EventNumber: int = None + Priority: EventPriority = None + Timestamp: int = None + TimestampType: EventTimestampType = None + + def __init__(self, EndpointId: int = None, Event=None, EventNumber=None, Priority=None, Timestamp=None, TimestampType=None): + self.EndpointId = EndpointId + self.Event = Event + self.EventNumber = EventNumber + self.Priority = Priority + self.Timestamp = Timestamp + self.Timestamp = TimestampType + + def __str__(self) -> str: + return f"{self.EndpointId}/{self.Event.cluster_id}/{self.Event.event_id}/{self.EventNumber}/{self.Priority}/{self.Timestamp}/{self.TimestampType}" + + @dataclass class AttributeStatus: Path: AttributePath Status: Union[chip.interaction_model.Status, int] +@dataclass +class EventStatus: + Header: EventHeader + Status: chip.interaction_model.Status + + AttributeWriteResult = AttributeStatus @@ -85,12 +157,19 @@ class AttributeDescriptorWithEndpoint: Attribute: ClusterAttributeDescriptor +@dataclass +class EventDescriptorWithEndpoint: + EndpointId: int + Event: ClusterEventDescriptor + + @dataclass class AttributeWriteRequest(AttributeDescriptorWithEndpoint): Data: Any AttributeReadRequest = AttributeDescriptorWithEndpoint +EventReadRequest = EventDescriptorWithEndpoint @dataclass @@ -108,7 +187,12 @@ class ValueDecodeFailure: Reason: Exception = None +class EventReadResult(EventStatus): + Data: Any = None + + _AttributeIndex = {} +_EventIndex = {} def _BuildAttributeIndex(): @@ -154,14 +238,17 @@ def __init__(self, transaction: 'AsyncReadTransaction', subscriptionId, devCtrl) self._subscriptionId = subscriptionId self._devCtrl = devCtrl - def GetValue(self, path: Tuple[int, Type[ClusterAttributeDescriptor]]): + def GetAttributeValue(self, path: Tuple[int, Type[ClusterAttributeDescriptor]]): ''' Gets the attribute from cache, returns the value and the timestamp when it was updated last time. ''' - return self._read_transaction.GetValue(AttributePath(path[0], Attribute=path[1])) + return self._read_transaction.GetAttributeValue(AttributePath(path[0], Attribute=path[1])) + + def GetAllAttributeValues(self): + return self._read_transaction.GetAllAttributeValues() - def GetAllValues(self): - return self._read_transaction.GetAllValues() + def GetAllEventValues(self): + return self._read_transaction.GetAllEventValues() def SetAttributeUpdateCallback(self, callback: Callable[[AttributePath, Any], None]): ''' @@ -183,22 +270,48 @@ def __repr__(self): return f'' +def _BuildEventIndex(): + ''' Build internal event index for locating the corresponding cluster object by path in the future. + We do this because this operation will take a long time when there are lots of events, it takes about 300ms for a single query. + This is acceptable during init, but unacceptable when the server returns lots of events at the same time. + ''' + for clusterName, obj in inspect.getmembers(sys.modules['chip.clusters.Objects']): + if ('chip.clusters.Objects' in str(obj)) and inspect.isclass(obj): + for objName, subclass in inspect.getmembers(obj): + if inspect.isclass(subclass) and (('Events') in str(subclass)): + for eventName, event in inspect.getmembers(subclass): + if inspect.isclass(event): + base_classes = inspect.getmro(event) + + # Only match on classes that extend the ClusterEventescriptor class + matched = [ + value for value in base_classes if 'ClusterEventDescriptor' in str(value)] + if (matched == []): + continue + + _EventIndex[str(EventPath(ClusterId=event.cluster_id, EventId=event.event_id))] = eval( + 'chip.clusters.Objects.' + clusterName + '.Events.' + eventName) + + class AsyncReadTransaction: def __init__(self, future: Future, eventLoop, devCtrl): self._event_loop = eventLoop self._future = future self._subscription_handler = None - self._res = {} + self._res = {'Attributes': {}, 'Events': []} self._devCtrl = devCtrl # For subscriptions, the data comes from CHIP Thread, whild the value will be accessed from Python's thread, so a lock is required here. self._resLock = threading.Lock() - def GetValue(self, path: AttributePath): + def GetAttributeValue(self, path: AttributePath): with self._resLock: - return self._res.get(path) + return self._res['Attributes'].get(path) + + def GetAllAttributeValues(self): + return self._res['Attributes'] - def GetAllValues(self): - return self._res + def GetAllEventValues(self): + return self._res['Events'] def _handleAttributeData(self, path: AttributePathWithListIndex, status: int, data: bytes): try: @@ -239,7 +352,7 @@ def _handleAttributeData(self, path: AttributePathWithListIndex, status: int, da raise with self._resLock: - self._res[path] = AttributeReadResult( + self._res['Attributes'][path] = AttributeReadResult( Path=path, Status=imStatus, Data=attributeValue) if self._subscription_handler is not None: self._subscription_handler.OnUpdate( @@ -254,6 +367,43 @@ def handleAttributeData(self, path: AttributePath, status: int, data: bytes): self._event_loop.call_soon_threadsafe( self._handleAttributeData, path, status, data) + def _handleEventData(self, header: EventHeader, path: EventPath, data: bytes): + try: + eventType = _EventIndex.get(str(path), None) + eventValue = None + tlvData = chip.tlv.TLVReader(data).get().get("Any", {}) + if eventType is None: + eventValue = ValueDecodeFailure( + tlvData, LookupError("event schema not found")) + else: + try: + eventValue = eventType(eventType.FromTLV(data)) + except Exception as ex: + logging.error( + f"Error convering TLV to Cluster Object for path: Endpoint = {path.EndpointId}/Cluster = {path.ClusterId}/Event = {path.EventId}") + logging.error( + f"Failed Cluster Object: {str(eventType)}") + logging.error(ex) + eventValue = ValueDecodeFailure( + tlvData, ex) + + # If we're in debug mode, raise the exception so that we can better debug what's happening. + if (builtins.enableDebugMode): + raise + + with self._resLock: + self._res['Events'].append[EventReadResult( + Header=header, Data=eventValue)] + except Exception as ex: + logging.exception(ex) + + def handleEventData(self, header: EventHeader, path: EventPath, data: bytes): + if self._subscription_handler is not None: + self._handleEventData(header, path, data) + else: + self._event_loop.call_soon_threadsafe( + self._handleEventData, header, path, data) + def _handleError(self, chipError: int): self._future.set_exception( chip.exceptions.ChipStackError(chipError)) @@ -318,6 +468,8 @@ def handleDone(self): _OnReadAttributeDataCallbackFunct = CFUNCTYPE( None, py_object, c_uint16, c_uint32, c_uint32, c_uint32, c_void_p, c_size_t) _OnSubscriptionEstablishedCallbackFunct = CFUNCTYPE(None, py_object, c_uint64) +_OnReadEventDataCallbackFunct = CFUNCTYPE( + None, py_object, c_uint16, c_uint32, c_uint32, c_uint32, c_uint8, c_uint64, c_uint8, c_void_p, c_size_t) _OnReadErrorCallbackFunct = CFUNCTYPE( None, py_object, c_uint32) _OnReadDoneCallbackFunct = CFUNCTYPE( @@ -331,6 +483,14 @@ def _OnReadAttributeDataCallback(closure, endpoint: int, cluster: int, attribute EndpointId=endpoint, ClusterId=cluster, AttributeId=attribute), status, dataBytes[:]) +@_OnReadEventDataCallbackFunct +def _OnReadEventDataCallback(closure, endpoint: int, cluster: int, event: int, number: int, priority: int, timestamp: int, timestampType: int, data, len): + dataBytes = ctypes.string_at(data, len) + path = EventPath(ClusterId=cluster, EventId=event) + closure.handleEventData(EventHeader( + EndpointId=endpoint, EventNumber=number, Priority=EventPriority(priority), Timestamp=timestamp, TimestampType=EventTimestampType(timestampType)), path, dataBytes[:]) + + @_OnSubscriptionEstablishedCallbackFunct def _OnSubscriptionEstablishedCallback(closure, subscriptionId): closure.handleSubscriptionEstablished(subscriptionId) @@ -430,6 +590,39 @@ def ReadAttributes(future: Future, eventLoop, device, devCtrl, attributes: List[ return res +def ReadEvents(future: Future, eventLoop, device, devCtrl, events: List[EventPath], subscriptionParameters: SubscriptionParameters = None) -> int: + handle = chip.native.GetLibraryHandle() + transaction = AsyncReadTransaction(future, eventLoop, devCtrl) + + readargs = [] + for attr in events: + path = chip.interaction_model.EventPathIBstruct.parse( + b'\xff' * chip.interaction_model.EventPathIBstruct.sizeof()) + if attr.EndpointId is not None: + path.EndpointId = attr.EndpointId + if attr.ClusterId is not None: + path.ClusterId = attr.ClusterId + if attr.EventId is not None: + path.EventId = attr.EventId + path = chip.interaction_model.EventPathIBstruct.build(path) + readargs.append(ctypes.c_char_p(path)) + + ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction)) + minInterval = 0 + maxInterval = 0 + if subscriptionParameters is not None: + minInterval = subscriptionParameters.MinReportIntervalFloorSeconds + maxInterval = subscriptionParameters.MaxReportIntervalCeilingSeconds + res = handle.pychip_ReadClient_ReadEvents( + ctypes.py_object(transaction), device, + ctypes.c_bool(subscriptionParameters is not None), + ctypes.c_uint32(minInterval), ctypes.c_uint32(maxInterval), + ctypes.c_size_t(len(events)), *readargs) + if res != 0: + ctypes.pythonapi.Py_DecRef(ctypes.py_object(transaction)) + return res + + def Init(): handle = chip.native.GetLibraryHandle() @@ -443,11 +636,12 @@ def Init(): _OnWriteResponseCallbackFunct, _OnWriteErrorCallbackFunct, _OnWriteDoneCallbackFunct]) handle.pychip_ReadClient_ReadAttributes.restype = c_uint32 setter.Set('pychip_ReadClient_InitCallbacks', None, [ - _OnReadAttributeDataCallbackFunct, _OnSubscriptionEstablishedCallbackFunct, _OnReadErrorCallbackFunct, _OnReadDoneCallbackFunct]) + _OnReadAttributeDataCallbackFunct, _OnReadEventDataCallbackFunct, _OnSubscriptionEstablishedCallbackFunct, _OnReadErrorCallbackFunct, _OnReadDoneCallbackFunct]) handle.pychip_WriteClient_InitCallbacks( _OnWriteResponseCallback, _OnWriteErrorCallback, _OnWriteDoneCallback) handle.pychip_ReadClient_InitCallbacks( - _OnReadAttributeDataCallback, _OnSubscriptionEstablishedCallback, _OnReadErrorCallback, _OnReadDoneCallback) + _OnReadAttributeDataCallback, _OnReadEventDataCallback, _OnSubscriptionEstablishedCallback, _OnReadErrorCallback, _OnReadDoneCallback) _BuildAttributeIndex() + _BuildEventIndex() diff --git a/src/controller/python/chip/clusters/attribute.cpp b/src/controller/python/chip/clusters/attribute.cpp index ff46592fafec99..99c4cced39a798 100644 --- a/src/controller/python/chip/clusters/attribute.cpp +++ b/src/controller/python/chip/clusters/attribute.cpp @@ -49,15 +49,26 @@ struct __attribute__((packed)) AttributePath chip::AttributeId attributeId; }; +struct __attribute__((packed)) EventPath +{ + chip::EndpointId endpointId; + chip::ClusterId clusterId; + chip::EventId eventId; +}; + using OnReadAttributeDataCallback = void (*)(PyObject * appContext, chip::EndpointId endpointId, chip::ClusterId clusterId, chip::AttributeId attributeId, std::underlying_type_t imstatus, uint8_t * data, uint32_t dataLen); +using OnReadEventDataCallback = void (*)(PyObject * appContext, chip::EndpointId endpointId, chip::ClusterId clusterId, + chip::EventId eventId, chip::EventNumber eventNumber, uint8_t priority, uint64_t timestamp, + uint8_t timestampType, uint8_t * data, uint32_t dataLen); using OnSubscriptionEstablishedCallback = void (*)(PyObject * appContext, uint64_t subscriptionId); using OnReadErrorCallback = void (*)(PyObject * appContext, uint32_t chiperror); using OnReadDoneCallback = void (*)(PyObject * appContext); OnReadAttributeDataCallback gOnReadAttributeDataCallback = nullptr; +OnReadEventDataCallback gOnReadEventDataCallback = nullptr; OnSubscriptionEstablishedCallback gOnSubscriptionEstablishedCallback = nullptr; OnReadErrorCallback gOnReadErrorCallback = nullptr; OnReadDoneCallback gOnReadDoneCallback = nullptr; @@ -107,6 +118,40 @@ class ReadClientCallback : public ReadClient::Callback gOnSubscriptionEstablishedCallback(mAppContext, apReadClient->GetSubscriptionId().ValueOr(0)); } + void OnEventData(const ReadClient * apReadClient, const EventHeader & aEventHeader, TLV::TLVReader * apData, + const StatusIB * apStatus) override + { + uint8_t buffer[CHIP_CONFIG_DEFAULT_UDP_MTU_SIZE]; + uint32_t size = 0; + CHIP_ERROR err = CHIP_NO_ERROR; + // When the apData is nullptr, means we did not receive a valid event data from server, status will be some error + // status. + if (apData != nullptr) + { + // The TLVReader's read head is not pointing to the first element in the container instead of the container itself, use + // a TLVWriter to get a TLV with a normalized TLV buffer (Wrapped with a anonymous tag, no extra "end of container" tag + // at the end.) + TLV::TLVWriter writer; + writer.Init(buffer); + err = writer.CopyElement(TLV::AnonymousTag, *apData); + if (err != CHIP_NO_ERROR) + { + this->OnError(apReadClient, err); + return; + } + size = writer.GetLengthWritten(); + } + else + { + err = CHIP_ERROR_INCORRECT_STATE; + this->OnError(apReadClient, err); + } + + gOnReadEventDataCallback(mAppContext, aEventHeader.mPath.mEndpointId, aEventHeader.mPath.mClusterId, + aEventHeader.mPath.mEventId, aEventHeader.mEventNumber, to_underlying(aEventHeader.mPriorityLevel), + aEventHeader.mTimestamp.mValue, to_underlying(aEventHeader.mTimestamp.mType), buffer, size); + } + void OnError(const ReadClient * apReadClient, CHIP_ERROR aError) override { gOnReadErrorCallback(mAppContext, aError.AsInteger()); @@ -175,10 +220,12 @@ void pychip_WriteClient_InitCallbacks(OnWriteResponseCallback onWriteResponseCal } void pychip_ReadClient_InitCallbacks(OnReadAttributeDataCallback onReadAttributeDataCallback, + OnReadEventDataCallback onReadEventDataCallback, OnSubscriptionEstablishedCallback onSubscriptionEstablishedCallback, OnReadErrorCallback onReadErrorCallback, OnReadDoneCallback onReadDoneCallback) { gOnReadAttributeDataCallback = onReadAttributeDataCallback; + gOnReadEventDataCallback = onReadEventDataCallback; gOnSubscriptionEstablishedCallback = onSubscriptionEstablishedCallback; gOnReadErrorCallback = onReadErrorCallback; gOnReadDoneCallback = onReadDoneCallback; @@ -288,6 +335,67 @@ chip::ChipError::StorageType pychip_ReadClient_ReadAttributes(void * appContext, callback.release(); +exit: + va_end(args); + return err.AsInteger(); +} + +chip::ChipError::StorageType pychip_ReadClient_ReadEvents(void * appContext, DeviceProxy * device, bool isSubscription, + uint32_t minInterval, uint32_t maxInterval, size_t n, ...) +{ + CHIP_ERROR err = CHIP_NO_ERROR; + + std::unique_ptr callback = std::make_unique(appContext); + + va_list args; + va_start(args, n); + + std::unique_ptr readPaths(new EventPathParams[n]); + + { + for (size_t i = 0; i < n; i++) + { + void * path = va_arg(args, void *); + + python::EventPath pathObj; + memcpy(&pathObj, path, sizeof(python::EventPath)); + + readPaths[i] = EventPathParams(pathObj.endpointId, pathObj.clusterId, pathObj.eventId); + } + } + + Optional session = device->GetSecureSession(); + ReadClient * readClient; + + VerifyOrExit(session.HasValue(), err = CHIP_ERROR_NOT_CONNECTED); + { + app::InteractionModelEngine::GetInstance()->NewReadClient( + &readClient, isSubscription ? ReadClient::InteractionType::Subscribe : ReadClient::InteractionType::Read, + callback.get()); + ReadPrepareParams params(session.Value()); + params.mpEventPathParamsList = readPaths.get(); + params.mEventPathParamsListSize = n; + + if (isSubscription) + { + params.mMinIntervalFloorSeconds = minInterval; + params.mMaxIntervalCeilingSeconds = maxInterval; + + err = readClient->SendSubscribeRequest(params); + } + else + { + err = readClient->SendReadRequest(params); + } + + if (err != CHIP_NO_ERROR) + { + readClient->Shutdown(); + } + } + + callback.release(); + exit: va_end(args); return err.AsInteger(); diff --git a/src/controller/python/chip/exceptions/__init__.py b/src/controller/python/chip/exceptions/__init__.py index 3d16a05dd80c4b..6b1969f1efe5c4 100644 --- a/src/controller/python/chip/exceptions/__init__.py +++ b/src/controller/python/chip/exceptions/__init__.py @@ -103,3 +103,12 @@ def __init__(self, cluster: str, attribute: str): def __str__(self): return "UnknownAttribute: cluster: {}, attribute: {}".format(self.cluster, self.attribute) + + +class UnknownEvent(ClusterError): + def __init__(self, cluster: str, event: str): + self.cluster = cluster + self.event = event + + def __str__(self): + return "UnknownEvent: cluster: {}, event: {}".format(self.cluster, self.event) diff --git a/src/controller/python/chip/interaction_model/Delegate.h b/src/controller/python/chip/interaction_model/Delegate.h index c27cf181b14d27..1e20e8a6508031 100644 --- a/src/controller/python/chip/interaction_model/Delegate.h +++ b/src/controller/python/chip/interaction_model/Delegate.h @@ -104,6 +104,10 @@ class PythonInteractionModelDelegate : public chip::Controller::DeviceController void OnAttributeData(const app::ReadClient * apReadClient, const app::ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, const app::StatusIB & status) override; + void OnEventData(const app::ReadClient * apReadClient, const app::EventHeader & aEventHeader, TLV::TLVReader * apData, + const app::StatusIB * apStatus) override + {} + static PythonInteractionModelDelegate & Instance(); void SetOnCommandResponseStatusCodeReceivedCallback(PythonInteractionModelDelegate_OnCommandResponseStatusCodeReceivedFunct f) diff --git a/src/controller/python/chip/interaction_model/__init__.py b/src/controller/python/chip/interaction_model/__init__.py index 0ef2d861c18fc8..5277df326d0f4c 100644 --- a/src/controller/python/chip/interaction_model/__init__.py +++ b/src/controller/python/chip/interaction_model/__init__.py @@ -22,7 +22,7 @@ """Provides Python APIs for CHIP.""" import enum -from .delegate import OnSubscriptionReport, SetAttributeReportCallback, AttributePath, AttributePathIBstruct +from .delegate import OnSubscriptionReport, SetAttributeReportCallback, AttributePath, AttributePathIBstruct, EventPath, EventPathIBstruct from chip.exceptions import ChipStackException diff --git a/src/controller/python/chip/interaction_model/delegate.py b/src/controller/python/chip/interaction_model/delegate.py index bb28c44de165f6..5e246c6408682c 100644 --- a/src/controller/python/chip/interaction_model/delegate.py +++ b/src/controller/python/chip/interaction_model/delegate.py @@ -52,6 +52,13 @@ "AttributeId" / Int32ul, ) +# EventPath should not contain padding +EventPathIBstruct = Struct( + "EndpointId" / Int16ul, + "ClusterId" / Int32ul, + "EventId" / Int32ul, +) + @dataclass class AttributePath: @@ -61,6 +68,14 @@ class AttributePath: attributeId: int +@dataclass +class EventPath: + nodeId: int + endpointId: int + clusterId: int + eventId: int + + @dataclass class AttributeReadResult: path: AttributePath @@ -68,6 +83,12 @@ class AttributeReadResult: value: 'typing.Any' +@dataclass +class EventReadResult: + path: EventPath + value: 'typing.Any' + + @dataclass class AttributeWriteResult: path: AttributePath diff --git a/src/controller/python/test/test_scripts/cluster_objects.py b/src/controller/python/test/test_scripts/cluster_objects.py index 8774fb1720e49f..ccc79a04cb5d50 100644 --- a/src/controller/python/test/test_scripts/cluster_objects.py +++ b/src/controller/python/test/test_scripts/cluster_objects.py @@ -29,14 +29,18 @@ LIGHTING_ENDPOINT_ID = 1 -def _AssumeDecodeSuccess(values): - for k, v in values.items(): +def _AssumeAttributesDecodeSuccess(values): + for k, v in values['Attributes'].items(): print(f"{k} = {v}") if isinstance(v.Data, ValueDecodeFailure): raise AssertionError( f"Cannot decode value for path {k}, got error: '{str(v.Data.Reason)}', raw TLV data: '{v.Data.TLVValue}'") +def _AssumeEventsDecodeSuccess(values): + print(f"Dump the events: {values} ") + + class ClusterObjectTests: @classmethod def TestAPI(cls): @@ -127,7 +131,7 @@ def subUpdate(path, value): raise AssertionError("Did not receive updated attribute") @classmethod - async def TestReadRequests(cls, devCtrl): + async def TestReadAttributeRequests(cls, devCtrl): ''' Tests out various permutations of endpoint, cluster and attribute ID (with wildcards) to validate reads. @@ -143,40 +147,68 @@ async def TestReadRequests(cls, devCtrl): (0, Clusters.Basic.Attributes.HardwareVersion), ] res = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req) - if (len(res) != 3): + if (len(res['Attributes']) != 3): raise AssertionError( - f"Got back {len(res)} data items instead of 3") - _AssumeDecodeSuccess(res) + f"Got back {len(res['Attributes'])} data items instead of 3") + _AssumeAttributesDecodeSuccess(res) logger.info("2: Reading Ex Cx A*") req = [ (0, Clusters.Basic), ] - _AssumeDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) + _AssumeAttributesDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) logger.info("3: Reading E* Cx Ax") req = [ Clusters.Descriptor.Attributes.ServerList ] - _AssumeDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) + _AssumeAttributesDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) logger.info("4: Reading Ex C* A*") req = [ 0 ] - _AssumeDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) + _AssumeAttributesDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) logger.info("5: Reading E* Cx A*") req = [ Clusters.Descriptor ] - _AssumeDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) + _AssumeAttributesDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) logger.info("6: Reading E* C* A*") req = [ '*' ] - _AssumeDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) + _AssumeAttributesDecodeSuccess(await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req)) + + @classmethod + async def TestReadEventRequests(cls, devCtrl): + logger.info("1: Reading Ex Cx Ex") + req = [ + (0, Clusters.TestCluster.Events.TestEvent), + ] + _AssumeEventsDecodeSuccess(await devCtrl.ReadEvent(nodeid=NODE_ID, events=req)) + + logger.info("2: Reading Ex Cx E*") + req = [ + (0, Clusters.TestCluster), + ] + _AssumeEventsDecodeSuccess(await devCtrl.ReadEvent(nodeid=NODE_ID, events=req)) + + logger.info("3: Reading Ex C* E*") + req = [ + 0 + ] + _AssumeEventsDecodeSuccess(await devCtrl.ReadEvent(nodeid=NODE_ID, events=req)) + + logger.info("4: Reading E* C* E*") + req = [ + '*' + ] + _AssumeEventsDecodeSuccess(await devCtrl.ReadEvent(nodeid=NODE_ID, events=req)) + + # TODO: Add more wildcard test for IM events. @classmethod async def RunTest(cls, devCtrl): @@ -186,7 +218,8 @@ async def RunTest(cls, devCtrl): await cls.RoundTripTestWithBadEndpoint(devCtrl) await cls.SendCommandWithResponse(devCtrl) await cls.SendWriteRequest(devCtrl) - await cls.TestReadRequests(devCtrl) + await cls.TestReadAttributeRequests(devCtrl) + await cls.TestReadEventRequests(devCtrl) await cls.TestSubscribeAttribute(devCtrl) except Exception as ex: logger.error(