Skip to content

Commit

Permalink
Enable chip-repl to send batch commands (#30851)
Browse files Browse the repository at this point in the history
This enables chip-repl to send Batch commands in a single invoke request command. This is required in order to automate testing csg test plan for batch invoke commands.
  • Loading branch information
tehampson authored and pull[bot] committed Feb 13, 2024
1 parent 33f62fe commit 2100695
Show file tree
Hide file tree
Showing 8 changed files with 359 additions and 21 deletions.
39 changes: 39 additions & 0 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,45 @@ async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
return await future

async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterCommand.InvokeRequestInfo],
timedRequestTimeoutMs: typing.Optional[int] = None,
interactionTimeoutMs: typing.Optional[int] = None, busyWaitMs: typing.Optional[int] = None,
suppressResponse: typing.Optional[bool] = None):
'''
Send a batch of cluster-object encapsulated commands to a node and get returned a future that can be awaited upon to receive
the responses. If a valid responseType is passed in, that will be used to deserialize the object. If not,
the type will be automatically deduced from the metadata received over the wire.
nodeId: Target's Node ID
commands: A list of InvokeRequestInfo containing the commands to invoke.
timedWriteTimeoutMs: Timeout for a timed invoke request. Omit or set to 'None' to indicate a non-timed request.
interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the
right timeout value based on transport characteristics as well as the responsiveness of the target.
busyWaitMs: How long to wait in ms after sending command to device before performing any other operations.
suppressResponse: Do not send a response to this action
Returns:
- List of command responses in the same order as what was given in `commands`. The type of the response is defined by the command.
- A value of `None` indicates success.
- If only a single command fails, for example with `UNSUPPORTED_COMMAND`, the corresponding index associated with the command will,
contain `interaction_model.Status.UnsupportedCommand`.
- If a command is not responded to by server, command will contain `interaction_model.Status.Failure`
Raises:
- InteractionModelError if error with sending of InvokeRequestMessage fails as a whole.
'''
self.CheckIsActive()

eventLoop = asyncio.get_running_loop()
future = eventLoop.create_future()

device = self.GetConnectedDeviceSync(nodeid, timeoutMs=interactionTimeoutMs)

