Skip to content

Commit

Permalink
Python: Support for specifying absolute timeouts for Invoke/Write (#2…
Browse files Browse the repository at this point in the history
…0531)

* Python: Support for specifying absolute timeouts for Invoke/Write

This adds support for specifying an absolute timeout for
ChipDeviceController.SendCommand and ChipDeviceController.WriteAttribute
methods. This timeout is used to bound both mDNS discovery and
the subsequent IM interaction.

* Review feedback

* Review feedback
  • Loading branch information
mrjerryjohns authored and web-flow committed Jul 21, 2022
1 parent 63faaf5 commit fbdd45e
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 25 deletions.
1 change: 1 addition & 0 deletions src/controller/python/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ shared_library("ChipDeviceCtrl") {
"chip/internal/ChipThreadWork.h",
"chip/internal/CommissionerImpl.cpp",
"chip/logging/LoggingRedirect.cpp",
"chip/utils/DeviceProxyUtils.cpp",
]
} else {
sources += [
Expand Down
43 changes: 33 additions & 10 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ def GetClusterHandler(self):

return self._Cluster

def GetConnectedDeviceSync(self, nodeid, allowPASE=True):
def GetConnectedDeviceSync(self, nodeid, allowPASE=True, timeoutMs: int = None):
self.CheckIsActive()

returnDevice = c_void_p(None)
Expand All @@ -582,57 +582,80 @@ def DeviceAvailableCallback(device, err):

if allowPASE:
res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)))
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res == 0:
print('Using PASE connection')
return returnDevice

res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
self.devCtrl, nodeid, DeviceAvailableCallback))
self.devCtrl, nodeid, DeviceAvailableCallback), timeoutMs)
if res != 0:
raise self._ChipStack.ErrorToException(res)

# The callback might have been received synchronously (during self._ChipStack.Call()).
# Check if the device is already set before waiting for the callback.
if returnDevice.value is None:
with deviceAvailableCV:
deviceAvailableCV.wait()
timeout = None
if (timeoutMs):
timeout = float(timeoutMs) / 1000

ret = deviceAvailableCV.wait(timeout)
if ret is False:
raise TimeoutError("Timed out waiting for DNS-SD resolution")

if returnDevice.value is None:
raise self._ChipStack.ErrorToException(returnErr)
return returnDevice

async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.ClusterCommand, responseType=None, timedRequestTimeoutMs: int = None):
def ComputeRoundTripTimeout(self, nodeid, upperLayerProcessingTimeoutMs: int = 0):
''' Returns a computed timeout value based on the round-trip time it takes for the peer at the other end of the session to
receive a message, process it and send it back. This is computed based on the session type, the type of transport, sleepy
characteristics of the target and a caller-provided value for the time it takes to process a message at the upper layer on
the target For group sessions.
This will result in a session being established if one wasn't already.
'''
device = self.GetConnectedDeviceSync(nodeid)
res = self._ChipStack.Call(lambda: self._dmLib.pychip_DeviceProxy_ComputeRoundTripTimeout(
device, upperLayerProcessingTimeoutMs))
return res

async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.ClusterCommand, responseType=None, timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None):
'''
Send a cluster-object encapsulated command to a node and get returned a future that can be awaited upon to receive the response.
If a valid responseType is passed in, that will be used to deserialize the object. If not, the type will be automatically deduced
from the metadata received over the wire.
timedWriteTimeoutMs: Timeout for a timed invoke request. Omit or set to 'None' to indicate a non-timed request.
interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the right
timeout value based on transport characteristics as well as the responsiveness of the target.
'''
self.CheckIsActive()

eventLoop = asyncio.get_running_loop()
future = eventLoop.create_future()

device = self.GetConnectedDeviceSync(nodeid)
device = self.GetConnectedDeviceSync(nodeid, timeoutMs=interactionTimeoutMs)
res = ClusterCommand.SendCommand(
future, eventLoop, responseType, device, ClusterCommand.CommandPath(
EndpointId=endpoint,
ClusterId=payload.cluster_id,
CommandId=payload.command_id,
), payload, timedRequestTimeoutMs=timedRequestTimeoutMs)
), payload, timedRequestTimeoutMs=timedRequestTimeoutMs, interactionTimeoutMs=interactionTimeoutMs)
if res != 0:
future.set_exception(self._ChipStack.ErrorToException(res))
return await future

