Skip to content

Commit

Permalink
Python: Support for specifying absolute timeouts for Invoke/Write
Browse files Browse the repository at this point in the history
This adds support for specifying an absolute timeout for
ChipDeviceController.SendCommand and ChipDeviceController.WriteAttribute
methods. This timeout is used to bound both mDNS discovery and
the subsequent IM interaction.
  • Loading branch information
mrjerryjohns committed Jul 9, 2022
1 parent e6e43fa commit d5e9940
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 25 deletions.
30 changes: 20 additions & 10 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ def GetClusterHandler(self):

return self._Cluster

def GetConnectedDeviceSync(self, nodeid, allowPASE=True):
def GetConnectedDeviceSync(self, nodeid, allowPASE=True, timeoutMs: int = None):
self.CheckIsActive()

returnDevice = c_void_p(None)
Expand All @@ -567,57 +567,67 @@ def DeviceAvailableCallback(device, err):

if allowPASE:
res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)))
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res == 0:
print('Using PASE connection')
return returnDevice

res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
self.devCtrl, nodeid, DeviceAvailableCallback))
self.devCtrl, nodeid, DeviceAvailableCallback), timeoutMs)
if res != 0:
raise self._ChipStack.ErrorToException(res)

# The callback might have been received synchronously (during self._ChipStack.Call()).
# Check if the device is already set before waiting for the callback.
if returnDevice.value is None:
with deviceAvailableCV:
deviceAvailableCV.wait()
timeout = None
if (timeoutMs):
timeout = float(timeoutMs) / 1000

ret = deviceAvailableCV.wait(timeout)
if ret is False:
raise TimeoutError("Timed out waiting for DNS-SD resolution")

if returnDevice.value is None:
raise self._ChipStack.ErrorToException(returnErr)
return returnDevice

async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.ClusterCommand, responseType=None, timedRequestTimeoutMs: int = None):
async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.ClusterCommand, responseType=None, timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None):
'''
Send a cluster-object encapsulated command to a node and get returned a future that can be awaited upon to receive the response.
If a valid responseType is passed in, that will be used to deserialize the object. If not, the type will be automatically deduced
from the metadata received over the wire.
timedWriteTimeoutMs: Timeout for a timed invoke request. Omit or set to 'None' to indicate a non-timed request.
interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the right
timeout value based on transport characteristics as well as the responsiveness of the target.
'''
self.CheckIsActive()

eventLoop = asyncio.get_running_loop()
future = eventLoop.create_future()

device = self.GetConnectedDeviceSync(nodeid)
device = self.GetConnectedDeviceSync(nodeid, timeoutMs=interactionTimeoutMs)
res = ClusterCommand.SendCommand(
future, eventLoop, responseType, device, ClusterCommand.CommandPath(
EndpointId=endpoint,
ClusterId=payload.cluster_id,
CommandId=payload.command_id,
), payload, timedRequestTimeoutMs=timedRequestTimeoutMs)
), payload, timedRequestTimeoutMs=timedRequestTimeoutMs, interactionTimeoutMs=interactionTimeoutMs)
if res != 0:
future.set_exception(self._ChipStack.ErrorToException(res))
return await future

