From 6b1d52a357a5d7ae94bb8651b1dfba7440adfa4e Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Wed, 20 Jul 2022 18:17:26 -0700 Subject: [PATCH] Python: Support for specifying absolute timeouts for Invoke/Write (#20531) * Python: Support for specifying absolute timeouts for Invoke/Write This adds support for specifying an absolute timeout for ChipDeviceController.SendCommand and ChipDeviceController.WriteAttribute methods. This timeout is used to bound both mDNS discovery and the subsequent IM interaction. * Review feedback * Review feedback --- src/controller/python/BUILD.gn | 1 + src/controller/python/chip/ChipDeviceCtrl.py | 43 ++++++++++--- src/controller/python/chip/ChipStack.py | 15 +++-- .../python/chip/clusters/Attribute.py | 4 +- .../python/chip/clusters/Command.py | 9 ++- .../python/chip/clusters/attribute.cpp | 14 +++-- .../python/chip/clusters/command.cpp | 13 +++- .../python/chip/utils/DeviceProxyUtils.cpp | 63 +++++++++++++++++++ 8 files changed, 137 insertions(+), 25 deletions(-) create mode 100644 src/controller/python/chip/utils/DeviceProxyUtils.cpp diff --git a/src/controller/python/BUILD.gn b/src/controller/python/BUILD.gn index 5297f0d2fba9c8..6e8c612c6e2e2a 100644 --- a/src/controller/python/BUILD.gn +++ b/src/controller/python/BUILD.gn @@ -67,6 +67,7 @@ shared_library("ChipDeviceCtrl") { "chip/internal/ChipThreadWork.h", "chip/internal/CommissionerImpl.cpp", "chip/logging/LoggingRedirect.cpp", + "chip/utils/DeviceProxyUtils.cpp", ] } else { sources += [ diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index edc2944afea6aa..5aa1c14efca6df 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -563,7 +563,7 @@ def GetClusterHandler(self): return self._Cluster - def GetConnectedDeviceSync(self, nodeid, allowPASE=True): + def GetConnectedDeviceSync(self, nodeid, allowPASE=True, timeoutMs: int = None): self.CheckIsActive() returnDevice = c_void_p(None) @@ -582,13 +582,13 @@ def DeviceAvailableCallback(device, err): if allowPASE: res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( - self.devCtrl, nodeid, byref(returnDevice))) + self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) if res == 0: print('Using PASE connection') return returnDevice res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId( - self.devCtrl, nodeid, DeviceAvailableCallback)) + self.devCtrl, nodeid, DeviceAvailableCallback), timeoutMs) if res != 0: raise self._ChipStack.ErrorToException(res) @@ -596,43 +596,66 @@ def DeviceAvailableCallback(device, err): # Check if the device is already set before waiting for the callback. if returnDevice.value is None: with deviceAvailableCV: - deviceAvailableCV.wait() + timeout = None + if (timeoutMs): + timeout = float(timeoutMs) / 1000 + + ret = deviceAvailableCV.wait(timeout) + if ret is False: + raise TimeoutError("Timed out waiting for DNS-SD resolution") if returnDevice.value is None: raise self._ChipStack.ErrorToException(returnErr) return returnDevice - async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.ClusterCommand, responseType=None, timedRequestTimeoutMs: int = None): + def ComputeRoundTripTimeout(self, nodeid, upperLayerProcessingTimeoutMs: int = 0): + ''' Returns a computed timeout value based on the round-trip time it takes for the peer at the other end of the session to + receive a message, process it and send it back. This is computed based on the session type, the type of transport, sleepy + characteristics of the target and a caller-provided value for the time it takes to process a message at the upper layer on + the target For group sessions. + + This will result in a session being established if one wasn't already. + ''' + device = self.GetConnectedDeviceSync(nodeid) + res = self._ChipStack.Call(lambda: self._dmLib.pychip_DeviceProxy_ComputeRoundTripTimeout( + device, upperLayerProcessingTimeoutMs)) + return res + + async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.ClusterCommand, responseType=None, timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None): ''' Send a cluster-object encapsulated command to a node and get returned a future that can be awaited upon to receive the response. If a valid responseType is passed in, that will be used to deserialize the object. If not, the type will be automatically deduced from the metadata received over the wire. timedWriteTimeoutMs: Timeout for a timed invoke request. Omit or set to 'None' to indicate a non-timed request. + interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the right + timeout value based on transport characteristics as well as the responsiveness of the target. ''' self.CheckIsActive() eventLoop = asyncio.get_running_loop() future = eventLoop.create_future() - device = self.GetConnectedDeviceSync(nodeid) + device = self.GetConnectedDeviceSync(nodeid, timeoutMs=interactionTimeoutMs) res = ClusterCommand.SendCommand( future, eventLoop, responseType, device, ClusterCommand.CommandPath( EndpointId=endpoint, ClusterId=payload.cluster_id, CommandId=payload.command_id, - ), payload, timedRequestTimeoutMs=timedRequestTimeoutMs) + ), payload, timedRequestTimeoutMs=timedRequestTimeoutMs, interactionTimeoutMs=interactionTimeoutMs) if res != 0: future.set_exception(self._ChipStack.ErrorToException(res)) return await future - async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor, int]], timedRequestTimeoutMs: int = None): + async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor, int]], timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None): ''' Write a list of attributes on a target node. nodeId: Target's Node ID timedWriteTimeoutMs: Timeout for a timed write request. Omit or set to 'None' to indicate a non-timed request. attributes: A list of tuples of type (endpoint, cluster-object): + interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the right + timeout value based on transport characteristics as well as the responsiveness of the target. E.g (1, Clusters.TestCluster.Attributes.XYZAttribute('hello')) -- Write 'hello' to the XYZ attribute on the test cluster to endpoint 1 @@ -642,7 +665,7 @@ async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple eventLoop = asyncio.get_running_loop() future = eventLoop.create_future() - device = self.GetConnectedDeviceSync(nodeid) + device = self.GetConnectedDeviceSync(nodeid, timeoutMs=interactionTimeoutMs) attrs = [] for v in attributes: @@ -654,7 +677,7 @@ async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple v[0], v[1], v[2], 1, v[1].value)) res = ClusterAttribute.WriteAttributes( - future, eventLoop, device, attrs, timedRequestTimeoutMs=timedRequestTimeoutMs) + future, eventLoop, device, attrs, timedRequestTimeoutMs=timedRequestTimeoutMs, interactionTimeoutMs=interactionTimeoutMs) if res != 0: raise self._ChipStack.ErrorToException(res) return await future diff --git a/src/controller/python/chip/ChipStack.py b/src/controller/python/chip/ChipStack.py index ef1cd597d9ef9e..ffae3d2b04fd25 100644 --- a/src/controller/python/chip/ChipStack.py +++ b/src/controller/python/chip/ChipStack.py @@ -151,10 +151,17 @@ def __call__(self): self._cv.notify_all() pythonapi.Py_DecRef(py_object(self)) - def Wait(self): + def Wait(self, timeoutMs: int = None): + timeout = None + if timeoutMs is not None: + timeout = float(timeoutMs) / 1000 + with self._cv: while self._finish is False: - self._cv.wait() + res = self._cv.wait(timeout) + if res is False: + raise TimeoutError("Timed out waiting for task to finish executing on the Matter thread") + if self._exc is not None: raise self._exc return self._res @@ -335,7 +342,7 @@ def Shutdown(self): self.devMgr = None self.callbackRes = None - def Call(self, callFunct): + def Call(self, callFunct, timeoutMs: int = None): '''Run a Python function on CHIP stack, and wait for the response. This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics. Calling this function on CHIP on CHIP mainloop thread will cause deadlock. @@ -344,7 +351,7 @@ def Call(self, callFunct): self.callbackRes = None self.completeEvent.clear() with self.networkLock: - res = self.PostTaskOnChipThread(callFunct).Wait() + res = self.PostTaskOnChipThread(callFunct).Wait(timeoutMs) self.completeEvent.set() if res == 0 and self.callbackRes != None: return self.callbackRes diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index a11d324d5890a4..87fd371cf963c3 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -873,7 +873,7 @@ def _OnWriteDoneCallback(closure): closure.handleDone() -def WriteAttributes(future: Future, eventLoop, device, attributes: List[AttributeWriteRequest], timedRequestTimeoutMs: int = None) -> int: +def WriteAttributes(future: Future, eventLoop, device, attributes: List[AttributeWriteRequest], timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None) -> int: handle = chip.native.GetLibraryHandle() writeargs = [] @@ -898,7 +898,7 @@ def WriteAttributes(future: Future, eventLoop, device, attributes: List[Attribut ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction)) res = builtins.chipStack.Call( lambda: handle.pychip_WriteClient_WriteAttributes( - ctypes.py_object(transaction), device, ctypes.c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), ctypes.c_size_t(len(attributes)), *writeargs)) + ctypes.py_object(transaction), device, ctypes.c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), ctypes.c_uint16(0 if interactionTimeoutMs is None else interactionTimeoutMs), ctypes.c_size_t(len(attributes)), *writeargs)) if res != 0: ctypes.pythonapi.Py_DecRef(ctypes.py_object(transaction)) return res diff --git a/src/controller/python/chip/clusters/Command.py b/src/controller/python/chip/clusters/Command.py index 7d53b21784ff5e..74101d4edb47cf 100644 --- a/src/controller/python/chip/clusters/Command.py +++ b/src/controller/python/chip/clusters/Command.py @@ -145,13 +145,18 @@ def _OnCommandSenderDoneCallback(closure): ctypes.pythonapi.Py_DecRef(ctypes.py_object(closure)) -def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand, timedRequestTimeoutMs: int = None) -> int: +def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand, timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None) -> int: ''' Send a cluster-object encapsulated command to a device and does the following: - On receipt of a successful data response, returns the cluster-object equivalent through the provided future. - None (on a successful response containing no data) - Raises an exception if any errors are encountered. If no response type is provided above, the type will be automatically deduced. + + If a valid timedRequestTimeoutMs is provided, a timed interaction will be initiated instead. + If a valid interactionTimeoutMs is provided, the interaction will terminate with a CHIP_ERROR_TIMEOUT if a response + has not been received within that timeout. If it isn't provided, a sensible value will be automatically computed that + accounts for the underlying characteristics of both the transport and the responsiveness of the receiver. ''' if (responseType is not None) and (not issubclass(responseType, ClusterCommand)): raise ValueError("responseType must be a ClusterCommand or None") @@ -166,7 +171,7 @@ def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPa ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction)) return builtins.chipStack.Call( lambda: handle.pychip_CommandSender_SendCommand(ctypes.py_object( - transaction), device, c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId, commandPath.ClusterId, commandPath.CommandId, payloadTLV, len(payloadTLV))) + transaction), device, c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId, commandPath.ClusterId, commandPath.CommandId, payloadTLV, len(payloadTLV), ctypes.c_uint16(0 if interactionTimeoutMs is None else interactionTimeoutMs))) def Init(): diff --git a/src/controller/python/chip/clusters/attribute.cpp b/src/controller/python/chip/clusters/attribute.cpp index e4480a3bb1e059..59de72d01ffef5 100644 --- a/src/controller/python/chip/clusters/attribute.cpp +++ b/src/controller/python/chip/clusters/attribute.cpp @@ -29,6 +29,8 @@ #include #include +#include + using namespace chip; using namespace chip::app; @@ -241,7 +243,8 @@ struct __attribute__((packed)) PyReadAttributeParams // Encodes n attribute write requests, follows 3 * n arguments, in the (AttributeWritePath*=void *, uint8_t*, size_t) order. chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContext, DeviceProxy * device, - uint16_t timedWriteTimeoutMs, size_t n, ...); + uint16_t timedWriteTimeoutMs, uint16_t interactionTimeoutMs, + size_t n, ...); chip::ChipError::StorageType pychip_ReadClient_ReadAttributes(void * appContext, ReadClient ** pReadClient, ReadClientCallback ** pCallback, DeviceProxy * device, uint8_t * readParamsBuf, size_t n, size_t total, ...); @@ -319,7 +322,8 @@ void pychip_ReadClient_InitCallbacks(OnReadAttributeDataCallback onReadAttribute } chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContext, DeviceProxy * device, - uint16_t timedWriteTimeoutMs, size_t n, ...) + uint16_t timedWriteTimeoutMs, uint16_t interactionTimeoutMs, + size_t n, ...) { CHIP_ERROR err = CHIP_NO_ERROR; @@ -331,7 +335,7 @@ chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContex va_list args; va_start(args, n); - VerifyOrExit(device != nullptr && device->GetSecureSession().HasValue(), err = CHIP_ERROR_INCORRECT_STATE); + VerifyOrExit(device != nullptr && device->GetSecureSession().HasValue(), err = CHIP_ERROR_MISSING_SECURE_SESSION); { for (size_t i = 0; i < n; i++) @@ -359,7 +363,9 @@ chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContex } } - SuccessOrExit(err = client->SendWriteRequest(device->GetSecureSession().Value())); + SuccessOrExit(err = client->SendWriteRequest(device->GetSecureSession().Value(), + interactionTimeoutMs != 0 ? System::Clock::Milliseconds32(interactionTimeoutMs) + : System::Clock::kZero)); client.release(); callback.release(); diff --git a/src/controller/python/chip/clusters/command.cpp b/src/controller/python/chip/clusters/command.cpp index e87a2252048b26..342e624698ed18 100644 --- a/src/controller/python/chip/clusters/command.cpp +++ b/src/controller/python/chip/clusters/command.cpp @@ -35,7 +35,8 @@ extern "C" { chip::ChipError::StorageType pychip_CommandSender_SendCommand(void * appContext, DeviceProxy * device, uint16_t timedRequestTimeoutMs, chip::EndpointId endpointId, chip::ClusterId clusterId, chip::CommandId commandId, - const uint8_t * payload, size_t length); + const uint8_t * payload, size_t length, + uint16_t interactionTimeoutMs); } namespace chip { @@ -127,10 +128,12 @@ void pychip_CommandSender_InitCallbacks(OnCommandSenderResponseCallback onComman chip::ChipError::StorageType pychip_CommandSender_SendCommand(void * appContext, DeviceProxy * device, uint16_t timedRequestTimeoutMs, chip::EndpointId endpointId, chip::ClusterId clusterId, chip::CommandId commandId, - const uint8_t * payload, size_t length) + const uint8_t * payload, size_t length, uint16_t interactionTimeoutMs) { CHIP_ERROR err = CHIP_NO_ERROR; + VerifyOrReturnError(device->GetSecureSession().HasValue(), CHIP_ERROR_MISSING_SECURE_SESSION.AsInteger()); + std::unique_ptr callback = std::make_unique(appContext); std::unique_ptr sender = std::make_unique(callback.get(), device->GetExchangeManager(), /* is timed request */ timedRequestTimeoutMs != 0); @@ -151,7 +154,11 @@ chip::ChipError::StorageType pychip_CommandSender_SendCommand(void * appContext, SuccessOrExit(err = sender->FinishCommand(timedRequestTimeoutMs != 0 ? Optional(timedRequestTimeoutMs) : Optional::Missing())); - SuccessOrExit(err = device->SendCommands(sender.get())); + + SuccessOrExit(err = sender->SendCommandRequest(device->GetSecureSession().Value(), + interactionTimeoutMs != 0 + ? MakeOptional(System::Clock::Milliseconds32(interactionTimeoutMs)) + : Optional::Missing())); sender.release(); callback.release(); diff --git a/src/controller/python/chip/utils/DeviceProxyUtils.cpp b/src/controller/python/chip/utils/DeviceProxyUtils.cpp new file mode 100644 index 00000000000000..ea6add7a039609 --- /dev/null +++ b/src/controller/python/chip/utils/DeviceProxyUtils.cpp @@ -0,0 +1,63 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * Copyright (c) 2019-2020 Google LLC. + * Copyright (c) 2013-2018 Nest Labs, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Implementation of the native methods expected by the Python + * version of Chip Device Manager. + * + */ + +#include "system/SystemClock.h" +#include +#include +#include + +using namespace chip; + +static_assert(std::is_same::value, "python assumes CHIP_ERROR maps to c_uint32"); + +extern "C" { + +/** + * @brief + * + * This computes the value for a timeout based on the round trip time it takes for a message to be sent to a peer, + * the message to be processed given the upperLayerProcessingTimeoutMs argument, and a response to come back. + * + * See Session::ComputeRoundTripTimeout for more specific details. + * + * A valid DeviceProxy pointer with a valid, established session is required for this method. + * + * + */ +uint32_t pychip_DeviceProxy_ComputeRoundTripTimeout(DeviceProxy * device, uint32_t upperLayerProcessingTimeoutMs) +{ + VerifyOrDie(device != nullptr); + + auto * deviceProxy = static_cast(device); + VerifyOrDie(deviceProxy->GetSecureSession().HasValue()); + + return deviceProxy->GetSecureSession() + .Value() + ->ComputeRoundTripTimeout(System::Clock::Milliseconds32(upperLayerProcessingTimeoutMs)) + .count(); +} +}