async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor, int]], timedRequestTimeoutMs: int = None):
async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor, int]], timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None):
'''
Write a list of attributes on a target node.
nodeId: Target's Node ID
timedWriteTimeoutMs: Timeout for a timed write request. Omit or set to 'None' to indicate a non-timed request.
attributes: A list of tuples of type (endpoint, cluster-object):
interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the right
timeout value based on transport characteristics as well as the responsiveness of the target.
E.g
(1, Clusters.TestCluster.Attributes.XYZAttribute('hello')) -- Write 'hello' to the XYZ attribute on the test cluster to endpoint 1
Expand All @@ -642,7 +665,7 @@ async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple
eventLoop = asyncio.get_running_loop()
future = eventLoop.create_future()

device = self.GetConnectedDeviceSync(nodeid)
device = self.GetConnectedDeviceSync(nodeid, timeoutMs=interactionTimeoutMs)

attrs = []
for v in attributes:
Expand All @@ -654,7 +677,7 @@ async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple
v[0], v[1], v[2], 1, v[1].value))

res = ClusterAttribute.WriteAttributes(
future, eventLoop, device, attrs, timedRequestTimeoutMs=timedRequestTimeoutMs)
future, eventLoop, device, attrs, timedRequestTimeoutMs=timedRequestTimeoutMs, interactionTimeoutMs=interactionTimeoutMs)
if res != 0:
raise self._ChipStack.ErrorToException(res)
return await future
Expand Down
15 changes: 11 additions & 4 deletions src/controller/python/chip/ChipStack.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,17 @@ def __call__(self):
self._cv.notify_all()
pythonapi.Py_DecRef(py_object(self))

def Wait(self):
def Wait(self, timeoutMs: int = None):
timeout = None
if timeoutMs is not None:
timeout = float(timeoutMs) / 1000

with self._cv:
while self._finish is False:
self._cv.wait()
res = self._cv.wait(timeout)
if res is False:
raise TimeoutError("Timed out waiting for task to finish executing on the Matter thread")

if self._exc is not None:
raise self._exc
return self._res
Expand Down Expand Up @@ -335,7 +342,7 @@ def Shutdown(self):
self.devMgr = None
self.callbackRes = None

def Call(self, callFunct):
def Call(self, callFunct, timeoutMs: int = None):
'''Run a Python function on CHIP stack, and wait for the response.
This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics.
Calling this function on CHIP on CHIP mainloop thread will cause deadlock.
Expand All @@ -344,7 +351,7 @@ def Call(self, callFunct):
self.callbackRes = None
self.completeEvent.clear()
with self.networkLock:
res = self.PostTaskOnChipThread(callFunct).Wait()
res = self.PostTaskOnChipThread(callFunct).Wait(timeoutMs)
self.completeEvent.set()
if res == 0 and self.callbackRes != None:
return self.callbackRes
Expand Down
4 changes: 2 additions & 2 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ def _OnWriteDoneCallback(closure):
closure.handleDone()


def WriteAttributes(future: Future, eventLoop, device, attributes: List[AttributeWriteRequest], timedRequestTimeoutMs: int = None) -> int:
def WriteAttributes(future: Future, eventLoop, device, attributes: List[AttributeWriteRequest], timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None) -> int:
handle = chip.native.GetLibraryHandle()

writeargs = []
Expand All @@ -898,7 +898,7 @@ def WriteAttributes(future: Future, eventLoop, device, attributes: List[Attribut
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
res = builtins.chipStack.Call(
lambda: handle.pychip_WriteClient_WriteAttributes(
ctypes.py_object(transaction), device, ctypes.c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), ctypes.c_size_t(len(attributes)), *writeargs))
ctypes.py_object(transaction), device, ctypes.c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), ctypes.c_uint16(0 if interactionTimeoutMs is None else interactionTimeoutMs), ctypes.c_size_t(len(attributes)), *writeargs))
if res != 0:
ctypes.pythonapi.Py_DecRef(ctypes.py_object(transaction))
return res
Expand Down
9 changes: 7 additions & 2 deletions src/controller/python/chip/clusters/Command.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,18 @@ def _OnCommandSenderDoneCallback(closure):
ctypes.pythonapi.Py_DecRef(ctypes.py_object(closure))