async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor, int]], timedRequestTimeoutMs: int = None):
async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor, int]], timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None):
'''
Write a list of attributes on a target node.
nodeId: Target's Node ID
timedWriteTimeoutMs: Timeout for a timed write request. Omit or set to 'None' to indicate a non-timed request.
attributes: A list of tuples of type (endpoint, cluster-object):
interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the right
timeout value based on transport characteristics as well as the responsiveness of the target.
E.g
(1, Clusters.TestCluster.Attributes.XYZAttribute('hello')) -- Write 'hello' to the XYZ attribute on the test cluster to endpoint 1
Expand All @@ -627,7 +637,7 @@ async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple
eventLoop = asyncio.get_running_loop()
future = eventLoop.create_future()

device = self.GetConnectedDeviceSync(nodeid)
device = self.GetConnectedDeviceSync(nodeid, timeoutMs=interactionTimeoutMs)

attrs = []
for v in attributes:
Expand All @@ -639,7 +649,7 @@ async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple
v[0], v[1], v[2], 1, v[1].value))

res = ClusterAttribute.WriteAttributes(
future, eventLoop, device, attrs, timedRequestTimeoutMs=timedRequestTimeoutMs)
future, eventLoop, device, attrs, timedRequestTimeoutMs=timedRequestTimeoutMs, interactionTimeoutMs=interactionTimeoutMs)
if res != 0:
raise self._ChipStack.ErrorToException(res)
return await future
Expand Down
12 changes: 8 additions & 4 deletions src/controller/python/chip/ChipStack.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,14 @@ def __call__(self):
self._cv.notify_all()
pythonapi.Py_DecRef(py_object(self))

def Wait(self):
def Wait(self, timeoutMs: int = None):
timeout = None
if timeoutMs is not None:
timeout = float(timeoutMs) / 1000

with self._cv:
while self._finish is False:
self._cv.wait()
self._cv.wait(timeout)
if self._exc is not None:
raise self._exc
return self._res
Expand Down Expand Up @@ -335,7 +339,7 @@ def Shutdown(self):
self.devMgr = None
self.callbackRes = None

def Call(self, callFunct):
def Call(self, callFunct, timeoutMs: int = None):
'''Run a Python function on CHIP stack, and wait for the response.
This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics.
Calling this function on CHIP on CHIP mainloop thread will cause deadlock.
Expand All @@ -344,7 +348,7 @@ def Call(self, callFunct):
self.callbackRes = None
self.completeEvent.clear()
with self.networkLock:
res = self.PostTaskOnChipThread(callFunct).Wait()
res = self.PostTaskOnChipThread(callFunct).Wait(timeoutMs)
self.completeEvent.set()
if res == 0 and self.callbackRes != None:
return self.callbackRes
Expand Down
4 changes: 2 additions & 2 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ def _OnWriteDoneCallback(closure):
closure.handleDone()


def WriteAttributes(future: Future, eventLoop, device, attributes: List[AttributeWriteRequest], timedRequestTimeoutMs: int = None) -> int:
def WriteAttributes(future: Future, eventLoop, device, attributes: List[AttributeWriteRequest], timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None) -> int:
handle = chip.native.GetLibraryHandle()

writeargs = []
Expand All @@ -898,7 +898,7 @@ def WriteAttributes(future: Future, eventLoop, device, attributes: List[Attribut
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
res = builtins.chipStack.Call(
lambda: handle.pychip_WriteClient_WriteAttributes(
ctypes.py_object(transaction), device, ctypes.c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), ctypes.c_size_t(len(attributes)), *writeargs))
ctypes.py_object(transaction), device, ctypes.c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), ctypes.c_uint16(0 if interactionTimeoutMs is None else interactionTimeoutMs), ctypes.c_size_t(len(attributes)), *writeargs))
if res != 0:
ctypes.pythonapi.Py_DecRef(ctypes.py_object(transaction))
return res
Expand Down
9 changes: 7 additions & 2 deletions src/controller/python/chip/clusters/Command.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,18 @@ def _OnCommandSenderDoneCallback(closure):
ctypes.pythonapi.Py_DecRef(ctypes.py_object(closure))


def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand, timedRequestTimeoutMs: int = None) -> int:
def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand, timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None) -> int:
''' Send a cluster-object encapsulated command to a device and does the following:
- On receipt of a successful data response, returns the cluster-object equivalent through the provided future.
- None (on a successful response containing no data)
- Raises an exception if any errors are encountered.
If no response type is provided above, the type will be automatically deduced.
If a valid timedRequestTimeoutMs is provided, a timed interaction will be initiated instead.
If a valid interactionTimeoutMs is provided, the interaction will terminate with a CHIP_ERROR_TIMEOUT if a response
has not been received within that timeout. If it isn't provided, a sensible value will be automatically computed that
accounts for the underlying characteristics of both the transport and the responsiveness of the receiver.
'''
if (responseType is not None) and (not issubclass(responseType, ClusterCommand)):
raise ValueError("responseType must be a ClusterCommand or None")
Expand All @@ -160,7 +165,7 @@ def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPa
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
return builtins.chipStack.Call(
lambda: handle.pychip_CommandSender_SendCommand(ctypes.py_object(
transaction), device, c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId, commandPath.ClusterId, commandPath.CommandId, payloadTLV, len(payloadTLV)))
transaction), device, c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId, commandPath.ClusterId, commandPath.CommandId, payloadTLV, len(payloadTLV), ctypes.c_uint16(0 if interactionTimeoutMs is None else interactionTimeoutMs)))


def Init():
Expand Down
14 changes: 10 additions & 4 deletions src/controller/python/chip/clusters/attribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include <cstdio>
#include <lib/support/logging/CHIPLogging.h>

#include <lib/core/Optional.h>