ClusterCommand.SendBatchCommands(
future, eventLoop, device.deviceProxy, commands,
timedRequestTimeoutMs=timedRequestTimeoutMs,
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
return await future

def SendGroupCommand(self, groupid: int, payload: ClusterObjects.ClusterCommand, busyWaitMs: typing.Union[None, int] = None):
'''
Send a group cluster-object encapsulated command to a group_id and get returned a future
Expand Down
159 changes: 152 additions & 7 deletions src/controller/python/chip/clusters/Command.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from asyncio.futures import Future
from ctypes import CFUNCTYPE, c_bool, c_char_p, c_size_t, c_uint8, c_uint16, c_uint32, c_void_p, py_object
from dataclasses import dataclass
from typing import Type, Union
from typing import List, Optional, Type, Union

import chip.exceptions
import chip.interaction_model
Expand All @@ -42,6 +42,13 @@ class CommandPath:
CommandId: int


@dataclass
class InvokeRequestInfo:
EndpointId: int
Command: ClusterCommand
ResponseType: Optional[Type] = None


@dataclass
class Status:
IMStatus: int
Expand Down Expand Up @@ -94,7 +101,11 @@ def _handleResponse(self, path: CommandPath, status: Status, response: bytes):
else:
self._future.set_result(None)

def handleResponse(self, path: CommandPath, status: Status, response: bytes):
def handleResponse(self, path: CommandPath, index: int, status: Status, response: bytes):
# For AsyncCommandTransaction we only expect to ever get one response so we don't bother
# checking `index`. We just share a callback API with batch commands. If we ever get a
# second call to `handleResponse` we will see a different error on trying to set future
# that has already been set.
self._event_loop.call_soon_threadsafe(
self._handleResponse, path, status, response)

Expand All @@ -105,9 +116,79 @@ def _handleError(self, imError: Status, chipError: PyChipError, exception: Excep
self._future.set_exception(chipError.to_exception())
else:
try:
# If you got an exception from this call other than AttributeError please
# add it to the except block below. We changed Exception->AttributeError as
# that is what we thought we are trying to catch here.
self._future.set_exception(
chip.interaction_model.InteractionModelError(chip.interaction_model.Status(imError.IMStatus), imError.ClusterStatus))
except AttributeError:
logger.exception("Failed to map interaction model status received: %s. Remapping to Failure." % imError)
self._future.set_exception(chip.interaction_model.InteractionModelError(
chip.interaction_model.Status.Failure, imError.ClusterStatus))

def handleError(self, status: Status, chipError: PyChipError):
self._event_loop.call_soon_threadsafe(
self._handleError, status, chipError, None
)

def handleDone(self):
ctypes.pythonapi.Py_DecRef(ctypes.py_object(self))


class AsyncBatchCommandsTransaction:
def __init__(self, future: Future, eventLoop, expectTypes: List[Type]):
self._event_loop = eventLoop
self._future = future
self._expect_types = expectTypes
default_im_failure = chip.interaction_model.InteractionModelError(
chip.interaction_model.Status.NoCommandResponse)
self._responses = [default_im_failure] * len(expectTypes)

def _handleResponse(self, path: CommandPath, index: int, status: Status, response: bytes):
if index > len(self._responses):
self._handleError(status, 0, IndexError(f"CommandSenderCallback has given us an unexpected index value {index}"))
return

if (len(response) == 0):
self._responses[index] = None
else:
# If a type hasn't been assigned, let's auto-deduce it.
if (self._expect_types[index] is None):
self._expect_types[index] = FindCommandClusterObject(False, path)

if self._expect_types[index]:
try:
# If you got an exception from this call other than AttributeError please
# add it to the except block below. We changed Exception->AttributeError as
# that is what we thought we are trying to catch here.
self._responses[index] = self._expect_types[index].FromTLV(response)
except AttributeError as ex:
self._handleError(status, 0, ex)
else:
self._responses[index] = None

def handleResponse(self, path: CommandPath, index: int, status: Status, response: bytes):
self._event_loop.call_soon_threadsafe(
self._handleResponse, path, index, status, response)

def _handleError(self, imError: Status, chipError: PyChipError, exception: Exception):
if self._future.done():
# TODO Right now this even callback happens if there was a real IM Status error on one command.
# We need to update OnError to allow providing a CommandRef that we can try associating with it.
logger.exception(f"Recieved another error, but we have sent error. imError:{imError}, chipError {chipError}")
return
if exception:
self._future.set_exception(exception)
elif chipError != 0:
self._future.set_exception(chipError.to_exception())
else:
try:
# If you got an exception from this call other than AttributeError please
# add it to the except block below. We changed Exception->AttributeError as
# that is what we thought we are trying to catch here.
self._future.set_exception(
chip.interaction_model.InteractionModelError(chip.interaction_model.Status(imError.IMStatus), imError.ClusterStatus))
except Exception:
except AttributeError:
logger.exception("Failed to map interaction model status received: %s. Remapping to Failure." % imError)
self._future.set_exception(chip.interaction_model.InteractionModelError(
chip.interaction_model.Status.Failure, imError.ClusterStatus))
Expand All @@ -117,20 +198,29 @@ def handleError(self, status: Status, chipError: PyChipError):
self._handleError, status, chipError, None
)

def _handleDone(self):
self._future.set_result(self._responses)
ctypes.pythonapi.Py_DecRef(ctypes.py_object(self))

def handleDone(self):
self._event_loop.call_soon_threadsafe(
self._handleDone
)


_OnCommandSenderResponseCallbackFunct = CFUNCTYPE(
None, py_object, c_uint16, c_uint32, c_uint32, c_uint16, c_uint8, c_void_p, c_uint32)
None, py_object, c_uint16, c_uint32, c_uint32, c_size_t, c_uint16, c_uint8, c_void_p, c_uint32)
_OnCommandSenderErrorCallbackFunct = CFUNCTYPE(
None, py_object, c_uint16, c_uint8, PyChipError)
_OnCommandSenderDoneCallbackFunct = CFUNCTYPE(
None, py_object)


@_OnCommandSenderResponseCallbackFunct
def _OnCommandSenderResponseCallback(closure, endpoint: int, cluster: int, command: int,
def _OnCommandSenderResponseCallback(closure, endpoint: int, cluster: int, command: int, index: int,
imStatus: int, clusterStatus: int, payload, size):
data = ctypes.string_at(payload, size)
closure.handleResponse(CommandPath(endpoint, cluster, command), Status(
closure.handleResponse(CommandPath(endpoint, cluster, command), index, Status(
imStatus, clusterStatus), data[:])


Expand All @@ -141,7 +231,7 @@ def _OnCommandSenderErrorCallback(closure, imStatus: int, clusterStatus: int, ch

@_OnCommandSenderDoneCallbackFunct
def _OnCommandSenderDoneCallback(closure):
ctypes.pythonapi.Py_DecRef(ctypes.py_object(closure))
closure.handleDone()


def TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke(future: Future, eventLoop, responseType, device, commandPath, payload):
Expand Down Expand Up @@ -201,6 +291,59 @@ def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPa
))


def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo],
timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None, busyWaitMs: Optional[int] = None,
suppressResponse: Optional[bool] = None) -> PyChipError:
''' Send a cluster-object encapsulated command to a device and does the following:
- On receipt of a successful data response, returns the cluster-object equivalent through the provided future.
- None (on a successful response containing no data)
- Raises an exception if any errors are encountered.
If no response type is provided above, the type will be automatically deduced.
If a valid timedRequestTimeoutMs is provided, a timed interaction will be initiated instead.
If a valid interactionTimeoutMs is provided, the interaction will terminate with a CHIP_ERROR_TIMEOUT if a response
has not been received within that timeout. If it isn't provided, a sensible value will be automatically computed that
accounts for the underlying characteristics of both the transport and the responsiveness of the receiver.
'''
handle = chip.native.GetLibraryHandle()

responseTypes = []
commandargs = []
for command in commands:
clusterCommand = command.Command
responseType = command.ResponseType
if (responseType is not None) and (not issubclass(responseType, ClusterCommand)):
raise ValueError("responseType must be a ClusterCommand or None")
if clusterCommand.must_use_timed_invoke and timedRequestTimeoutMs is None or timedRequestTimeoutMs == 0:
raise chip.interaction_model.InteractionModelError(chip.interaction_model.Status.NeedsTimedInteraction)

commandPath = chip.interaction_model.CommandPathIBStruct.build({
"EndpointId": command.EndpointId,
"ClusterId": clusterCommand.cluster_id,
"CommandId": clusterCommand.command_id})
payloadTLV = clusterCommand.ToTLV()

commandargs.append(c_char_p(commandPath))
commandargs.append(c_char_p(bytes(payloadTLV)))
commandargs.append(c_size_t(len(payloadTLV)))

responseTypes.append(responseType)

transaction = AsyncBatchCommandsTransaction(future, eventLoop, responseTypes)
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))

return builtins.chipStack.Call(
lambda: handle.pychip_CommandSender_SendBatchCommands(
py_object(transaction), device,
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs),
c_uint16(0 if interactionTimeoutMs is None else interactionTimeoutMs),
c_uint16(0 if busyWaitMs is None else busyWaitMs),
c_bool(False if suppressResponse is None else suppressResponse),
c_size_t(len(commands)), *commandargs)
)


def SendGroupCommand(groupId: int, devCtrl: c_void_p, payload: ClusterCommand, busyWaitMs: Union[None, int] = None) -> PyChipError:
''' Send a cluster-object encapsulated group command to a device and does the following:
- None (on a successful response containing no data)
Expand All @@ -227,6 +370,8 @@ def Init():

setter.Set('pychip_CommandSender_SendCommand',
PyChipError, [py_object, c_void_p, c_uint16, c_uint32, c_uint32, c_char_p, c_size_t, c_uint16, c_bool])
setter.Set('pychip_CommandSender_SendBatchCommands',
PyChipError, [py_object, c_void_p, c_uint16, c_uint16, c_uint16, c_bool, c_size_t])
setter.Set('pychip_CommandSender_TestOnlySendCommandTimedRequestNoTimedInvoke',
PyChipError, [py_object, c_void_p, c_uint32, c_uint32, c_char_p, c_size_t, c_uint16, c_bool])
setter.Set('pychip_CommandSender_SendGroupCommand',
Expand Down
Loading

0 comments on commit 2100695

Please sign in to comment.