def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand, timedRequestTimeoutMs: int = None) -> int:
def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand, timedRequestTimeoutMs: int = None, interactionTimeoutMs: int = None) -> int:
''' Send a cluster-object encapsulated command to a device and does the following:
- On receipt of a successful data response, returns the cluster-object equivalent through the provided future.
- None (on a successful response containing no data)
- Raises an exception if any errors are encountered.
If no response type is provided above, the type will be automatically deduced.
If a valid timedRequestTimeoutMs is provided, a timed interaction will be initiated instead.
If a valid interactionTimeoutMs is provided, the interaction will terminate with a CHIP_ERROR_TIMEOUT if a response
has not been received within that timeout. If it isn't provided, a sensible value will be automatically computed that
accounts for the underlying characteristics of both the transport and the responsiveness of the receiver.
'''
if (responseType is not None) and (not issubclass(responseType, ClusterCommand)):
raise ValueError("responseType must be a ClusterCommand or None")
Expand All @@ -166,7 +171,7 @@ def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPa
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
return builtins.chipStack.Call(
lambda: handle.pychip_CommandSender_SendCommand(ctypes.py_object(
transaction), device, c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId, commandPath.ClusterId, commandPath.CommandId, payloadTLV, len(payloadTLV)))
transaction), device, c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId, commandPath.ClusterId, commandPath.CommandId, payloadTLV, len(payloadTLV), ctypes.c_uint16(0 if interactionTimeoutMs is None else interactionTimeoutMs)))


def Init():
Expand Down
14 changes: 10 additions & 4 deletions src/controller/python/chip/clusters/attribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include <cstdio>
#include <lib/support/logging/CHIPLogging.h>

#include <lib/core/Optional.h>

using namespace chip;
using namespace chip::app;

Expand Down Expand Up @@ -241,7 +243,8 @@ struct __attribute__((packed)) PyReadAttributeParams

// Encodes n attribute write requests, follows 3 * n arguments, in the (AttributeWritePath*=void *, uint8_t*, size_t) order.
chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContext, DeviceProxy * device,
uint16_t timedWriteTimeoutMs, size_t n, ...);
uint16_t timedWriteTimeoutMs, uint16_t interactionTimeoutMs,
size_t n, ...);
chip::ChipError::StorageType pychip_ReadClient_ReadAttributes(void * appContext, ReadClient ** pReadClient,
ReadClientCallback ** pCallback, DeviceProxy * device,
uint8_t * readParamsBuf, size_t n, size_t total, ...);
Expand Down Expand Up @@ -319,7 +322,8 @@ void pychip_ReadClient_InitCallbacks(OnReadAttributeDataCallback onReadAttribute
}

chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContext, DeviceProxy * device,
uint16_t timedWriteTimeoutMs, size_t n, ...)
uint16_t timedWriteTimeoutMs, uint16_t interactionTimeoutMs,
size_t n, ...)
{
CHIP_ERROR err = CHIP_NO_ERROR;

Expand All @@ -331,7 +335,7 @@ chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContex
va_list args;
va_start(args, n);

VerifyOrExit(device != nullptr && device->GetSecureSession().HasValue(), err = CHIP_ERROR_INCORRECT_STATE);
VerifyOrExit(device != nullptr && device->GetSecureSession().HasValue(), err = CHIP_ERROR_MISSING_SECURE_SESSION);

{
for (size_t i = 0; i < n; i++)
Expand Down Expand Up @@ -359,7 +363,9 @@ chip::ChipError::StorageType pychip_WriteClient_WriteAttributes(void * appContex
}
}

SuccessOrExit(err = client->SendWriteRequest(device->GetSecureSession().Value()));
SuccessOrExit(err = client->SendWriteRequest(device->GetSecureSession().Value(),
interactionTimeoutMs != 0 ? System::Clock::Milliseconds32(interactionTimeoutMs)
: System::Clock::kZero));

client.release();
callback.release();
Expand Down
13 changes: 10 additions & 3 deletions src/controller/python/chip/clusters/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ extern "C" {
chip::ChipError::StorageType pychip_CommandSender_SendCommand(void * appContext, DeviceProxy * device,
uint16_t timedRequestTimeoutMs, chip::EndpointId endpointId,
chip::ClusterId clusterId, chip::CommandId commandId,
const uint8_t * payload, size_t length);
const uint8_t * payload, size_t length,
uint16_t interactionTimeoutMs);
}