using namespace chip;
using namespace chip::app;

Expand Down Expand Up @@ -241,7 +243,8 @@ struct __attribute__((packed)) PyReadAttributeParams

// Encodes n attribute write requests, follows 3 * n arguments, in the (AttributeWritePath*=void *, uint8_t*, size_t) order.
chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContext, DeviceProxy * device,
uint16_t timedWriteTimeoutMs, size_t n, ...);
uint16_t timedWriteTimeoutMs, uint16_t interactionTimeoutMs,
size_t n, ...);
chip::ChipError::StorageType pychip_ReadClient_ReadAttributes(void * appContext, ReadClient ** pReadClient,
ReadClientCallback ** pCallback, DeviceProxy * device,
uint8_t * readParamsBuf, size_t n, size_t total, ...);
Expand Down Expand Up @@ -319,7 +322,8 @@ void pychip_ReadClient_InitCallbacks(OnReadAttributeDataCallback onReadAttribute
}

chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContext, DeviceProxy * device,
uint16_t timedWriteTimeoutMs, size_t n, ...)
uint16_t timedWriteTimeoutMs, uint16_t interactionTimeoutMs,
size_t n, ...)
{
CHIP_ERROR err = CHIP_NO_ERROR;

Expand All @@ -331,7 +335,7 @@ chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContex
va_list args;
va_start(args, n);

VerifyOrExit(device != nullptr && device->GetSecureSession().HasValue(), err = CHIP_ERROR_INCORRECT_STATE);
VerifyOrExit(device != nullptr && device->GetSecureSession().HasValue(), err = CHIP_ERROR_MISSING_SECURE_SESSION);

{
for (size_t i = 0; i < n; i++)
Expand Down Expand Up @@ -359,7 +363,9 @@ chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContex
}
}

SuccessOrExit(err = client->SendWriteRequest(device->GetSecureSession().Value()));
SuccessOrExit(err = client->SendWriteRequest(device->GetSecureSession().Value(),
interactionTimeoutMs != 0 ? System::Clock::Milliseconds32(interactionTimeoutMs)
: System::Clock::kZero));

client.release();
callback.release();
Expand Down
13 changes: 10 additions & 3 deletions src/controller/python/chip/clusters/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ extern "C" {
chip::ChipError::StorageType pychip_CommandSender_SendCommand(void * appContext, DeviceProxy * device,
uint16_t timedRequestTimeoutMs, chip::EndpointId endpointId,
chip::ClusterId clusterId, chip::CommandId commandId,
const uint8_t * payload, size_t length);
const uint8_t * payload, size_t length,
uint16_t interactionTimeoutMs);
}

namespace chip {
Expand Down Expand Up @@ -127,10 +128,12 @@ void pychip_CommandSender_InitCallbacks(OnCommandSenderResponseCallback onComman
chip::ChipError::StorageType pychip_CommandSender_SendCommand(void * appContext, DeviceProxy * device,
uint16_t timedRequestTimeoutMs, chip::EndpointId endpointId,
chip::ClusterId clusterId, chip::CommandId commandId,
const uint8_t * payload, size_t length)
const uint8_t * payload, size_t length, uint16_t interactionTimeoutMs)
{
CHIP_ERROR err = CHIP_NO_ERROR;

VerifyOrReturnError(device->GetSecureSession().HasValue(), CHIP_ERROR_MISSING_SECURE_SESSION.AsInteger());

std::unique_ptr<CommandSenderCallback> callback = std::make_unique<CommandSenderCallback>(appContext);
std::unique_ptr<CommandSender> sender = std::make_unique<CommandSender>(callback.get(), device->GetExchangeManager(),
/* is timed request */ timedRequestTimeoutMs != 0);
Expand All @@ -151,7 +154,11 @@ chip::ChipError::StorageType pychip_CommandSender_SendCommand(void * appContext,

SuccessOrExit(err = sender->FinishCommand(timedRequestTimeoutMs != 0 ? Optional<uint16_t>(timedRequestTimeoutMs)
: Optional<uint16_t>::Missing()));
SuccessOrExit(err = device->SendCommands(sender.get()));

SuccessOrExit(err = sender->SendCommandRequest(device->GetSecureSession().Value(),
interactionTimeoutMs != 0
? MakeOptional(System::Clock::Milliseconds32(interactionTimeoutMs))
: Optional<System::Clock::Timeout>::Missing()));

sender.release();
callback.release();
Expand Down

0 comments on commit d5e9940

Please sign in to comment.