namespace chip {
Expand Down Expand Up @@ -127,10 +128,12 @@ void pychip_CommandSender_InitCallbacks(OnCommandSenderResponseCallback onComman
chip::ChipError::StorageType pychip_CommandSender_SendCommand(void * appContext, DeviceProxy * device,
uint16_t timedRequestTimeoutMs, chip::EndpointId endpointId,
chip::ClusterId clusterId, chip::CommandId commandId,
const uint8_t * payload, size_t length)
const uint8_t * payload, size_t length, uint16_t interactionTimeoutMs)
{
CHIP_ERROR err = CHIP_NO_ERROR;

VerifyOrReturnError(device->GetSecureSession().HasValue(), CHIP_ERROR_MISSING_SECURE_SESSION.AsInteger());

std::unique_ptr<CommandSenderCallback> callback = std::make_unique<CommandSenderCallback>(appContext);
std::unique_ptr<CommandSender> sender = std::make_unique<CommandSender>(callback.get(), device->GetExchangeManager(),
/* is timed request */ timedRequestTimeoutMs != 0);
Expand All @@ -151,7 +154,11 @@ chip::ChipError::StorageType pychip_CommandSender_SendCommand(void * appContext,

SuccessOrExit(err = sender->FinishCommand(timedRequestTimeoutMs != 0 ? Optional<uint16_t>(timedRequestTimeoutMs)
: Optional<uint16_t>::Missing()));
SuccessOrExit(err = device->SendCommands(sender.get()));

SuccessOrExit(err = sender->SendCommandRequest(device->GetSecureSession().Value(),
interactionTimeoutMs != 0
? MakeOptional(System::Clock::Milliseconds32(interactionTimeoutMs))
: Optional<System::Clock::Timeout>::Missing()));

sender.release();
callback.release();
Expand Down
63 changes: 63 additions & 0 deletions src/controller/python/chip/utils/DeviceProxyUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
*
* Copyright (c) 2020-2022 Project CHIP Authors
* Copyright (c) 2019-2020 Google LLC.
* Copyright (c) 2013-2018 Nest Labs, Inc.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* @file
* Implementation of the native methods expected by the Python
* version of Chip Device Manager.
*
*/

#include "system/SystemClock.h"
#include <app/DeviceProxy.h>
#include <stdio.h>
#include <system/SystemError.h>

using namespace chip;

static_assert(std::is_same<uint32_t, ChipError::StorageType>::value, "python assumes CHIP_ERROR maps to c_uint32");

extern "C" {

/**
* @brief
*
* This computes the value for a timeout based on the round trip time it takes for a message to be sent to a peer,
* the message to be processed given the upperLayerProcessingTimeoutMs argument, and a response to come back.
*
* See Session::ComputeRoundTripTimeout for more specific details.
*
* A valid DeviceProxy pointer with a valid, established session is required for this method.
*
*
*/
uint32_t pychip_DeviceProxy_ComputeRoundTripTimeout(DeviceProxy * device, uint32_t upperLayerProcessingTimeoutMs)
{
VerifyOrDie(device != nullptr);

auto * deviceProxy = static_cast<DeviceProxy *>(device);
VerifyOrDie(deviceProxy->GetSecureSession().HasValue());

return deviceProxy->GetSecureSession()
.Value()
->ComputeRoundTripTimeout(System::Clock::Milliseconds32(upperLayerProcessingTimeoutMs))
.count();
}
}

0 comments on commit fbdd45e

Please sign in to comment.