From e33153fcde71ed2338711e29a602491c56bb9985 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sat, 2 Jan 2021 22:25:15 -0500 Subject: [PATCH 01/37] it reads! (something) --- pymodbus/client/asynchronous/factory/tcp.py | 21 ++ .../asynchronous/schedulers/__init__.py | 1 + pymodbus/client/asynchronous/tcp.py | 16 +- pymodbus/client/asynchronous/trio/__init__.py | 285 ++++++++++++++++++ tc.py | 15 + 5 files changed, 332 insertions(+), 6 deletions(-) create mode 100644 pymodbus/client/asynchronous/trio/__init__.py create mode 100644 tc.py diff --git a/pymodbus/client/asynchronous/factory/tcp.py b/pymodbus/client/asynchronous/factory/tcp.py index d0155a48e..2bb19b8a5 100644 --- a/pymodbus/client/asynchronous/factory/tcp.py +++ b/pymodbus/client/asynchronous/factory/tcp.py @@ -105,6 +105,25 @@ def async_io_factory(host="127.0.0.1", port=Defaults.Port, framer=None, return loop, client +def trio_factory(host="127.0.0.1", port=Defaults.Port, framer=None, + source_address=None, timeout=None, **kwargs): + """ + Factory to create Trio based asynchronous tcp clients + :param host: Host IP address + :param port: Port + :param framer: Modbus Framer + :param source_address: Bind address + :param timeout: Timeout in seconds + :param kwargs: + :return: tcp client + """ + from pymodbus.client.asynchronous.trio import init_tcp_client + proto_cls = kwargs.get("proto_cls", None) + client = init_tcp_client(proto_cls, host, port) + + return client + + def get_factory(scheduler): """ Gets protocol factory based on the backend scheduler being used @@ -117,6 +136,8 @@ def get_factory(scheduler): return io_loop_factory elif scheduler == schedulers.ASYNC_IO: return async_io_factory + elif scheduler == schedulers.TRIO: + return trio_factory else: LOGGER.warning("Allowed Schedulers: {}, {}, {}".format( schedulers.REACTOR, schedulers.IO_LOOP, schedulers.ASYNC_IO diff --git a/pymodbus/client/asynchronous/schedulers/__init__.py b/pymodbus/client/asynchronous/schedulers/__init__.py index 591737579..279ad53f6 100644 --- a/pymodbus/client/asynchronous/schedulers/__init__.py +++ b/pymodbus/client/asynchronous/schedulers/__init__.py @@ -7,3 +7,4 @@ REACTOR = "reactor" IO_LOOP = "io_loop" ASYNC_IO = "async_io" +TRIO = "trio" diff --git a/pymodbus/client/asynchronous/tcp.py b/pymodbus/client/asynchronous/tcp.py index 45b7cfdf4..04ce2f1bb 100644 --- a/pymodbus/client/asynchronous/tcp.py +++ b/pymodbus/client/asynchronous/tcp.py @@ -5,7 +5,7 @@ from pymodbus.client.asynchronous.factory.tcp import get_factory from pymodbus.constants import Defaults from pymodbus.compat import IS_PYTHON3, PYTHON_VERSION -from pymodbus.client.asynchronous.schedulers import ASYNC_IO +from pymodbus.client.asynchronous.schedulers import ASYNC_IO, TRIO logger = logging.getLogger(__name__) @@ -34,11 +34,15 @@ def __new__(cls, scheduler, host="127.0.0.1", port=Defaults.Port, :param kwargs: Other extra args specific to Backend being used :return: """ - if (not (IS_PYTHON3 and PYTHON_VERSION >= (3, 4)) - and scheduler == ASYNC_IO): - logger.critical("ASYNCIO is supported only on python3") - import sys - sys.exit(1) + if not IS_PYTHON3: + if scheduler == ASYNC_IO and PYTHON_VERSION < (3, 4): + logger.critical("ASYNCIO is supported only on python3") + import sys + sys.exit(1) + elif scheduler == TRIO and PYTHON_VERSION < (3, 6): + logger.critical("TRIO is supported only on python3") + import sys + sys.exit(1) factory_class = get_factory(scheduler) yieldable = factory_class(host=host, port=port, framer=framer, source_address=source_address, diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py new file mode 100644 index 000000000..a7ae1189f --- /dev/null +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -0,0 +1,285 @@ +import contextlib +import functools +import logging + +import trio + +from pymodbus.client.asynchronous.mixins import AsyncModbusClientMixin +from pymodbus.exceptions import ConnectionException +from pymodbus.utilities import hexlify_packets + +_logger = logging.getLogger(__name__) + + +class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin): + """ + Trio specific implementation of asynchronous modbus client protocol. + """ + + #: Factory that created this instance. + factory = None + transport = None + + async def execute(self, request=None): + """ + Executes requests asynchronously + :param request: + :return: + """ + # with trio.fail_after(seconds=self._timeout): + resp = await self._execute(request) + + return resp + + def connection_made(self, transport): + """ + Called when a connection is made. + + The transport argument is the transport representing the connection. + :param transport: + :return: + """ + self.transport = transport + self._connectionMade() + + if self.factory: + self.factory.protocol_made_connection(self) + + def _connectionMade(self): + """ + Called upon a successful client connection. + """ + _logger.debug("Client connected to modbus server") + self._connected = True + + async def _execute(self, request, **kwargs): + """ + Starts the producer to send the next request to + consumer.write(Frame(request)) + """ + request.transaction_id = self.transaction.getNextTID() + packet = self.framer.buildPacket(request) + _logger.debug("send: " + hexlify_packets(packet)) + await self.write_transport(packet) + return await self._buildResponse(request.transaction_id) + + def _dataReceived(self, data): + ''' Get response, check for valid message, decode result + + :param data: The data returned from the server + ''' + _logger.debug("recv: " + hexlify_packets(data)) + unit = self.framer.decode_data(data).get("unit", 0) + self.framer.processIncomingPacket(data, self._handleResponse, unit=unit) + + async def write_transport(self, packet): + return await self.transport.send(packet) + + def _handleResponse(self, reply, **kwargs): + """ + Handle the processed response and link to correct deferred + + :param reply: The reply to process + """ + if reply is not None: + tid = reply.transaction_id + handler = self.transaction.getTransaction(tid) + if handler: + handler.value = reply + handler.event.set() + else: + _logger.debug("Unrequested message: " + str(reply)) + + async def _buildResponse(self, tid): + """ + Helper method to return a deferred response + for the current request. + + :param tid: The transaction identifier for this response + :returns: A defer linked to the latest request + """ + if not self._connected: + raise ConnectionException('Client is not connected') + + class EventAndValue: + def __init__(self): + self.event = trio.Event() + self.value = self + + event_and_value = EventAndValue() + self.transaction.addTransaction(event_and_value, tid) + await event_and_value.event.wait() + return event_and_value.value + + +class ModbusClientProtocol(BaseModbusAsyncClientProtocol): + """ + Asyncio specific implementation of asynchronous modbus client protocol. + """ + + #: Factory that created this instance. + factory = None + transport = None + + def data_received(self, data): + """ + Called when some data is received. + data is a non-empty bytes object containing the incoming data. + :param data: + :return: + """ + self._dataReceived(data) + + +# class ModbusUdpClientProtocol(BaseModbusAsyncClientProtocol): +# """ +# Asyncio specific implementation of asynchronous modbus udp client protocol. +# """ +# +# #: Factory that created this instance. +# factory = None +# +# def __init__(self, host=None, port=0, **kwargs): +# self.host = host +# self.port = port +# super(self.__class__, self).__init__(**kwargs) +# +# def datagram_received(self, data, addr): +# self._dataReceived(data) +# +# def write_transport(self, packet): +# return self.transport.sendto(packet) + + +class TrioModbusTcpClient(object): + """Client to connect to modbus device over TCP/IP.""" + + def __init__(self, host=None, port=502, protocol_class=None, loop=None): + """ + Initializes Asyncio Modbus Tcp Client + :param host: Host IP address + :param port: Port to connect + :param protocol_class: Protocol used to talk to modbus device. + """ + #: Protocol used to talk to modbus device. + self.protocol_class = protocol_class or ModbusClientProtocol + #: Current protocol instance. + self.protocol = None + #: Event loop to use. + + self.host = host + self.port = port + + self.connected = False + + @contextlib.asynccontextmanager + async def manage_connection(self): + async with trio.open_nursery() as nursery: + self.protocol = self._create_protocol() + client_stream = await trio.open_tcp_stream(self.host, self.port) + + write_send_channel, write_receive_channel = trio.open_memory_channel(0) + async with write_send_channel: + self.protocol.connection_made(transport=write_send_channel) + nursery.start_soon( + functools.partial( + self.sender, + stream=client_stream, + channel=write_receive_channel, + ), + ) + nursery.start_soon( + functools.partial(self.receiver, stream=client_stream), + ) + + yield self.protocol + + nursery.cancel_scope.cancel() + + async def sender(self, stream, channel): + async with channel: + async for data in channel: + await stream.send_all(data) + + async def receiver(self, stream): + async for data in stream: + self.protocol.data_received(data) + # await channel.send(data) + + def stop(self): + """ + Stops the client + :return: + """ + if self.connected: + if self.protocol: + if self.protocol.transport: + self.protocol.transport.close() + + def _create_protocol(self): + """ + Factory function to create initialized protocol instance. + """ + protocol = self.protocol_class() + protocol.factory = self + return protocol + + # # @asyncio.coroutine + # def connect(self): + # """ + # Connect and start Async client + # :return: + # """ + # _logger.debug('Connecting.') + # try: + # yield from self.loop.create_connection(self._create_protocol, + # self.host, + # self.port) + # _logger.info('Connected to %s:%s.' % (self.host, self.port)) + # except Exception as ex: + # _logger.warning('Failed to connect: %s' % ex) + # # asyncio.asynchronous(self._reconnect(), loop=self.loop) + + def protocol_made_connection(self, protocol): + """ + Protocol notification of successful connection. + """ + _logger.info('Protocol made connection.') + if not self.connected: + self.connected = True + self.protocol = protocol + else: + _logger.error('Factory protocol connect ' + 'callback called while connected.') + + def protocol_lost_connection(self, protocol): + """ + Protocol notification of lost connection. + """ + if self.connected: + _logger.info('Protocol lost connection.') + if protocol is not self.protocol: + _logger.error('Factory protocol callback called' + ' from unexpected protocol instance.') + + self.connected = False + self.protocol = None + # if self.host: + # asyncio.asynchronous(self._reconnect(), loop=self.loop) + else: + _logger.error('Factory protocol disconnect' + ' callback called while not connected.') + + +def init_tcp_client(proto_cls, host, port, **kwargs): + """ + Helper function to initialize tcp client + :param proto_cls: + :param loop: + :param host: + :param port: + :param kwargs: + :return: + """ + client = TrioModbusTcpClient(protocol_class=proto_cls, host=host, port=port) + return client diff --git a/tc.py b/tc.py new file mode 100644 index 000000000..3bc36cc42 --- /dev/null +++ b/tc.py @@ -0,0 +1,15 @@ +from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient as ModbusClient +from pymodbus.client.asynchronous import schedulers + +import trio + + +async def main(): + client = ModbusClient(scheduler=schedulers.TRIO, host="127.0.0.1", port=5020) + + async with client.manage_connection() as protocol: + # print(await protocol.read_coils(address=1, count=1, unit=0x01)) + print(await protocol.read_holding_registers(address=1, count=1, unit=0x01)) + # print(await protocol.read_holding_registers(address=10, count=1, unit=0x01)) + +trio.run(main) From 9320e2519b41dd3f881168b0d33dc58c4f6f5013 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sun, 3 Jan 2021 11:20:06 -0500 Subject: [PATCH 02/37] read a few things in tc.py --- tc.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/tc.py b/tc.py index 3bc36cc42..98c487dd9 100644 --- a/tc.py +++ b/tc.py @@ -1,3 +1,6 @@ +import logging +import sys + from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient as ModbusClient from pymodbus.client.asynchronous import schedulers @@ -5,11 +8,20 @@ async def main(): + root_logger = logging.getLogger() + handler = logging.StreamHandler(stream=sys.stdout) + root_logger.addHandler(hdlr=handler) + # root_logger.setLevel(logging.DEBUG) + client = ModbusClient(scheduler=schedulers.TRIO, host="127.0.0.1", port=5020) async with client.manage_connection() as protocol: - # print(await protocol.read_coils(address=1, count=1, unit=0x01)) - print(await protocol.read_holding_registers(address=1, count=1, unit=0x01)) - # print(await protocol.read_holding_registers(address=10, count=1, unit=0x01)) + response = await protocol.read_coils(address=1, count=1, unit=0x01) + print(' response:', response.bits) + response = await protocol.read_holding_registers(address=1, count=1, unit=0x01) + print(' response:', response.registers) + response = await protocol.read_holding_registers(address=10, count=1, unit=0x01) + print(' response:', response.registers) + trio.run(main) From e7a365ba86a79d684d8109fd518077e26c52e19c Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sun, 3 Jan 2021 11:20:55 -0500 Subject: [PATCH 03/37] add a few tests though these may not be super applicable even --- test/test_client_async.py | 23 +++++++++++ test/test_client_async_trio.py | 75 ++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 test/test_client_async_trio.py diff --git a/test/test_client_async.py b/test/test_client_async.py index 97aaae8bd..06d0acd43 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -21,6 +21,7 @@ from pymodbus.client.asynchronous.tornado import AsyncModbusSerialClient as AsyncTornadoModbusSerialClient from pymodbus.client.asynchronous.tornado import AsyncModbusTCPClient as AsyncTornadoModbusTcpClient from pymodbus.client.asynchronous.tornado import AsyncModbusUDPClient as AsyncTornadoModbusUdoClient +from pymodbus.client.asynchronous.trio import TrioModbusTcpClient as AsyncTrioModbusTcpClient from pymodbus.client.asynchronous import schedulers from pymodbus.factory import ClientDecoder from pymodbus.exceptions import ConnectionException @@ -108,6 +109,28 @@ def testTcpAsyncioClient(self, mock_gather, mock_loop): """ pytest.skip("TBD") + @pytest.mark.skipif(not IS_PYTHON3 or PYTHON_VERSION < (3, 6), + reason="requires python3.6 or above") + def testTcpTrioClient(self): + """ + Test the TCP Trio client + :return: + """ + + def test_callback(client): + pass + + def test_errback(client): + pass + + client = AsyncModbusTCPClient(schedulers.TRIO, + framer=ModbusSocketFramer(ClientDecoder()), + callback=test_callback, + errback=test_errback) + + assert(isinstance(client, AsyncTrioModbusTcpClient)) + assert(client.port == 502) + # -----------------------------------------------------------------------# # Test TLS Client client # -----------------------------------------------------------------------# diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py new file mode 100644 index 000000000..4941d8970 --- /dev/null +++ b/test/test_client_async_trio.py @@ -0,0 +1,75 @@ +from pymodbus.compat import IS_PYTHON3, PYTHON_VERSION +import pytest +TRIO_AVAILABLE = IS_PYTHON3 and PYTHON_VERSION >= (3, 6) +if TRIO_AVAILABLE: + from unittest import mock + from pymodbus.client.asynchronous.trio import ( + ModbusClientProtocol, TrioModbusTcpClient)#, ModbusUdpClientProtocol) + # from test.trio_test_helper import return_as_coroutine, run_coroutine + from pymodbus.factory import ClientDecoder + from pymodbus.exceptions import ConnectionException + from pymodbus.transaction import ModbusSocketFramer + from pymodbus.bit_read_message import ReadCoilsRequest, ReadCoilsResponse + # protocols = [ModbusUdpClientProtocol, ModbusClientProtocol] + protocols = [ModbusClientProtocol] +else: + import mock + protocols = [None, None] + + +@pytest.mark.skipif(not TRIO_AVAILABLE, reason="requires python3.6 or above") +class TestTrioClient(object): + def test_factory_stop(self): + mock_protocol_class = mock.MagicMock() + client = TrioModbusTcpClient(protocol_class=mock_protocol_class) + + assert not client.connected + client.stop() + assert not client.connected + + # fake connected client: + client.protocol = mock.MagicMock() + client.connected = True + + client.stop() + client.protocol.transport.close.assert_called_once_with() + + def test_factory_protocol_made_connection(self): + mock_protocol_class = mock.MagicMock() + client = TrioModbusTcpClient(protocol_class=mock_protocol_class) + + assert not client.connected + assert client.protocol is None + client.protocol_made_connection(mock.sentinel.PROTOCOL) + assert client.connected + assert client.protocol is mock.sentinel.PROTOCOL + + client.protocol_made_connection(mock.sentinel.PROTOCOL_UNEXPECTED) + assert client.connected + assert client.protocol is mock.sentinel.PROTOCOL + + # @pytest.mark.trio + # async def test_factory_start_success(self): + # mock_protocol_class = mock.MagicMock() + # client = TrioModbusTcpClient( + # protocol_class=mock_protocol_class, + # host=mock.sentinel.HOST, + # port=mock.sentinel.PORT, + # ) + # + # async with client.manage_connection(): + # # mock_loop.create_connection.assert_called_once_with(mock.ANY, mock.sentinel.HOST, mock.sentinel.PORT) + # # assert mock_async.call_count == 0 + # pass + + @pytest.mark.parametrize("protocol", protocols) + def testClientProtocolConnectionMade(self, protocol): + """ + Test the client protocol close + :return: + """ + protocol = protocol(ModbusSocketFramer(ClientDecoder())) + transport = mock.MagicMock() + protocol.connection_made(transport) + assert protocol.transport == transport + # assert protocol.connected From 29346ff8ba4491ec4ab68de4291ec67fd3823195 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sun, 3 Jan 2021 11:21:34 -0500 Subject: [PATCH 04/37] commented out attempt to break the framer does --- pymodbus/client/asynchronous/trio/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index a7ae1189f..64030f37d 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -204,7 +204,10 @@ async def sender(self, stream, channel): async def receiver(self, stream): async for data in stream: self.protocol.data_received(data) - # await channel.send(data) + + # seems like this should work due to the framer but it doesn't + # for d in data: + # self.protocol.data_received(bytes([d])) def stop(self): """ From 809972d3b9bcbc48af852d28d5893fb3aa800d6c Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Mon, 11 Jan 2021 09:01:54 -0500 Subject: [PATCH 05/37] slight simplification (sure, pieces may come back later such as when supporting RTU) --- pymodbus/client/asynchronous/trio/__init__.py | 105 +++--------------- tc.py | 19 ++-- 2 files changed, 27 insertions(+), 97 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 64030f37d..94e293073 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -21,15 +21,12 @@ class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin): transport = None async def execute(self, request=None): - """ - Executes requests asynchronously - :param request: - :return: - """ - # with trio.fail_after(seconds=self._timeout): - resp = await self._execute(request) - - return resp + request.transaction_id = self.transaction.getNextTID() + packet = self.framer.buildPacket(request) + _logger.debug("send: " + hexlify_packets(packet)) + await self.transport.send_all(packet) + response = await self._buildResponse(request.transaction_id) + return response def connection_made(self, transport): """ @@ -40,29 +37,18 @@ def connection_made(self, transport): :return: """ self.transport = transport - self._connectionMade() + self._connection_made() if self.factory: self.factory.protocol_made_connection(self) - def _connectionMade(self): + def _connection_made(self): """ Called upon a successful client connection. """ _logger.debug("Client connected to modbus server") self._connected = True - async def _execute(self, request, **kwargs): - """ - Starts the producer to send the next request to - consumer.write(Frame(request)) - """ - request.transaction_id = self.transaction.getNextTID() - packet = self.framer.buildPacket(request) - _logger.debug("send: " + hexlify_packets(packet)) - await self.write_transport(packet) - return await self._buildResponse(request.transaction_id) - def _dataReceived(self, data): ''' Get response, check for valid message, decode result @@ -72,9 +58,6 @@ def _dataReceived(self, data): unit = self.framer.decode_data(data).get("unit", 0) self.framer.processIncomingPacket(data, self._handleResponse, unit=unit) - async def write_transport(self, packet): - return await self.transport.send(packet) - def _handleResponse(self, reply, **kwargs): """ Handle the processed response and link to correct deferred @@ -151,7 +134,7 @@ def data_received(self, data): # return self.transport.sendto(packet) -class TrioModbusTcpClient(object): +class TrioModbusTcpClient: """Client to connect to modbus device over TCP/IP.""" def __init__(self, host=None, port=502, protocol_class=None, loop=None): @@ -178,28 +161,14 @@ async def manage_connection(self): self.protocol = self._create_protocol() client_stream = await trio.open_tcp_stream(self.host, self.port) - write_send_channel, write_receive_channel = trio.open_memory_channel(0) - async with write_send_channel: - self.protocol.connection_made(transport=write_send_channel) - nursery.start_soon( - functools.partial( - self.sender, - stream=client_stream, - channel=write_receive_channel, - ), - ) - nursery.start_soon( - functools.partial(self.receiver, stream=client_stream), - ) - - yield self.protocol + self.protocol.connection_made(transport=client_stream) + nursery.start_soon( + functools.partial(self.receiver, stream=client_stream), + ) - nursery.cancel_scope.cancel() + yield self.protocol - async def sender(self, stream, channel): - async with channel: - async for data in channel: - await stream.send_all(data) + nursery.cancel_scope.cancel() async def receiver(self, stream): async for data in stream: @@ -209,16 +178,6 @@ async def receiver(self, stream): # for d in data: # self.protocol.data_received(bytes([d])) - def stop(self): - """ - Stops the client - :return: - """ - if self.connected: - if self.protocol: - if self.protocol.transport: - self.protocol.transport.close() - def _create_protocol(self): """ Factory function to create initialized protocol instance. @@ -227,22 +186,6 @@ def _create_protocol(self): protocol.factory = self return protocol - # # @asyncio.coroutine - # def connect(self): - # """ - # Connect and start Async client - # :return: - # """ - # _logger.debug('Connecting.') - # try: - # yield from self.loop.create_connection(self._create_protocol, - # self.host, - # self.port) - # _logger.info('Connected to %s:%s.' % (self.host, self.port)) - # except Exception as ex: - # _logger.warning('Failed to connect: %s' % ex) - # # asyncio.asynchronous(self._reconnect(), loop=self.loop) - def protocol_made_connection(self, protocol): """ Protocol notification of successful connection. @@ -255,24 +198,6 @@ def protocol_made_connection(self, protocol): _logger.error('Factory protocol connect ' 'callback called while connected.') - def protocol_lost_connection(self, protocol): - """ - Protocol notification of lost connection. - """ - if self.connected: - _logger.info('Protocol lost connection.') - if protocol is not self.protocol: - _logger.error('Factory protocol callback called' - ' from unexpected protocol instance.') - - self.connected = False - self.protocol = None - # if self.host: - # asyncio.asynchronous(self._reconnect(), loop=self.loop) - else: - _logger.error('Factory protocol disconnect' - ' callback called while not connected.') - def init_tcp_client(proto_cls, host, port, **kwargs): """ diff --git a/tc.py b/tc.py index 98c487dd9..b2bc0878c 100644 --- a/tc.py +++ b/tc.py @@ -1,3 +1,4 @@ +import contextlib import logging import sys @@ -15,13 +16,17 @@ async def main(): client = ModbusClient(scheduler=schedulers.TRIO, host="127.0.0.1", port=5020) - async with client.manage_connection() as protocol: - response = await protocol.read_coils(address=1, count=1, unit=0x01) - print(' response:', response.bits) - response = await protocol.read_holding_registers(address=1, count=1, unit=0x01) - print(' response:', response.registers) - response = await protocol.read_holding_registers(address=10, count=1, unit=0x01) - print(' response:', response.registers) + with contextlib.suppress(KeyboardInterrupt): + async with client.manage_connection() as protocol: + while True: + response = await protocol.read_coils(address=1, count=1, unit=0x01) + print(' response:', response.bits) + response = await protocol.read_holding_registers(address=1, count=1, unit=0x01) + print(' response:', response.registers) + response = await protocol.read_holding_registers(address=10, count=1, unit=0x01) + print(' response:', response.registers) + + await trio.sleep(1) trio.run(main) From 743bda124741254fc5e7852a93995effaf5deec0 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Mon, 11 Jan 2021 11:01:34 -0500 Subject: [PATCH 06/37] hacky but running trio server --- examples/common/trio_server.py | 52 +++++++++++++++++ pymodbus/server/trio.py | 101 +++++++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 examples/common/trio_server.py create mode 100644 pymodbus/server/trio.py diff --git a/examples/common/trio_server.py b/examples/common/trio_server.py new file mode 100644 index 000000000..96d4e9fd9 --- /dev/null +++ b/examples/common/trio_server.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +import functools +import logging + +FORMAT = ( + "%(asctime)-15s %(threadName)-15s" + " %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s" +) +logging.basicConfig(format=FORMAT) +log = logging.getLogger() +log.setLevel(logging.DEBUG) + +from pymodbus.server.trio import tcp_server +from pymodbus.device import ModbusDeviceIdentification +from pymodbus.datastore import ModbusSequentialDataBlock +from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext + +import trio + + +async def run_server(): + store = ModbusSlaveContext( + di=ModbusSequentialDataBlock(0, [17] * 100), + co=ModbusSequentialDataBlock(0, [17] * 100), + hr=ModbusSequentialDataBlock(0, [17] * 100), + ir=ModbusSequentialDataBlock(0, [17] * 100), + ) + + context = ModbusServerContext(slaves=store, single=True) + + identity = ModbusDeviceIdentification() + identity.VendorName = "Pymodbus" + identity.ProductCode = "PM" + identity.VendorUrl = "http://github.com/riptideio/pymodbus/" + identity.ProductName = "Pymodbus Server" + identity.ModelName = "Pymodbus Server" + identity.MajorMinorRevision = "2.3.0" + + await trio.serve_tcp( + functools.partial( + tcp_server, + context=context, + identity=identity, + ), + port=5020, + host="0.0.0.0", + ) + + +if __name__ == "__main__": + trio.run(run_server) diff --git a/pymodbus/server/trio.py b/pymodbus/server/trio.py new file mode 100644 index 000000000..f37bb1a87 --- /dev/null +++ b/pymodbus/server/trio.py @@ -0,0 +1,101 @@ +from binascii import b2a_hex +import functools +import logging + +import trio + +from pymodbus.framer.socket_framer import ModbusSocketFramer +from pymodbus.factory import ServerDecoder + +_logger = logging.getLogger(__name__) + + +def execute(request, addr, context, response_send): + broadcast = False + try: + if False: # self.server.broadcast_enable and request.unit_id == 0: + broadcast = True + # if broadcasting then execute on all slave contexts, note response will be ignored + for unit_id in self.server.context.slaves(): + response = request.execute(self.server.context[unit_id]) + else: + context = context[request.unit_id] + response = request.execute(context) + except NoSuchSlaveException as ex: + _logger.debug("requested slave does " "not exist: %s" % request.unit_id) + if False: # self.server.ignore_missing_slaves: + return # the client will simply timeout waiting for a response + response = request.doException(merror.GatewayNoResponse) + except Exception as ex: + _logger.debug( + "Datastore unable to fulfill request: " "%s; %s", ex, traceback.format_exc() + ) + response = request.doException(merror.SlaveFailure) + # no response when broadcasting + if not broadcast: + response.transaction_id = request.transaction_id + response.unit_id = request.unit_id + response_send.send_nowait((response, addr)) + + +class EventAndValue: + def __init__(self): + self.event = trio.Event() + self.value = self + + +async def incoming(server_stream, framer, context, response_send): + async with response_send: + units = context.slaves() + if not isinstance(units, (list, tuple)): + units = [units] + + async for data in server_stream: + if isinstance(data, tuple): + data, *addr = data # addr is populated when talking over UDP + else: + addr = (None,) # empty tuple + + framer.processIncomingPacket( + data=data, + callback=functools.partial( + execute, + addr=addr, + context=context, + response_send=response_send, + ), + unit=units, + single=context.single, + ) + + +async def tcp_server(server_stream, context, identity): + + # if server.broadcast_enable: # pragma: no cover + # if 0 not in units: + # units.append(0) + response_send, response_receive = trio.open_memory_channel(max_buffer_size=0) + framer = ModbusSocketFramer(decoder=ServerDecoder(), client=None) + + async with trio.open_nursery() as nursery: + nursery.start_soon( + functools.partial( + incoming, + server_stream=server_stream, + framer=framer, + context=context, + response_send=response_send, + ) + ) + + async for message, addr in response_receive: + if message.should_respond: + # self.server.control.Counter.BusMessage += 1 + pdu = framer.buildPacket(message) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("send: [%s]- %s" % (message, b2a_hex(pdu))) + if addr == (None,): + await server_stream.send_all(pdu) + else: + 1 / 0 + self._send_(pdu, *addr) From 4123eb48a4e01bbdc19259bb51b32500adb6c0c0 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Mon, 11 Jan 2021 20:33:11 -0500 Subject: [PATCH 07/37] misc --- examples/common/trio_server.py | 3 ++- pymodbus/server/trio.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/common/trio_server.py b/examples/common/trio_server.py index 96d4e9fd9..b503233d0 100644 --- a/examples/common/trio_server.py +++ b/examples/common/trio_server.py @@ -1,6 +1,5 @@ #!/usr/bin/env python -import functools import logging FORMAT = ( @@ -11,6 +10,8 @@ log = logging.getLogger() log.setLevel(logging.DEBUG) +import functools + from pymodbus.server.trio import tcp_server from pymodbus.device import ModbusDeviceIdentification from pymodbus.datastore import ModbusSequentialDataBlock diff --git a/pymodbus/server/trio.py b/pymodbus/server/trio.py index f37bb1a87..4588d17df 100644 --- a/pymodbus/server/trio.py +++ b/pymodbus/server/trio.py @@ -74,6 +74,8 @@ async def tcp_server(server_stream, context, identity): # if server.broadcast_enable: # pragma: no cover # if 0 not in units: # units.append(0) + if not context.single: + raise Exception("non-single context not yet supported") response_send, response_receive = trio.open_memory_channel(max_buffer_size=0) framer = ModbusSocketFramer(decoder=ServerDecoder(), client=None) From e3be3602bd9b76e7b0bf36b74350cc8f9406f5d0 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Tue, 12 Jan 2021 21:12:24 -0500 Subject: [PATCH 08/37] missing imports --- pymodbus/server/trio.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pymodbus/server/trio.py b/pymodbus/server/trio.py index 4588d17df..3377947df 100644 --- a/pymodbus/server/trio.py +++ b/pymodbus/server/trio.py @@ -1,11 +1,14 @@ from binascii import b2a_hex import functools import logging +import traceback import trio -from pymodbus.framer.socket_framer import ModbusSocketFramer +from pymodbus.exceptions import NoSuchSlaveException from pymodbus.factory import ServerDecoder +from pymodbus.framer.socket_framer import ModbusSocketFramer +from pymodbus.pdu import ModbusExceptions as merror _logger = logging.getLogger(__name__) From b0e72eb57fd943650ae97f4bb43481dbdb8be0fe Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Tue, 12 Jan 2021 21:19:16 -0500 Subject: [PATCH 09/37] use async_generator for asynccontextmanager --- pymodbus/client/asynchronous/trio/__init__.py | 3 ++- requirements.txt | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 94e293073..ba56f947b 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -2,6 +2,7 @@ import functools import logging +import async_generator import trio from pymodbus.client.asynchronous.mixins import AsyncModbusClientMixin @@ -155,7 +156,7 @@ def __init__(self, host=None, port=502, protocol_class=None, loop=None): self.connected = False - @contextlib.asynccontextmanager + @async_generator.asynccontextmanager async def manage_connection(self): async with trio.open_nursery() as nursery: self.protocol = self._create_protocol() diff --git a/requirements.txt b/requirements.txt index b3a0eb930..10770b990 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,6 +29,11 @@ six>=1.15.0 # ------------------------------------------------------------------- # pyserial-asyncio==0.4.0;python_version>="3.4" +# ------------------------------------------------------------------- +# if you want to use the Trio asynchronous version, uncomment these +# ------------------------------------------------------------------- +# async_generator ~= 1.10 + # ------------------------------------------------------------------- # if you want to build the documentation, uncomment these # ------------------------------------------------------------------- From 0229932d76630f52d014a741f882aa1df509c795 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 13 Jan 2021 08:53:05 -0500 Subject: [PATCH 10/37] Add async_generator to extras_require:trio --- setup.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/setup.py b/setup.py index 60e7c4eef..78469ecb3 100644 --- a/setup.py +++ b/setup.py @@ -89,6 +89,9 @@ 'tornado': [ 'tornado == 4.5.3' ], + 'trio': [ + 'async_generator ~= 1.10', + ], 'repl': [ 'click>=7.0', 'prompt-toolkit==2.0.4', From a844e30f84a37f15f631ba52a43757456610fb91 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 13 Jan 2021 09:07:35 -0500 Subject: [PATCH 11/37] use https for github link --- examples/common/trio_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/common/trio_server.py b/examples/common/trio_server.py index b503233d0..217a2172e 100644 --- a/examples/common/trio_server.py +++ b/examples/common/trio_server.py @@ -33,7 +33,7 @@ async def run_server(): identity = ModbusDeviceIdentification() identity.VendorName = "Pymodbus" identity.ProductCode = "PM" - identity.VendorUrl = "http://github.com/riptideio/pymodbus/" + identity.VendorUrl = "https://github.com/riptideio/pymodbus/" identity.ProductName = "Pymodbus Server" identity.ModelName = "Pymodbus Server" identity.MajorMinorRevision = "2.3.0" From a4c5e714885913f5df36c3fa9d92f267156c19b4 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Thu, 14 Jan 2021 23:10:51 -0500 Subject: [PATCH 12/37] another test --- pymodbus/client/asynchronous/trio/__init__.py | 10 +++ requirements-tests.txt | 1 + test/test_client_async_trio.py | 1 + test/test_server_trio.py | 65 +++++++++++++++++++ 4 files changed, 77 insertions(+) create mode 100755 test/test_server_trio.py diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index ba56f947b..35a4fc995 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -179,6 +179,16 @@ async def receiver(self, stream): # for d in data: # self.protocol.data_received(bytes([d])) + def stop(self): + """ + Stops the client + :return: + """ + if self.connected: + if self.protocol: + if self.protocol.transport: + self.protocol.transport.close() + def _create_protocol(self): """ Factory function to create initialized protocol instance. diff --git a/requirements-tests.txt b/requirements-tests.txt index eba656a38..a940b0e00 100644 --- a/requirements-tests.txt +++ b/requirements-tests.txt @@ -9,6 +9,7 @@ pyasn1>=0.4.2 pyserial>=3.4 pytest-cov>=2.5.1 pytest>=3.5.0 +pytest-trio~=0.7.0 redis>=2.10.6 sqlalchemy>=1.1.15 #wsgiref>=0.1.2 diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 4941d8970..515997d13 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -10,6 +10,7 @@ from pymodbus.exceptions import ConnectionException from pymodbus.transaction import ModbusSocketFramer from pymodbus.bit_read_message import ReadCoilsRequest, ReadCoilsResponse + from pymodbus.register_read_message import ReadHoldingRegistersResponse # protocols = [ModbusUdpClientProtocol, ModbusClientProtocol] protocols = [ModbusClientProtocol] else: diff --git a/test/test_server_trio.py b/test/test_server_trio.py new file mode 100755 index 000000000..c2ad0b82a --- /dev/null +++ b/test/test_server_trio.py @@ -0,0 +1,65 @@ +import functools + +import pytest +import trio + +from pymodbus.client.asynchronous.schedulers import TRIO +from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient +from pymodbus.datastore.context import ModbusServerContext, ModbusSlaveContext +from pymodbus.device import ModbusDeviceIdentification +from pymodbus.register_read_message import ReadHoldingRegistersResponse +from pymodbus.server.trio import tcp_server + + +class RunningTrioServer: + def __init__(self, context, host, port): + self.context = context + self.host = host + self.port = port + + +@pytest.fixture(name="trio_tcp_server") +async def trio_tcp_server_fixture(nursery): + host = "127.0.0.1" + + slave_context = ModbusSlaveContext() + server_context = ModbusServerContext(slaves=slave_context) + identity = ModbusDeviceIdentification() + + [listener] = await nursery.start( + functools.partial( + trio.serve_tcp, + functools.partial( + tcp_server, + context=server_context, + identity=identity, + ), + host=host, + port=0, + ), + ) + + yield RunningTrioServer( + context=server_context, host=host, port=listener.socket.getsockname()[1] + ) + + +@pytest.fixture(name="trio_tcp_client") +async def trio_tcp_client_fixture(trio_tcp_server): + modbus_client = AsyncModbusTCPClient( + scheduler=TRIO, + host=trio_tcp_server.host, + port=trio_tcp_server.port, + ) + + async with modbus_client.manage_connection() as protocol: + yield protocol + + +@pytest.mark.trio +async def test_read_holding_registers(trio_tcp_client, trio_tcp_server): + # TODO: learn what fx is about... + trio_tcp_server.context[0].setValues(fx=3, address=12, values=[40312, 40413, 40514]) + response = await trio_tcp_client.read_holding_registers(address=13, count=1) + assert isinstance(response, ReadHoldingRegistersResponse) + assert response.registers == [40413] From 4187d5fe54beac6e9ced4e13568184360c674468 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Thu, 14 Jan 2021 23:15:11 -0500 Subject: [PATCH 13/37] pytest-trio~=0.7.0;python_version>="3.6" --- requirements-tests.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements-tests.txt b/requirements-tests.txt index a940b0e00..02a4ff6b5 100644 --- a/requirements-tests.txt +++ b/requirements-tests.txt @@ -9,7 +9,7 @@ pyasn1>=0.4.2 pyserial>=3.4 pytest-cov>=2.5.1 pytest>=3.5.0 -pytest-trio~=0.7.0 +pytest-trio~=0.7.0;python_version>="3.6" redis>=2.10.6 sqlalchemy>=1.1.15 #wsgiref>=0.1.2 From 1a5ee534bde157f751795540e58f7bbe39e59e89 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Fri, 15 Jan 2021 22:40:06 -0500 Subject: [PATCH 14/37] misc --- pymodbus/client/asynchronous/trio/__init__.py | 83 ++++++++---------- pymodbus/server/trio.py | 23 ++--- test/test_client_async_trio.py | 4 +- test/test_server_trio.py | 86 +++++++++++++++++-- 4 files changed, 127 insertions(+), 69 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 35a4fc995..ebebb05b9 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -1,4 +1,3 @@ -import contextlib import functools import logging @@ -12,6 +11,16 @@ _logger = logging.getLogger(__name__) +class EventAndValue: + def __init__(self): + self.event = trio.Event() + self.value = self + + def __call__(self, value): + self.value = value + self.event.set() + + class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin): """ Trio specific implementation of asynchronous modbus client protocol. @@ -20,13 +29,15 @@ class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin): #: Factory that created this instance. factory = None transport = None + data = b'' async def execute(self, request=None): request.transaction_id = self.transaction.getNextTID() packet = self.framer.buildPacket(request) _logger.debug("send: " + hexlify_packets(packet)) + # TODO: should we retry on trio.BusyResourceError? await self.transport.send_all(packet) - response = await self._buildResponse(request.transaction_id) + response = await self._build_response(request.transaction_id) return response def connection_made(self, transport): @@ -38,28 +49,33 @@ def connection_made(self, transport): :return: """ self.transport = transport - self._connection_made() + _logger.debug("Client connected to modbus server") + self._connected = True if self.factory: self.factory.protocol_made_connection(self) - def _connection_made(self): - """ - Called upon a successful client connection. - """ - _logger.debug("Client connected to modbus server") - self._connected = True + # TODO: _connectionLost looks like functionality to have somewhere - def _dataReceived(self, data): + def _data_received(self, data): ''' Get response, check for valid message, decode result :param data: The data returned from the server ''' _logger.debug("recv: " + hexlify_packets(data)) - unit = self.framer.decode_data(data).get("unit", 0) - self.framer.processIncomingPacket(data, self._handleResponse, unit=unit) - def _handleResponse(self, reply, **kwargs): + # TODO: trying to help out the framer here by buffering up a bit but it + # is insufficient and still fails down below. + self.data += data + decoded = self.framer.decode_data(self.data) + if decoded == {}: + return + + unit = decoded.get("unit", 0) + self.framer.processIncomingPacket(self.data, self._handle_response, unit=unit) + self.data = b'' + + def _handle_response(self, reply, **kwargs): """ Handle the processed response and link to correct deferred @@ -69,12 +85,11 @@ def _handleResponse(self, reply, **kwargs): tid = reply.transaction_id handler = self.transaction.getTransaction(tid) if handler: - handler.value = reply - handler.event.set() + handler(reply) else: _logger.debug("Unrequested message: " + str(reply)) - async def _buildResponse(self, tid): + async def _build_response(self, tid): """ Helper method to return a deferred response for the current request. @@ -85,20 +100,15 @@ async def _buildResponse(self, tid): if not self._connected: raise ConnectionException('Client is not connected') - class EventAndValue: - def __init__(self): - self.event = trio.Event() - self.value = self - event_and_value = EventAndValue() self.transaction.addTransaction(event_and_value, tid) await event_and_value.event.wait() return event_and_value.value -class ModbusClientProtocol(BaseModbusAsyncClientProtocol): +class ModbusTcpClientProtocol(BaseModbusAsyncClientProtocol): """ - Asyncio specific implementation of asynchronous modbus client protocol. + Trio specific implementation of asynchronous modbus client protocol. """ #: Factory that created this instance. @@ -112,27 +122,10 @@ def data_received(self, data): :param data: :return: """ - self._dataReceived(data) - - -# class ModbusUdpClientProtocol(BaseModbusAsyncClientProtocol): -# """ -# Asyncio specific implementation of asynchronous modbus udp client protocol. -# """ -# -# #: Factory that created this instance. -# factory = None -# -# def __init__(self, host=None, port=0, **kwargs): -# self.host = host -# self.port = port -# super(self.__class__, self).__init__(**kwargs) -# -# def datagram_received(self, data, addr): -# self._dataReceived(data) -# -# def write_transport(self, packet): -# return self.transport.sendto(packet) + self._data_received(data) + + +# TODO: implement UDP class TrioModbusTcpClient: @@ -146,7 +139,7 @@ def __init__(self, host=None, port=502, protocol_class=None, loop=None): :param protocol_class: Protocol used to talk to modbus device. """ #: Protocol used to talk to modbus device. - self.protocol_class = protocol_class or ModbusClientProtocol + self.protocol_class = protocol_class or ModbusTcpClientProtocol #: Current protocol instance. self.protocol = None #: Event loop to use. diff --git a/pymodbus/server/trio.py b/pymodbus/server/trio.py index 3377947df..fbb76402b 100644 --- a/pymodbus/server/trio.py +++ b/pymodbus/server/trio.py @@ -5,7 +5,7 @@ import trio -from pymodbus.exceptions import NoSuchSlaveException +from pymodbus.exceptions import NotImplementedException, NoSuchSlaveException from pymodbus.factory import ServerDecoder from pymodbus.framer.socket_framer import ModbusSocketFramer from pymodbus.pdu import ModbusExceptions as merror @@ -24,14 +24,15 @@ def execute(request, addr, context, response_send): else: context = context[request.unit_id] response = request.execute(context) - except NoSuchSlaveException as ex: - _logger.debug("requested slave does " "not exist: %s" % request.unit_id) - if False: # self.server.ignore_missing_slaves: - return # the client will simply timeout waiting for a response - response = request.doException(merror.GatewayNoResponse) + # TODO: can't be covered until non-single server contexts are supported + # except NoSuchSlaveException as ex: + # _logger.debug("requested slave does not exist: %s" % request.unit_id) + # if False: # self.server.ignore_missing_slaves: + # return # the client will simply timeout waiting for a response + # response = request.doException(merror.GatewayNoResponse) except Exception as ex: _logger.debug( - "Datastore unable to fulfill request: " "%s; %s", ex, traceback.format_exc() + "Datastore unable to fulfill request: %s; %s", ex, traceback.format_exc() ) response = request.doException(merror.SlaveFailure) # no response when broadcasting @@ -41,12 +42,6 @@ def execute(request, addr, context, response_send): response_send.send_nowait((response, addr)) -class EventAndValue: - def __init__(self): - self.event = trio.Event() - self.value = self - - async def incoming(server_stream, framer, context, response_send): async with response_send: units = context.slaves() @@ -78,7 +73,7 @@ async def tcp_server(server_stream, context, identity): # if 0 not in units: # units.append(0) if not context.single: - raise Exception("non-single context not yet supported") + raise NotImplementedException("non-single context not yet supported") response_send, response_receive = trio.open_memory_channel(max_buffer_size=0) framer = ModbusSocketFramer(decoder=ServerDecoder(), client=None) diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 515997d13..d96bc7849 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -4,7 +4,7 @@ if TRIO_AVAILABLE: from unittest import mock from pymodbus.client.asynchronous.trio import ( - ModbusClientProtocol, TrioModbusTcpClient)#, ModbusUdpClientProtocol) + ModbusTcpClientProtocol, TrioModbusTcpClient)#, ModbusUdpClientProtocol) # from test.trio_test_helper import return_as_coroutine, run_coroutine from pymodbus.factory import ClientDecoder from pymodbus.exceptions import ConnectionException @@ -12,7 +12,7 @@ from pymodbus.bit_read_message import ReadCoilsRequest, ReadCoilsResponse from pymodbus.register_read_message import ReadHoldingRegistersResponse # protocols = [ModbusUdpClientProtocol, ModbusClientProtocol] - protocols = [ModbusClientProtocol] + protocols = [ModbusTcpClientProtocol] else: import mock protocols = [None, None] diff --git a/test/test_server_trio.py b/test/test_server_trio.py index c2ad0b82a..850acd38c 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -3,12 +3,15 @@ import pytest import trio +from pymodbus.exceptions import NotImplementedException from pymodbus.client.asynchronous.schedulers import TRIO from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient from pymodbus.datastore.context import ModbusServerContext, ModbusSlaveContext from pymodbus.device import ModbusDeviceIdentification +from pymodbus.pdu import ExceptionResponse from pymodbus.register_read_message import ReadHoldingRegistersResponse from pymodbus.server.trio import tcp_server +from pymodbus.register_write_message import WriteMultipleRegistersResponse class RunningTrioServer: @@ -18,12 +21,18 @@ def __init__(self, context, host, port): self.port = port +@pytest.fixture(name="trio_server_context") +async def trio_server_context_fixture(): + slave_context = ModbusSlaveContext() + server_context = ModbusServerContext(slaves=slave_context) + + return server_context + + @pytest.fixture(name="trio_tcp_server") -async def trio_tcp_server_fixture(nursery): +async def trio_tcp_server_fixture(nursery, trio_server_context): host = "127.0.0.1" - slave_context = ModbusSlaveContext() - server_context = ModbusServerContext(slaves=slave_context) identity = ModbusDeviceIdentification() [listener] = await nursery.start( @@ -31,7 +40,7 @@ async def trio_tcp_server_fixture(nursery): trio.serve_tcp, functools.partial( tcp_server, - context=server_context, + context=trio_server_context, identity=identity, ), host=host, @@ -40,7 +49,7 @@ async def trio_tcp_server_fixture(nursery): ) yield RunningTrioServer( - context=server_context, host=host, port=listener.socket.getsockname()[1] + context=trio_server_context, host=host, port=listener.socket.getsockname()[1] ) @@ -58,8 +67,69 @@ async def trio_tcp_client_fixture(trio_tcp_server): @pytest.mark.trio async def test_read_holding_registers(trio_tcp_client, trio_tcp_server): + address = 12 + value = 40413 # TODO: learn what fx is about... - trio_tcp_server.context[0].setValues(fx=3, address=12, values=[40312, 40413, 40514]) - response = await trio_tcp_client.read_holding_registers(address=13, count=1) + trio_tcp_server.context[0].setValues( + fx=3, + address=address, + values=[value - 5, value, value + 5], + ) + # TODO: is the +1 good? seems related to ModbusSlaveContext.zero_mode probably + response = await trio_tcp_client.read_holding_registers( + address=address + 1, + count=1, + ) assert isinstance(response, ReadHoldingRegistersResponse) - assert response.registers == [40413] + assert response.registers == [value] + + +@pytest.mark.trio +async def test_write_holding_registers(trio_tcp_client, trio_tcp_server): + address = 12 + value = 40413 + + # TODO: is the +1 good? seems related to ModbusSlaveContext.zero_mode probably + response = await trio_tcp_client.write_registers( + address=address + 1, + values=[value], + ) + assert isinstance(response, WriteMultipleRegistersResponse) + + # TODO: learn what fx is about... + server_values = trio_tcp_server.context[0].getValues( + fx=3, + address=address, + count=3, + ) + assert server_values == [0, value, 0] + + +@pytest.mark.trio +async def test_tcp_server_raises_for_non_single_context(trio_server_context): + # TODO: Remove once non-single support is implemented + trio_server_context.single = False + with pytest.raises(NotImplementedException): + await tcp_server(server_stream=None, context=trio_server_context, identity=None) + + +@pytest.mark.trio +async def test_large_count_excepts(trio_tcp_client): + response = await trio_tcp_client.read_holding_registers( + address=0, + count=300, + ) + assert isinstance(response, ExceptionResponse) + + +@pytest.mark.trio +async def test_red(trio_tcp_server): + modbus_client = AsyncModbusTCPClient( + scheduler=TRIO, + host=trio_tcp_server.host, + port=trio_tcp_server.port, + ) + + async with modbus_client.manage_connection(): + async with modbus_client.manage_connection(): + pass From 83bf25071bdf56ece257e2e113a63dccf6bd692a Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sun, 17 Jan 2021 22:48:09 -0500 Subject: [PATCH 15/37] misc --- pymodbus/client/asynchronous/trio/__init__.py | 13 +-- pymodbus/server/trio.py | 66 +++++++----- setup.py | 1 + test/test_server_trio.py | 100 ++++++++++++++---- 4 files changed, 119 insertions(+), 61 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index ebebb05b9..9722acd67 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -29,7 +29,6 @@ class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin): #: Factory that created this instance. factory = None transport = None - data = b'' async def execute(self, request=None): request.transaction_id = self.transaction.getNextTID() @@ -37,7 +36,8 @@ async def execute(self, request=None): _logger.debug("send: " + hexlify_packets(packet)) # TODO: should we retry on trio.BusyResourceError? await self.transport.send_all(packet) - response = await self._build_response(request.transaction_id) + with trio.fail_after(seconds=1): + response = await self._build_response(request.transaction_id) return response def connection_made(self, transport): @@ -64,15 +64,10 @@ def _data_received(self, data): ''' _logger.debug("recv: " + hexlify_packets(data)) - # TODO: trying to help out the framer here by buffering up a bit but it - # is insufficient and still fails down below. - self.data += data - decoded = self.framer.decode_data(self.data) - if decoded == {}: - return + decoded = self.framer.decode_data(data) unit = decoded.get("unit", 0) - self.framer.processIncomingPacket(self.data, self._handle_response, unit=unit) + self.framer.processIncomingPacket(data, self._handle_response, unit=unit) self.data = b'' def _handle_response(self, reply, **kwargs): diff --git a/pymodbus/server/trio.py b/pymodbus/server/trio.py index fbb76402b..364498026 100644 --- a/pymodbus/server/trio.py +++ b/pymodbus/server/trio.py @@ -5,7 +5,7 @@ import trio -from pymodbus.exceptions import NotImplementedException, NoSuchSlaveException +from pymodbus.exceptions import NoSuchSlaveException from pymodbus.factory import ServerDecoder from pymodbus.framer.socket_framer import ModbusSocketFramer from pymodbus.pdu import ModbusExceptions as merror @@ -13,28 +13,28 @@ _logger = logging.getLogger(__name__) -def execute(request, addr, context, response_send): +def execute(request, addr, context, response_send, ignore_missing_slaves, broadcast_enable): broadcast = False try: - if False: # self.server.broadcast_enable and request.unit_id == 0: + if broadcast_enable and request.unit_id == 0: broadcast = True # if broadcasting then execute on all slave contexts, note response will be ignored - for unit_id in self.server.context.slaves(): - response = request.execute(self.server.context[unit_id]) + for unit_id in context.slaves(): + response = request.execute(context[unit_id]) else: context = context[request.unit_id] response = request.execute(context) - # TODO: can't be covered until non-single server contexts are supported - # except NoSuchSlaveException as ex: - # _logger.debug("requested slave does not exist: %s" % request.unit_id) - # if False: # self.server.ignore_missing_slaves: - # return # the client will simply timeout waiting for a response - # response = request.doException(merror.GatewayNoResponse) - except Exception as ex: + except NoSuchSlaveException as ex: + _logger.debug("requested slave does not exist: %s" % request.unit_id) + if ignore_missing_slaves: + return # the client will simply timeout waiting for a response + response = request.doException(merror.GatewayNoResponse) + except Exception as ex: # pragma: no cover _logger.debug( "Datastore unable to fulfill request: %s; %s", ex, traceback.format_exc() ) response = request.doException(merror.SlaveFailure) + # no response when broadcasting if not broadcast: response.transaction_id = request.transaction_id @@ -42,17 +42,19 @@ def execute(request, addr, context, response_send): response_send.send_nowait((response, addr)) -async def incoming(server_stream, framer, context, response_send): +async def incoming(server_stream, framer, context, response_send, ignore_missing_slaves, broadcast_enable): async with response_send: units = context.slaves() - if not isinstance(units, (list, tuple)): + if not isinstance(units, (list, tuple)): # pragma: no cover units = [units] async for data in server_stream: - if isinstance(data, tuple): - data, *addr = data # addr is populated when talking over UDP - else: - addr = (None,) # empty tuple + # TODO: implement UDP support + # if isinstance(data, tuple): + # data, *addr = data # addr is populated when talking over UDP + # else: + # addr = (None,) # empty tuple + addr = (None,) # empty tuple framer.processIncomingPacket( data=data, @@ -61,19 +63,20 @@ async def incoming(server_stream, framer, context, response_send): addr=addr, context=context, response_send=response_send, + ignore_missing_slaves=ignore_missing_slaves, + broadcast_enable=broadcast_enable, ), unit=units, single=context.single, ) -async def tcp_server(server_stream, context, identity): +async def tcp_server(server_stream, context, identity, ignore_missing_slaves=False, broadcast_enable=False): + if broadcast_enable: + units = context.slaves() + if 0 not in context: + units.append(0) - # if server.broadcast_enable: # pragma: no cover - # if 0 not in units: - # units.append(0) - if not context.single: - raise NotImplementedException("non-single context not yet supported") response_send, response_receive = trio.open_memory_channel(max_buffer_size=0) framer = ModbusSocketFramer(decoder=ServerDecoder(), client=None) @@ -85,17 +88,22 @@ async def tcp_server(server_stream, context, identity): framer=framer, context=context, response_send=response_send, + ignore_missing_slaves=ignore_missing_slaves, + broadcast_enable=broadcast_enable, ) ) async for message, addr in response_receive: if message.should_respond: - # self.server.control.Counter.BusMessage += 1 pdu = framer.buildPacket(message) if _logger.isEnabledFor(logging.DEBUG): - _logger.debug("send: [%s]- %s" % (message, b2a_hex(pdu))) + # avoids the b2a_hex() conversion + _logger.debug('send: [%s]- %s' % (message, b2a_hex(pdu))) if addr == (None,): await server_stream.send_all(pdu) - else: - 1 / 0 - self._send_(pdu, *addr) + else: # pragma: no cover + # TODO: implement UDP support + # self._send_(pdu, *addr) + pass + else: # pragma: no cover + pass diff --git a/setup.py b/setup.py index 78469ecb3..8b60584b4 100644 --- a/setup.py +++ b/setup.py @@ -90,6 +90,7 @@ 'tornado == 4.5.3' ], 'trio': [ + 'trio ~= 0.17.0', 'async_generator ~= 1.10', ], 'repl': [ diff --git a/test/test_server_trio.py b/test/test_server_trio.py index 850acd38c..a7f91fbb0 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -1,14 +1,14 @@ import functools +import logging import pytest import trio -from pymodbus.exceptions import NotImplementedException from pymodbus.client.asynchronous.schedulers import TRIO from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient from pymodbus.datastore.context import ModbusServerContext, ModbusSlaveContext from pymodbus.device import ModbusDeviceIdentification -from pymodbus.pdu import ExceptionResponse +from pymodbus.pdu import ExceptionResponse, ModbusExceptions from pymodbus.register_read_message import ReadHoldingRegistersResponse from pymodbus.server.trio import tcp_server from pymodbus.register_write_message import WriteMultipleRegistersResponse @@ -22,15 +22,32 @@ def __init__(self, context, host, port): @pytest.fixture(name="trio_server_context") -async def trio_server_context_fixture(): - slave_context = ModbusSlaveContext() - server_context = ModbusServerContext(slaves=slave_context) +async def trio_server_context_fixture(request): + node = request.node + single = node.get_closest_marker("single") + no_contexts = node.get_closest_marker("no_contexts") + + if no_contexts: + slaves = {} + else: + slave_context = ModbusSlaveContext() + + if single: + slaves = slave_context + else: + slaves = {0: slave_context} + + server_context = ModbusServerContext(slaves=slaves, single=single) return server_context @pytest.fixture(name="trio_tcp_server") -async def trio_tcp_server_fixture(nursery, trio_server_context): +async def trio_tcp_server_fixture(request, nursery, trio_server_context): + node = request.node + broadcast_enable = node.get_closest_marker("broadcast_enable") + ignore_missing_slaves = node.get_closest_marker("ignore_missing_slaves") + host = "127.0.0.1" identity = ModbusDeviceIdentification() @@ -42,6 +59,8 @@ async def trio_tcp_server_fixture(nursery, trio_server_context): tcp_server, context=trio_server_context, identity=identity, + ignore_missing_slaves=ignore_missing_slaves, + broadcast_enable=broadcast_enable, ), host=host, port=0, @@ -49,7 +68,9 @@ async def trio_tcp_server_fixture(nursery, trio_server_context): ) yield RunningTrioServer( - context=trio_server_context, host=host, port=listener.socket.getsockname()[1] + context=trio_server_context, + host=host, + port=listener.socket.getsockname()[1], ) @@ -105,14 +126,6 @@ async def test_write_holding_registers(trio_tcp_client, trio_tcp_server): assert server_values == [0, value, 0] -@pytest.mark.trio -async def test_tcp_server_raises_for_non_single_context(trio_server_context): - # TODO: Remove once non-single support is implemented - trio_server_context.single = False - with pytest.raises(NotImplementedException): - await tcp_server(server_stream=None, context=trio_server_context, identity=None) - - @pytest.mark.trio async def test_large_count_excepts(trio_tcp_client): response = await trio_tcp_client.read_holding_registers( @@ -120,16 +133,57 @@ async def test_large_count_excepts(trio_tcp_client): count=300, ) assert isinstance(response, ExceptionResponse) + assert response.exception_code == ModbusExceptions.IllegalValue @pytest.mark.trio -async def test_red(trio_tcp_server): - modbus_client = AsyncModbusTCPClient( - scheduler=TRIO, - host=trio_tcp_server.host, - port=trio_tcp_server.port, +async def test_invalid_client_excepts_gateway_no_response(trio_tcp_client): + response = await trio_tcp_client.read_holding_registers( + address=0, + count=1, + unit=57, ) + assert isinstance(response, ExceptionResponse) + assert response.exception_code == ModbusExceptions.GatewayNoResponse + + +@pytest.mark.ignore_missing_slaves +@pytest.mark.trio +async def test_invalid_unit_times_out_when_ignoring_missing_slaves(trio_tcp_client): + with pytest.raises(trio.TooSlowError): + await trio_tcp_client.read_holding_registers( + address=0, + count=1, + unit=57, + ) + + +@pytest.mark.broadcast_enable +@pytest.mark.trio +async def test_times_out_when_broadcast_enabled(trio_tcp_client): + with pytest.raises(trio.TooSlowError): + await trio_tcp_client.read_holding_registers( + address=0, + count=1, + unit=0, + ) + + +@pytest.mark.broadcast_enable +@pytest.mark.no_contexts +@pytest.mark.trio +async def test_times_out_when_broadcast_enabled_and_no_contexts(trio_tcp_client): + with pytest.raises(trio.TooSlowError): + await trio_tcp_client.read_holding_registers( + address=0, + count=1, + unit=0, + ) + + +@pytest.mark.trio +async def test_logs_server_response_send(trio_tcp_client, caplog): + with caplog.at_level(logging.DEBUG): + await trio_tcp_client.read_holding_registers(address=0, count=1) - async with modbus_client.manage_connection(): - async with modbus_client.manage_connection(): - pass + assert "send: [ReadRegisterResponse (1)]- b'0001000000050003020000'" in caplog.text From d514d08f86e9ae0f70d64bde54347357fa60526a Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Thu, 22 Apr 2021 21:14:03 -0400 Subject: [PATCH 16/37] update test for logging change --- test/test_server_trio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_server_trio.py b/test/test_server_trio.py index a7f91fbb0..a647a3226 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -186,4 +186,4 @@ async def test_logs_server_response_send(trio_tcp_client, caplog): with caplog.at_level(logging.DEBUG): await trio_tcp_client.read_holding_registers(address=0, count=1) - assert "send: [ReadRegisterResponse (1)]- b'0001000000050003020000'" in caplog.text + assert "send: [ReadHoldingRegistersResponse (1)]- b'0001000000050003020000'" in caplog.text From 7bd1cbbeb1a047087ad462a9247cd40f3c4c8a84 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Thu, 22 Apr 2021 21:27:32 -0400 Subject: [PATCH 17/37] skip trio tests in py2 --- test/conftest.py | 1 + test/test_client_async.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/test/conftest.py b/test/conftest.py index 932e8124c..3ac3db995 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -5,4 +5,5 @@ collect_ignore = [ "test_client_async_asyncio.py", "test_server_asyncio.py", + "test_server_trio.py", ] diff --git a/test/test_client_async.py b/test/test_client_async.py index 54002ac2d..dd7094267 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -23,7 +23,8 @@ from pymodbus.client.asynchronous.tornado import AsyncModbusSerialClient as AsyncTornadoModbusSerialClient from pymodbus.client.asynchronous.tornado import AsyncModbusTCPClient as AsyncTornadoModbusTcpClient from pymodbus.client.asynchronous.tornado import AsyncModbusUDPClient as AsyncTornadoModbusUdoClient -from pymodbus.client.asynchronous.trio import TrioModbusTcpClient as AsyncTrioModbusTcpClient +if not (not IS_PYTHON3 or PYTHON_VERSION < (3, 6)): + from pymodbus.client.asynchronous.trio import TrioModbusTcpClient as AsyncTrioModbusTcpClient from pymodbus.client.asynchronous import schedulers from pymodbus.factory import ClientDecoder from pymodbus.exceptions import ConnectionException From 325a53e41466fdaa420b45bebe538117ab8a69b9 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Fri, 23 Apr 2021 12:38:34 -0400 Subject: [PATCH 18/37] unit test trio.execute() --- test/test_server_trio.py | 136 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 2 deletions(-) diff --git a/test/test_server_trio.py b/test/test_server_trio.py index a647a3226..78d94f016 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -10,7 +10,7 @@ from pymodbus.device import ModbusDeviceIdentification from pymodbus.pdu import ExceptionResponse, ModbusExceptions from pymodbus.register_read_message import ReadHoldingRegistersResponse -from pymodbus.server.trio import tcp_server +from pymodbus.server.trio import execute, tcp_server from pymodbus.register_write_message import WriteMultipleRegistersResponse @@ -22,7 +22,7 @@ def __init__(self, context, host, port): @pytest.fixture(name="trio_server_context") -async def trio_server_context_fixture(request): +def trio_server_context_fixture(request): node = request.node single = node.get_closest_marker("single") no_contexts = node.get_closest_marker("no_contexts") @@ -42,6 +42,15 @@ async def trio_server_context_fixture(request): return server_context +@pytest.fixture(name="trio_server_multiunit_context") +def trio_server_multiunit_context_fixture(request): + slaves = {i: ModbusSlaveContext() for i in range(3)} + + server_context = ModbusServerContext(slaves=slaves, single=False) + + return server_context + + @pytest.fixture(name="trio_tcp_server") async def trio_tcp_server_fixture(request, nursery, trio_server_context): node = request.node @@ -187,3 +196,126 @@ async def test_logs_server_response_send(trio_tcp_client, caplog): await trio_tcp_client.read_holding_registers(address=0, count=1) assert "send: [ReadHoldingRegistersResponse (1)]- b'0001000000050003020000'" in caplog.text + + +class Response: + def __init__(self): + self.transaction_id = None + self.unit_id = None + + +class Request: + def __init__(self, unit_id, transaction_id=0, fail_to_execute=False): + self.exception_codes = [] + self.executed_contexts = [] + self.fail_to_execute = fail_to_execute + self.transaction_id = transaction_id + self.unit_id = unit_id + + def execute(self, context): + if self.fail_to_execute: + raise Exception('failing to execute for testing purposes') + self.executed_contexts.append(context) + return Response() + + def doException(self, exception): + self.exception_codes.append(exception) + return Response() + + +def test_execute_broadcasts(trio_server_multiunit_context): + context = trio_server_multiunit_context + + test_request = Request(unit_id=0) + execute( + request=test_request, + addr=None, + context=context, + response_send=None, + ignore_missing_slaves=False, + broadcast_enable=True, + ) + + assert len(test_request.executed_contexts) == len(context.slaves()) + assert test_request.exception_codes == [] + + +@pytest.mark.parametrize( + argnames=["unit_id", "broadcast_enabled"], + argvalues=[[0, False], [1, True]], +) +def test_execute_does_not_broadcast( + trio_server_multiunit_context, + unit_id, + broadcast_enabled, +): + context = trio_server_multiunit_context + response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) + + test_request = Request(unit_id=unit_id) + execute( + request=test_request, + addr=None, + context=context, + response_send=response_send, + ignore_missing_slaves=False, + broadcast_enable=broadcast_enabled, + ) + + assert len(test_request.executed_contexts) == 1 + assert test_request.exception_codes == [] + + +def test_execute_does_ignore_missing_slaves(trio_server_multiunit_context): + context = trio_server_multiunit_context + + missing_unit_id = max(context.slaves()) + 1 + + test_request = Request(unit_id=missing_unit_id) + execute( + request=test_request, + addr=None, + context=context, + response_send=None, + ignore_missing_slaves=True, + broadcast_enable=True, + ) + + assert len(test_request.executed_contexts) == 0 + assert test_request.exception_codes == [] + + +def test_execute_does_not_ignore_missing_slaves(trio_server_multiunit_context): + context = trio_server_multiunit_context + response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) + + missing_unit_id = max(context.slaves()) + 1 + + test_request = Request(unit_id=missing_unit_id) + execute( + request=test_request, + addr=None, + context=context, + response_send=response_send, + ignore_missing_slaves=False, + broadcast_enable=True, + ) + + assert test_request.exception_codes == [ModbusExceptions.GatewayNoResponse] + + +def test_execute_handles_slave_failure(trio_server_multiunit_context): + context = trio_server_multiunit_context + response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) + + test_request = Request(unit_id=0, fail_to_execute=True) + execute( + request=test_request, + addr=None, + context=context, + response_send=response_send, + ignore_missing_slaves=False, + broadcast_enable=True, + ) + + assert test_request.exception_codes == [ModbusExceptions.SlaveFailure] From a2e368fbb02d189e89d8f299f7364018abb54f90 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Fri, 23 Apr 2021 12:38:52 -0400 Subject: [PATCH 19/37] simplify trio broadcast handling code --- pymodbus/server/trio.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymodbus/server/trio.py b/pymodbus/server/trio.py index 364498026..1c3435ce8 100644 --- a/pymodbus/server/trio.py +++ b/pymodbus/server/trio.py @@ -19,8 +19,8 @@ def execute(request, addr, context, response_send, ignore_missing_slaves, broadc if broadcast_enable and request.unit_id == 0: broadcast = True # if broadcasting then execute on all slave contexts, note response will be ignored - for unit_id in context.slaves(): - response = request.execute(context[unit_id]) + for unit_id, unit_context in context: + response = request.execute(unit_context) else: context = context[request.unit_id] response = request.execute(context) From 023caffd3a3c438f28eabcf602b0fedcfa310cde Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Fri, 23 Apr 2021 21:45:28 -0400 Subject: [PATCH 20/37] test trio.incoming --- test/test_server_trio.py | 66 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/test/test_server_trio.py b/test/test_server_trio.py index 78d94f016..9c66bc6f3 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -1,3 +1,4 @@ +import contextlib import functools import logging @@ -8,9 +9,11 @@ from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient from pymodbus.datastore.context import ModbusServerContext, ModbusSlaveContext from pymodbus.device import ModbusDeviceIdentification +from pymodbus.factory import ServerDecoder +from pymodbus.framer.socket_framer import ModbusSocketFramer from pymodbus.pdu import ExceptionResponse, ModbusExceptions from pymodbus.register_read_message import ReadHoldingRegistersResponse -from pymodbus.server.trio import execute, tcp_server +from pymodbus.server.trio import execute, incoming, tcp_server from pymodbus.register_write_message import WriteMultipleRegistersResponse @@ -319,3 +322,64 @@ def test_execute_handles_slave_failure(trio_server_multiunit_context): ) assert test_request.exception_codes == [ModbusExceptions.SlaveFailure] + + +@pytest.mark.trio +async def test_incoming_closes_response_send_channel(trio_server_context): + server_send, server_receive = trio.open_memory_channel(max_buffer_size=1) + response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) + + server_send.close() + + await incoming( + server_stream=server_receive, + framer=None, + context=trio_server_context, + response_send=response_send, + ignore_missing_slaves=None, + broadcast_enable=None, + ) + + with pytest.raises(trio.ClosedResourceError): + response_send.send_nowait(None) + + +sample_read_data = b'\x00\x01\x00\x00\x00\x06\x00\x03\x00\r\x00\x01' + + +@pytest.mark.trio +@pytest.mark.parametrize( + argnames=['data_blocks'], + argvalues=[ + [[sample_read_data]], + # TODO: can the framer actually handle this? + # [[sample_read_data[:5], sample_read_data[5:]]], + # [[bytes([byte]) for byte in sample_read_data]], + ], +) +async def test_incoming_processes(trio_server_context, data_blocks): + server_send, server_receive = trio.open_memory_channel( + max_buffer_size=len(data_blocks), + ) + response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) + framer = ModbusSocketFramer(decoder=ServerDecoder(), client=None) + + with response_receive: + with server_send: + for block in data_blocks: + server_send.send_nowait(block) + + await incoming( + server_stream=server_receive, + framer=framer, + context=trio_server_context, + response_send=response_send, + ignore_missing_slaves=None, + broadcast_enable=None, + ) + + responses = [response async for response in response_receive] + + assert len(responses) == 1 + [[response, address]] = responses + assert isinstance(response, ReadHoldingRegistersResponse) From b1fc23670ba2ebfbad20d8cbc513f86155b73664 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Fri, 23 Apr 2021 23:22:28 -0400 Subject: [PATCH 21/37] shift examples to the examples directory --- tc.py => examples/common/async_trio_client.py | 12 +++++++++++- examples/common/trio_server.py | 4 ++++ 2 files changed, 15 insertions(+), 1 deletion(-) rename tc.py => examples/common/async_trio_client.py (75%) diff --git a/tc.py b/examples/common/async_trio_client.py similarity index 75% rename from tc.py rename to examples/common/async_trio_client.py index b2bc0878c..7fa43a468 100644 --- a/tc.py +++ b/examples/common/async_trio_client.py @@ -1,3 +1,13 @@ +#!/usr/bin/env python +""" +Pymodbus Asynchronous Client Examples +-------------------------------------------------------------------------- + +The following is an example of how to use the asynchronous modbus +client implementation from pymodbus with Trio. + +The example is only valid on Python3.6 and above +""" import contextlib import logging import sys @@ -12,7 +22,7 @@ async def main(): root_logger = logging.getLogger() handler = logging.StreamHandler(stream=sys.stdout) root_logger.addHandler(hdlr=handler) - # root_logger.setLevel(logging.DEBUG) + root_logger.setLevel(logging.DEBUG) client = ModbusClient(scheduler=schedulers.TRIO, host="127.0.0.1", port=5020) diff --git a/examples/common/trio_server.py b/examples/common/trio_server.py index 217a2172e..638d456d1 100644 --- a/examples/common/trio_server.py +++ b/examples/common/trio_server.py @@ -1,5 +1,9 @@ #!/usr/bin/env python +""" +Pymodbus Trio Server Example +-------------------------------------------------------------------------- +""" import logging FORMAT = ( From 707035ed4a3def56e292e46e2266a1e52e57a41e Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sat, 24 Apr 2021 00:02:31 -0400 Subject: [PATCH 22/37] first round of trio documentation additions --- doc/source/example/async_trio_client.rst | 4 ++++ doc/source/example/async_trio_server.rst | 4 ++++ doc/source/example/modules.rst | 2 ++ doc/source/library/pymodbus.client.asynchronous.rst | 1 + doc/source/library/pymodbus.client.asynchronous.trio.rst | 8 ++++++++ doc/source/library/pymodbus.server.rst | 7 +++++++ pymodbus/client/asynchronous/__init__.py | 7 +++++-- pymodbus/client/asynchronous/trio/__init__.py | 4 ++++ pymodbus/server/trio.py | 5 +++++ 9 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 doc/source/example/async_trio_client.rst create mode 100644 doc/source/example/async_trio_server.rst create mode 100644 doc/source/library/pymodbus.client.asynchronous.trio.rst diff --git a/doc/source/example/async_trio_client.rst b/doc/source/example/async_trio_client.rst new file mode 100644 index 000000000..39470f74d --- /dev/null +++ b/doc/source/example/async_trio_client.rst @@ -0,0 +1,4 @@ +================================================== +Async Trio Client Example +================================================== +.. literalinclude:: ../../../examples/common/async_trio_client.py \ No newline at end of file diff --git a/doc/source/example/async_trio_server.rst b/doc/source/example/async_trio_server.rst new file mode 100644 index 000000000..31fc4afc3 --- /dev/null +++ b/doc/source/example/async_trio_server.rst @@ -0,0 +1,4 @@ +================================================== +Async Trio Server Example +================================================== +.. literalinclude:: ../../../examples/common/trio_server.py \ No newline at end of file diff --git a/doc/source/example/modules.rst b/doc/source/example/modules.rst index af5a8f4a7..530c65012 100644 --- a/doc/source/example/modules.rst +++ b/doc/source/example/modules.rst @@ -10,6 +10,8 @@ Examples async_asyncio_serial_client async_tornado_client async_tornado_client_serial + async_trio_client + async_trio_server async_twisted_client async_twisted_client_serial asynchronous_processor diff --git a/doc/source/library/pymodbus.client.asynchronous.rst b/doc/source/library/pymodbus.client.asynchronous.rst index c72291a8d..a4c4bad05 100644 --- a/doc/source/library/pymodbus.client.asynchronous.rst +++ b/doc/source/library/pymodbus.client.asynchronous.rst @@ -15,6 +15,7 @@ Subpackages pymodbus.client.asynchronous.factory pymodbus.client.asynchronous.schedulers pymodbus.client.asynchronous.tornado + pymodbus.client.asynchronous.trio pymodbus.client.asynchronous.twisted Submodules diff --git a/doc/source/library/pymodbus.client.asynchronous.trio.rst b/doc/source/library/pymodbus.client.asynchronous.trio.rst new file mode 100644 index 000000000..0f694baa5 --- /dev/null +++ b/doc/source/library/pymodbus.client.asynchronous.trio.rst @@ -0,0 +1,8 @@ +pymodbus\.client\.asynchronous\.trio package +=============================================== + +.. automodule:: pymodbus.client.asynchronous.trio + :members: + :undoc-members: + :show-inheritance: + diff --git a/doc/source/library/pymodbus.server.rst b/doc/source/library/pymodbus.server.rst index 67f492524..8b14e08f3 100644 --- a/doc/source/library/pymodbus.server.rst +++ b/doc/source/library/pymodbus.server.rst @@ -25,4 +25,11 @@ pymodbus\.server\.sync module :undoc-members: :show-inheritance: +pymodbus\.server\.trio module +----------------------------- + +.. automodule:: pymodbus.server.trio + :members: + :undoc-members: + :show-inheritance: diff --git a/pymodbus/client/asynchronous/__init__.py b/pymodbus/client/asynchronous/__init__.py index b7d084de3..3ca437243 100644 --- a/pymodbus/client/asynchronous/__init__.py +++ b/pymodbus/client/asynchronous/__init__.py @@ -1,6 +1,6 @@ """ -Async Modbus Client implementation based on Twisted, tornado and asyncio ------------------------------------------------------------------------- +Async Modbus Client implementation based on Twisted, tornado, asyncio, and Trio +------------------------------------------------------------------------------- Example run:: @@ -21,6 +21,9 @@ # For asyncio based asynchronous client use event_loop, client = Client(schedulers.ASYNC_IO, port=5020) + # For asyncio based asynchronous client use + client = Client(schedulers.TRIO, port=5020) + # Here event_loop is a thread which would control the backend and future is # a Future/deffered object which would be used to # add call backs to run asynchronously. diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 9722acd67..70cf9911b 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -1,3 +1,7 @@ +""" +Implementation of a Modbus Client using Trio +-------------------------------------------- +""" import functools import logging diff --git a/pymodbus/server/trio.py b/pymodbus/server/trio.py index 1c3435ce8..5fe903b31 100644 --- a/pymodbus/server/trio.py +++ b/pymodbus/server/trio.py @@ -1,3 +1,8 @@ +""" +Implementation of a Trio Modbus Server +------------------------------------------ + +""" from binascii import b2a_hex import functools import logging From 61dc891a486b949fc3653f2c6657a88c394c5cb9 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sat, 24 Apr 2021 16:48:05 -0400 Subject: [PATCH 23/37] more docstrings and some refactors --- pymodbus/client/asynchronous/tcp.py | 5 +- pymodbus/client/asynchronous/trio/__init__.py | 64 ++++++---- pymodbus/server/trio.py | 110 ++++++++++++------ test/test_server_trio.py | 22 ++-- 4 files changed, 130 insertions(+), 71 deletions(-) diff --git a/pymodbus/client/asynchronous/tcp.py b/pymodbus/client/asynchronous/tcp.py index 04ce2f1bb..6c6514e2f 100644 --- a/pymodbus/client/asynchronous/tcp.py +++ b/pymodbus/client/asynchronous/tcp.py @@ -25,6 +25,7 @@ def __new__(cls, scheduler, host="127.0.0.1", port=Defaults.Port, - reactor (Twisted) - io_loop (Tornado) - async_io (asyncio) + - trio (Trio) :param scheduler: Backend to use :param host: Host IP address :param port: Port @@ -36,11 +37,11 @@ def __new__(cls, scheduler, host="127.0.0.1", port=Defaults.Port, """ if not IS_PYTHON3: if scheduler == ASYNC_IO and PYTHON_VERSION < (3, 4): - logger.critical("ASYNCIO is supported only on python3") + logger.critical("ASYNCIO is supported only on Python >= 3.4") import sys sys.exit(1) elif scheduler == TRIO and PYTHON_VERSION < (3, 6): - logger.critical("TRIO is supported only on python3") + logger.critical("TRIO is supported only on Python >= 3.6") import sys sys.exit(1) factory_class = get_factory(scheduler) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 70cf9911b..5ed14b33d 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -15,19 +15,24 @@ _logger = logging.getLogger(__name__) -class EventAndValue: +class _EventAndValue: + """ + A helper class for translating between the existing callback idioms and + those of Trio. + """ def __init__(self): self.event = trio.Event() self.value = self - def __call__(self, value): + def set(self, value): + """Assign a value and set the underlying trio event.""" self.value = value self.event.set() class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin): """ - Trio specific implementation of asynchronous modbus client protocol. + Trio specific implementation of the asynchronous modbus client protocol. """ #: Factory that created this instance. @@ -35,6 +40,11 @@ class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin): transport = None async def execute(self, request=None): + """ + Executes requests asynchronously + :param request: + :return: + """ request.transaction_id = self.transaction.getNextTID() packet = self.framer.buildPacket(request) _logger.debug("send: " + hexlify_packets(packet)) @@ -62,10 +72,11 @@ def connection_made(self, transport): # TODO: _connectionLost looks like functionality to have somewhere def _data_received(self, data): - ''' Get response, check for valid message, decode result + """ + Get a response, check for a valid message, decode the result. :param data: The data returned from the server - ''' + """ _logger.debug("recv: " + hexlify_packets(data)) decoded = self.framer.decode_data(data) @@ -76,7 +87,7 @@ def _data_received(self, data): def _handle_response(self, reply, **kwargs): """ - Handle the processed response and link to correct deferred + Handle the processed response and link to correct deferred. :param reply: The reply to process """ @@ -90,24 +101,24 @@ def _handle_response(self, reply, **kwargs): async def _build_response(self, tid): """ - Helper method to return a deferred response - for the current request. + Helper method to wait for and collect the result of the passed + transaction ID. :param tid: The transaction identifier for this response - :returns: A defer linked to the latest request + :returns: The decoded response. """ if not self._connected: raise ConnectionException('Client is not connected') - event_and_value = EventAndValue() - self.transaction.addTransaction(event_and_value, tid) + event_and_value = _EventAndValue() + self.transaction.addTransaction(event_and_value.set, tid) await event_and_value.event.wait() return event_and_value.value class ModbusTcpClientProtocol(BaseModbusAsyncClientProtocol): """ - Trio specific implementation of asynchronous modbus client protocol. + Trio specific implementation of the asynchronous modbus client protocol. """ #: Factory that created this instance. @@ -128,11 +139,11 @@ def data_received(self, data): class TrioModbusTcpClient: - """Client to connect to modbus device over TCP/IP.""" + """Client to connect to a modbus device over TCP/IP.""" - def __init__(self, host=None, port=502, protocol_class=None, loop=None): + def __init__(self, host=None, port=502, protocol_class=None): """ - Initializes Asyncio Modbus Tcp Client + Initializes a Trio Modbus Tcp Client :param host: Host IP address :param port: Port to connect :param protocol_class: Protocol used to talk to modbus device. @@ -141,7 +152,6 @@ def __init__(self, host=None, port=502, protocol_class=None, loop=None): self.protocol_class = protocol_class or ModbusTcpClientProtocol #: Current protocol instance. self.protocol = None - #: Event loop to use. self.host = host self.port = port @@ -150,30 +160,35 @@ def __init__(self, host=None, port=502, protocol_class=None, loop=None): @async_generator.asynccontextmanager async def manage_connection(self): + """ + Create a context manager to open the connection to the server and + close it when leaving the context block. + :return: + """ async with trio.open_nursery() as nursery: self.protocol = self._create_protocol() client_stream = await trio.open_tcp_stream(self.host, self.port) self.protocol.connection_made(transport=client_stream) nursery.start_soon( - functools.partial(self.receiver, stream=client_stream), + functools.partial(self._receiver, stream=client_stream), ) yield self.protocol nursery.cancel_scope.cancel() - async def receiver(self, stream): + async def _receiver(self, stream): + """ + Process incoming raw stream data. + :return: + """ async for data in stream: self.protocol.data_received(data) - # seems like this should work due to the framer but it doesn't - # for d in data: - # self.protocol.data_received(bytes([d])) - def stop(self): """ - Stops the client + Stop the client. :return: """ if self.connected: @@ -184,6 +199,7 @@ def stop(self): def _create_protocol(self): """ Factory function to create initialized protocol instance. + :return: The initialized protocol """ protocol = self.protocol_class() protocol.factory = self @@ -192,6 +208,7 @@ def _create_protocol(self): def protocol_made_connection(self, protocol): """ Protocol notification of successful connection. + :return: """ _logger.info('Protocol made connection.') if not self.connected: @@ -206,7 +223,6 @@ def init_tcp_client(proto_cls, host, port, **kwargs): """ Helper function to initialize tcp client :param proto_cls: - :param loop: :param host: :param port: :param kwargs: diff --git a/pymodbus/server/trio.py b/pymodbus/server/trio.py index 5fe903b31..c5708cf44 100644 --- a/pymodbus/server/trio.py +++ b/pymodbus/server/trio.py @@ -18,36 +18,67 @@ _logger = logging.getLogger(__name__) -def execute(request, addr, context, response_send, ignore_missing_slaves, broadcast_enable): - broadcast = False - try: - if broadcast_enable and request.unit_id == 0: - broadcast = True - # if broadcasting then execute on all slave contexts, note response will be ignored - for unit_id, unit_context in context: - response = request.execute(unit_context) - else: - context = context[request.unit_id] - response = request.execute(context) - except NoSuchSlaveException as ex: - _logger.debug("requested slave does not exist: %s" % request.unit_id) - if ignore_missing_slaves: - return # the client will simply timeout waiting for a response - response = request.doException(merror.GatewayNoResponse) - except Exception as ex: # pragma: no cover - _logger.debug( - "Datastore unable to fulfill request: %s; %s", ex, traceback.format_exc() - ) - response = request.doException(merror.SlaveFailure) +class Executor: + """ + A helper class to handle the callback interface and feed the result into + the backend channels. + """ + def __init__(self, addr, context, response_send, ignore_missing_slaves, broadcast_enable): + """ + :param addr: An (interface, port) to bind to. + :param context: The ModbusServerContext datastore + :param response_send: The response send channel + :param ignore_missing_slaves: True to not send errors on a request + to a missing slave + :param broadcast_enable: True to treat unit_id 0 as broadcast address, + False to treat 0 as any other unit_id + :return: + """ + self.addr = addr + self.context = context + self.response_send = response_send + self.ignore_missing_slaves = ignore_missing_slaves + self.broadcast_enable = broadcast_enable + + def execute(self, request): + """ The callback to call with the resulting message + + :param request: The decoded request message + """ + + broadcast = False + try: + if self.broadcast_enable and request.unit_id == 0: + broadcast = True + # if broadcasting then execute on all slave contexts, note response will be ignored + for unit_id, unit_context in self.context: + response = request.execute(unit_context) + else: + context = self.context[request.unit_id] + response = request.execute(context) + except NoSuchSlaveException as ex: + _logger.debug("requested slave does not exist: %s" % request.unit_id) + if self.ignore_missing_slaves: + return # the client will simply timeout waiting for a response + response = request.doException(merror.GatewayNoResponse) + except Exception as ex: # pragma: no cover + _logger.debug( + "Datastore unable to fulfill request: %s; %s", ex, traceback.format_exc() + ) + response = request.doException(merror.SlaveFailure) - # no response when broadcasting - if not broadcast: - response.transaction_id = request.transaction_id - response.unit_id = request.unit_id - response_send.send_nowait((response, addr)) + # no response when broadcasting + if not broadcast: + response.transaction_id = request.transaction_id + response.unit_id = request.unit_id + self.response_send.send_nowait((response, self.addr)) async def incoming(server_stream, framer, context, response_send, ignore_missing_slaves, broadcast_enable): + """ + Process the incoming data stream and feed it to the framer. + :return: + """ async with response_send: units = context.slaves() if not isinstance(units, (list, tuple)): # pragma: no cover @@ -61,22 +92,33 @@ async def incoming(server_stream, framer, context, response_send, ignore_missing # addr = (None,) # empty tuple addr = (None,) # empty tuple + executor = Executor( + addr=addr, + context=context, + response_send=response_send, + ignore_missing_slaves=ignore_missing_slaves, + broadcast_enable=broadcast_enable, + ) + framer.processIncomingPacket( data=data, - callback=functools.partial( - execute, - addr=addr, - context=context, - response_send=response_send, - ignore_missing_slaves=ignore_missing_slaves, - broadcast_enable=broadcast_enable, - ), + callback=executor.execute, unit=units, single=context.single, ) async def tcp_server(server_stream, context, identity, ignore_missing_slaves=False, broadcast_enable=False): + """ + :param server_stream: The TCP stream. Generally provided by ``trio.serve_tcp()``. + :param context: The ModbusServerContext datastore + :param identity: An optional identity structure + :param ignore_missing_slaves: True to not send errors on a request + to a missing slave + :param broadcast_enable: True to treat unit_id 0 as broadcast address, + False to treat 0 as any other unit_id + :return: + """ if broadcast_enable: units = context.slaves() if 0 not in context: diff --git a/test/test_server_trio.py b/test/test_server_trio.py index 9c66bc6f3..4da51d239 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -13,7 +13,7 @@ from pymodbus.framer.socket_framer import ModbusSocketFramer from pymodbus.pdu import ExceptionResponse, ModbusExceptions from pymodbus.register_read_message import ReadHoldingRegistersResponse -from pymodbus.server.trio import execute, incoming, tcp_server +from pymodbus.server.trio import Executor, incoming, tcp_server from pymodbus.register_write_message import WriteMultipleRegistersResponse @@ -230,14 +230,14 @@ def test_execute_broadcasts(trio_server_multiunit_context): context = trio_server_multiunit_context test_request = Request(unit_id=0) - execute( - request=test_request, + executor = Executor( addr=None, context=context, response_send=None, ignore_missing_slaves=False, broadcast_enable=True, ) + executor.execute(request=test_request) assert len(test_request.executed_contexts) == len(context.slaves()) assert test_request.exception_codes == [] @@ -256,14 +256,14 @@ def test_execute_does_not_broadcast( response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) test_request = Request(unit_id=unit_id) - execute( - request=test_request, + executor = Executor( addr=None, context=context, response_send=response_send, ignore_missing_slaves=False, broadcast_enable=broadcast_enabled, ) + executor.execute(request=test_request) assert len(test_request.executed_contexts) == 1 assert test_request.exception_codes == [] @@ -275,14 +275,14 @@ def test_execute_does_ignore_missing_slaves(trio_server_multiunit_context): missing_unit_id = max(context.slaves()) + 1 test_request = Request(unit_id=missing_unit_id) - execute( - request=test_request, + executor = Executor( addr=None, context=context, response_send=None, ignore_missing_slaves=True, broadcast_enable=True, ) + executor.execute(request=test_request) assert len(test_request.executed_contexts) == 0 assert test_request.exception_codes == [] @@ -295,14 +295,14 @@ def test_execute_does_not_ignore_missing_slaves(trio_server_multiunit_context): missing_unit_id = max(context.slaves()) + 1 test_request = Request(unit_id=missing_unit_id) - execute( - request=test_request, + executor = Executor( addr=None, context=context, response_send=response_send, ignore_missing_slaves=False, broadcast_enable=True, ) + executor.execute(request=test_request) assert test_request.exception_codes == [ModbusExceptions.GatewayNoResponse] @@ -312,14 +312,14 @@ def test_execute_handles_slave_failure(trio_server_multiunit_context): response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) test_request = Request(unit_id=0, fail_to_execute=True) - execute( - request=test_request, + executor = Executor( addr=None, context=context, response_send=response_send, ignore_missing_slaves=False, broadcast_enable=True, ) + executor.execute(request=test_request) assert test_request.exception_codes == [ModbusExceptions.SlaveFailure] From 6f1af2faa1f15ad5badbc90319fd51d965ffe252 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Sat, 24 Apr 2021 16:57:52 -0400 Subject: [PATCH 24/37] simplify trio client test file --- test/conftest.py | 12 +++- test/test_client_async_trio.py | 121 ++++++++++++++++----------------- 2 files changed, 67 insertions(+), 66 deletions(-) diff --git a/test/conftest.py b/test/conftest.py index 3ac3db995..48fa57bf0 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,9 +1,17 @@ from pymodbus.compat import PYTHON_VERSION + +collect_ignore = [] + if PYTHON_VERSION < (3,): # These files use syntax introduced between Python 2 and our lowest # supported Python 3 version. We just won't run these tests in Python 2. - collect_ignore = [ + collect_ignore.extend([ "test_client_async_asyncio.py", "test_server_asyncio.py", + ]) + +if PYTHON_VERSION < (3, 6): + collect_ignore.extend([ + "test_client_async_trio.py", "test_server_trio.py", - ] + ]) diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index d96bc7849..4a64f5d94 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -1,76 +1,69 @@ from pymodbus.compat import IS_PYTHON3, PYTHON_VERSION import pytest -TRIO_AVAILABLE = IS_PYTHON3 and PYTHON_VERSION >= (3, 6) -if TRIO_AVAILABLE: - from unittest import mock - from pymodbus.client.asynchronous.trio import ( - ModbusTcpClientProtocol, TrioModbusTcpClient)#, ModbusUdpClientProtocol) - # from test.trio_test_helper import return_as_coroutine, run_coroutine - from pymodbus.factory import ClientDecoder - from pymodbus.exceptions import ConnectionException - from pymodbus.transaction import ModbusSocketFramer - from pymodbus.bit_read_message import ReadCoilsRequest, ReadCoilsResponse - from pymodbus.register_read_message import ReadHoldingRegistersResponse - # protocols = [ModbusUdpClientProtocol, ModbusClientProtocol] - protocols = [ModbusTcpClientProtocol] -else: - import mock - protocols = [None, None] +from unittest import mock +from pymodbus.client.asynchronous.trio import ( + ModbusTcpClientProtocol, TrioModbusTcpClient)#, ModbusUdpClientProtocol) +# from test.trio_test_helper import return_as_coroutine, run_coroutine +from pymodbus.factory import ClientDecoder +from pymodbus.exceptions import ConnectionException +from pymodbus.transaction import ModbusSocketFramer +from pymodbus.bit_read_message import ReadCoilsRequest, ReadCoilsResponse +from pymodbus.register_read_message import ReadHoldingRegistersResponse +# protocols = [ModbusUdpClientProtocol, ModbusClientProtocol] +protocols = [ModbusTcpClientProtocol] -@pytest.mark.skipif(not TRIO_AVAILABLE, reason="requires python3.6 or above") -class TestTrioClient(object): - def test_factory_stop(self): - mock_protocol_class = mock.MagicMock() - client = TrioModbusTcpClient(protocol_class=mock_protocol_class) +def test_factory_stop(): + mock_protocol_class = mock.MagicMock() + client = TrioModbusTcpClient(protocol_class=mock_protocol_class) - assert not client.connected - client.stop() - assert not client.connected + assert not client.connected + client.stop() + assert not client.connected - # fake connected client: - client.protocol = mock.MagicMock() - client.connected = True + # fake connected client: + client.protocol = mock.MagicMock() + client.connected = True - client.stop() - client.protocol.transport.close.assert_called_once_with() + client.stop() + client.protocol.transport.close.assert_called_once_with() - def test_factory_protocol_made_connection(self): - mock_protocol_class = mock.MagicMock() - client = TrioModbusTcpClient(protocol_class=mock_protocol_class) +def test_factory_protocol_made_connection(): + mock_protocol_class = mock.MagicMock() + client = TrioModbusTcpClient(protocol_class=mock_protocol_class) - assert not client.connected - assert client.protocol is None - client.protocol_made_connection(mock.sentinel.PROTOCOL) - assert client.connected - assert client.protocol is mock.sentinel.PROTOCOL + assert not client.connected + assert client.protocol is None + client.protocol_made_connection(mock.sentinel.PROTOCOL) + assert client.connected + assert client.protocol is mock.sentinel.PROTOCOL - client.protocol_made_connection(mock.sentinel.PROTOCOL_UNEXPECTED) - assert client.connected - assert client.protocol is mock.sentinel.PROTOCOL + client.protocol_made_connection(mock.sentinel.PROTOCOL_UNEXPECTED) + assert client.connected + assert client.protocol is mock.sentinel.PROTOCOL - # @pytest.mark.trio - # async def test_factory_start_success(self): - # mock_protocol_class = mock.MagicMock() - # client = TrioModbusTcpClient( - # protocol_class=mock_protocol_class, - # host=mock.sentinel.HOST, - # port=mock.sentinel.PORT, - # ) - # - # async with client.manage_connection(): - # # mock_loop.create_connection.assert_called_once_with(mock.ANY, mock.sentinel.HOST, mock.sentinel.PORT) - # # assert mock_async.call_count == 0 - # pass +# @pytest.mark.trio +# async def test_factory_start_success(self): +# mock_protocol_class = mock.MagicMock() +# client = TrioModbusTcpClient( +# protocol_class=mock_protocol_class, +# host=mock.sentinel.HOST, +# port=mock.sentinel.PORT, +# ) +# +# async with client.manage_connection(): +# # mock_loop.create_connection.assert_called_once_with(mock.ANY, mock.sentinel.HOST, mock.sentinel.PORT) +# # assert mock_async.call_count == 0 +# pass - @pytest.mark.parametrize("protocol", protocols) - def testClientProtocolConnectionMade(self, protocol): - """ - Test the client protocol close - :return: - """ - protocol = protocol(ModbusSocketFramer(ClientDecoder())) - transport = mock.MagicMock() - protocol.connection_made(transport) - assert protocol.transport == transport - # assert protocol.connected +@pytest.mark.parametrize("protocol", protocols) +def testClientProtocolConnectionMade(protocol): + """ + Test the client protocol close + :return: + """ + protocol = protocol(ModbusSocketFramer(ClientDecoder())) + transport = mock.MagicMock() + protocol.connection_made(transport) + assert protocol.transport == transport + # assert protocol.connected From ffd12d19e88e89b5fe02fea1fcf4a25fc4ba0ad4 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Tue, 27 Apr 2021 15:04:38 -0400 Subject: [PATCH 25/37] more tests and use mock --- pymodbus/client/asynchronous/trio/__init__.py | 7 +- test/test_client_async_trio.py | 100 ++++++++++++++++-- test/test_server_trio.py | 75 ++++++------- 3 files changed, 132 insertions(+), 50 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 5ed14b33d..19b0bfeb8 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -39,14 +39,17 @@ class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin): factory = None transport = None + def _build_packet(self, request): + request.transaction_id = self.transaction.getNextTID() + return self.framer.buildPacket(request) + async def execute(self, request=None): """ Executes requests asynchronously :param request: :return: """ - request.transaction_id = self.transaction.getNextTID() - packet = self.framer.buildPacket(request) + packet = self._build_packet(request=request) _logger.debug("send: " + hexlify_packets(packet)) # TODO: should we retry on trio.BusyResourceError? await self.transport.send_all(packet) diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 4a64f5d94..4d02276f8 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -1,15 +1,18 @@ -from pymodbus.compat import IS_PYTHON3, PYTHON_VERSION -import pytest from unittest import mock + +import pytest +import trio + from pymodbus.client.asynchronous.trio import ( - ModbusTcpClientProtocol, TrioModbusTcpClient)#, ModbusUdpClientProtocol) -# from test.trio_test_helper import return_as_coroutine, run_coroutine -from pymodbus.factory import ClientDecoder + _EventAndValue, + BaseModbusAsyncClientProtocol, + ModbusTcpClientProtocol, + TrioModbusTcpClient, +) from pymodbus.exceptions import ConnectionException +from pymodbus.factory import ClientDecoder from pymodbus.transaction import ModbusSocketFramer -from pymodbus.bit_read_message import ReadCoilsRequest, ReadCoilsResponse -from pymodbus.register_read_message import ReadHoldingRegistersResponse -# protocols = [ModbusUdpClientProtocol, ModbusClientProtocol] +from pymodbus.register_read_message import ReadHoldingRegistersRequest protocols = [ModbusTcpClientProtocol] @@ -67,3 +70,84 @@ def testClientProtocolConnectionMade(protocol): protocol.connection_made(transport) assert protocol.transport == transport # assert protocol.connected + + +@pytest.mark.trio +async def test_event_not_set(autojump_clock): + event_and_value = _EventAndValue() + with pytest.raises(trio.TooSlowError): + with trio.fail_after(1): + await event_and_value.event.wait() + + +def test_event_value_sentinel(): + event_and_value = _EventAndValue() + assert event_and_value.value is event_and_value + + +@pytest.mark.trio +async def test_event_sets(autojump_clock): + event_and_value = _EventAndValue() + event_and_value.set(None) + with trio.fail_after(1): + await event_and_value.event.wait() + + +def test_event_holds_value(): + event_and_value = _EventAndValue() + o = object() + event_and_value.set(o) + assert event_and_value.value is o + + +def test_protocol_build_packet_increments_tid(): + protocol = BaseModbusAsyncClientProtocol() + requests = [ + ReadHoldingRegistersRequest(address=1, count=1) for _ in range(2) + ] + for request in requests: + protocol._build_packet(request=request) + assert requests[0].transaction_id + 1 == requests[1].transaction_id + + +def test_protocol_build_packet_packs_id(): + protocol = BaseModbusAsyncClientProtocol() + unit_id = 0x23 + request = ReadHoldingRegistersRequest(address=1, count=1, unit=unit_id) + packet = protocol._build_packet(request=request) + assert packet[6] == unit_id + + +@pytest.mark.trio +async def test_protocol_execute_sends(): + protocol = BaseModbusAsyncClientProtocol() + transport = mock.AsyncMock() + protocol.transport = transport + unit_id = 0x23 + request = ReadHoldingRegistersRequest(address=1, count=1, unit=unit_id) + with pytest.raises(ConnectionException): + await protocol.execute(request=request) + expected_packet = b'\x00\x01\x00\x00\x00\x06\x23\x03\x00\x01\x00\x01' + + transport.send_all.assert_called_once_with(expected_packet) + + +def test_protocol_connection_made_saves_transport(): + protocol = BaseModbusAsyncClientProtocol() + transport = object() + protocol.connection_made(transport=transport) + assert protocol.transport is transport + + +def test_protocol_connection_made_sets_connected(): + protocol = BaseModbusAsyncClientProtocol() + protocol.connection_made(transport=object()) + assert protocol._connected + + +def test_protocol_connection_made_notifies_factory(): + protocol = BaseModbusAsyncClientProtocol() + factory = mock.MagicMock() + protocol.factory = factory + protocol.connection_made(transport=object()) + factory.protocol_made_connection.assert_called_once_with(protocol) diff --git a/test/test_server_trio.py b/test/test_server_trio.py index 4da51d239..376e79d56 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -1,3 +1,5 @@ +from unittest import mock + import contextlib import functools import logging @@ -12,7 +14,10 @@ from pymodbus.factory import ServerDecoder from pymodbus.framer.socket_framer import ModbusSocketFramer from pymodbus.pdu import ExceptionResponse, ModbusExceptions -from pymodbus.register_read_message import ReadHoldingRegistersResponse +from pymodbus.register_read_message import ( + ReadHoldingRegistersRequest, + ReadHoldingRegistersResponse, +) from pymodbus.server.trio import Executor, incoming, tcp_server from pymodbus.register_write_message import WriteMultipleRegistersResponse @@ -201,46 +206,24 @@ async def test_logs_server_response_send(trio_tcp_client, caplog): assert "send: [ReadHoldingRegistersResponse (1)]- b'0001000000050003020000'" in caplog.text -class Response: - def __init__(self): - self.transaction_id = None - self.unit_id = None - - -class Request: - def __init__(self, unit_id, transaction_id=0, fail_to_execute=False): - self.exception_codes = [] - self.executed_contexts = [] - self.fail_to_execute = fail_to_execute - self.transaction_id = transaction_id - self.unit_id = unit_id - - def execute(self, context): - if self.fail_to_execute: - raise Exception('failing to execute for testing purposes') - self.executed_contexts.append(context) - return Response() - - def doException(self, exception): - self.exception_codes.append(exception) - return Response() - - def test_execute_broadcasts(trio_server_multiunit_context): context = trio_server_multiunit_context + response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) - test_request = Request(unit_id=0) + test_request = ReadHoldingRegistersRequest(address=1, count=1, unit=0) + test_request.execute = mock.Mock() + test_request.doException = mock.Mock() executor = Executor( addr=None, context=context, - response_send=None, + response_send=response_send, ignore_missing_slaves=False, broadcast_enable=True, ) executor.execute(request=test_request) - assert len(test_request.executed_contexts) == len(context.slaves()) - assert test_request.exception_codes == [] + test_request.execute.assert_has_calls([mock.call(v) for _, v in context]) + test_request.doException.assert_not_called() @pytest.mark.parametrize( @@ -255,7 +238,9 @@ def test_execute_does_not_broadcast( context = trio_server_multiunit_context response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) - test_request = Request(unit_id=unit_id) + test_request = ReadHoldingRegistersRequest(address=1, count=1, unit=unit_id) + test_request.execute = mock.Mock() + test_request.doException = mock.Mock() executor = Executor( addr=None, context=context, @@ -265,8 +250,10 @@ def test_execute_does_not_broadcast( ) executor.execute(request=test_request) - assert len(test_request.executed_contexts) == 1 - assert test_request.exception_codes == [] + slave_context = dict(context)[unit_id] + + test_request.execute.assert_called_with(slave_context) + test_request.doException.assert_not_called() def test_execute_does_ignore_missing_slaves(trio_server_multiunit_context): @@ -274,7 +261,9 @@ def test_execute_does_ignore_missing_slaves(trio_server_multiunit_context): missing_unit_id = max(context.slaves()) + 1 - test_request = Request(unit_id=missing_unit_id) + test_request = ReadHoldingRegistersRequest(address=1, count=1, unit=missing_unit_id) + test_request.execute = mock.Mock() + test_request.doException = mock.Mock() executor = Executor( addr=None, context=context, @@ -284,8 +273,8 @@ def test_execute_does_ignore_missing_slaves(trio_server_multiunit_context): ) executor.execute(request=test_request) - assert len(test_request.executed_contexts) == 0 - assert test_request.exception_codes == [] + test_request.execute.assert_not_called() + test_request.doException.assert_not_called() def test_execute_does_not_ignore_missing_slaves(trio_server_multiunit_context): @@ -294,7 +283,9 @@ def test_execute_does_not_ignore_missing_slaves(trio_server_multiunit_context): missing_unit_id = max(context.slaves()) + 1 - test_request = Request(unit_id=missing_unit_id) + test_request = ReadHoldingRegistersRequest(address=1, count=1, unit=missing_unit_id) + test_request.execute = mock.Mock() + test_request.doException = mock.Mock() executor = Executor( addr=None, context=context, @@ -304,14 +295,17 @@ def test_execute_does_not_ignore_missing_slaves(trio_server_multiunit_context): ) executor.execute(request=test_request) - assert test_request.exception_codes == [ModbusExceptions.GatewayNoResponse] + test_request.execute.assert_not_called() + test_request.doException.assert_called_once_with(ModbusExceptions.GatewayNoResponse) def test_execute_handles_slave_failure(trio_server_multiunit_context): context = trio_server_multiunit_context response_send, response_receive = trio.open_memory_channel(max_buffer_size=1) - test_request = Request(unit_id=0, fail_to_execute=True) + test_request = ReadHoldingRegistersRequest(address=1, count=1, unit=0) + test_request.execute = mock.Mock(side_effect=Exception) + test_request.doException = mock.Mock() executor = Executor( addr=None, context=context, @@ -321,7 +315,8 @@ def test_execute_handles_slave_failure(trio_server_multiunit_context): ) executor.execute(request=test_request) - assert test_request.exception_codes == [ModbusExceptions.SlaveFailure] + test_request.execute.assert_called_once() + test_request.doException.assert_called_once_with(ModbusExceptions.SlaveFailure) @pytest.mark.trio From ef8ffbfbe3dbb2586f9ec785bcfb68c06e0aa658 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Tue, 27 Apr 2021 15:36:08 -0400 Subject: [PATCH 26/37] stop using MagicMock and AsyncMock --- test/test_client_async_trio.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 4d02276f8..f75b5b2b4 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -17,7 +17,7 @@ def test_factory_stop(): - mock_protocol_class = mock.MagicMock() + mock_protocol_class = mock.Mock() client = TrioModbusTcpClient(protocol_class=mock_protocol_class) assert not client.connected @@ -25,14 +25,14 @@ def test_factory_stop(): assert not client.connected # fake connected client: - client.protocol = mock.MagicMock() + client.protocol = mock.Mock() client.connected = True client.stop() client.protocol.transport.close.assert_called_once_with() def test_factory_protocol_made_connection(): - mock_protocol_class = mock.MagicMock() + mock_protocol_class = mock.Mock() client = TrioModbusTcpClient(protocol_class=mock_protocol_class) assert not client.connected @@ -47,7 +47,7 @@ def test_factory_protocol_made_connection(): # @pytest.mark.trio # async def test_factory_start_success(self): -# mock_protocol_class = mock.MagicMock() +# mock_protocol_class = mock.Mock() # client = TrioModbusTcpClient( # protocol_class=mock_protocol_class, # host=mock.sentinel.HOST, @@ -66,7 +66,7 @@ def testClientProtocolConnectionMade(protocol): :return: """ protocol = protocol(ModbusSocketFramer(ClientDecoder())) - transport = mock.MagicMock() + transport = mock.Mock() protocol.connection_made(transport) assert protocol.transport == transport # assert protocol.connected @@ -118,10 +118,15 @@ def test_protocol_build_packet_packs_id(): assert packet[6] == unit_id +async def anoop(): + pass + + @pytest.mark.trio async def test_protocol_execute_sends(): protocol = BaseModbusAsyncClientProtocol() - transport = mock.AsyncMock() + transport = mock.Mock() + transport.send_all = mock.Mock(return_value=anoop()) protocol.transport = transport unit_id = 0x23 request = ReadHoldingRegistersRequest(address=1, count=1, unit=unit_id) @@ -147,7 +152,7 @@ def test_protocol_connection_made_sets_connected(): def test_protocol_connection_made_notifies_factory(): protocol = BaseModbusAsyncClientProtocol() - factory = mock.MagicMock() + factory = mock.Mock() protocol.factory = factory protocol.connection_made(transport=object()) factory.protocol_made_connection.assert_called_once_with(protocol) From 49f7ca95c244dd11b35a4e7104219344889773a6 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Tue, 27 Apr 2021 22:07:08 -0400 Subject: [PATCH 27/37] trio.BaseModbusAsyncClientProtocol tests --- pymodbus/client/asynchronous/trio/__init__.py | 1 - test/test_client_async_trio.py | 85 ++++++++++++++++++- test/test_server_trio.py | 1 - 3 files changed, 84 insertions(+), 3 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 19b0bfeb8..2f4040fc9 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -86,7 +86,6 @@ def _data_received(self, data): unit = decoded.get("unit", 0) self.framer.processIncomingPacket(data, self._handle_response, unit=unit) - self.data = b'' def _handle_response(self, reply, **kwargs): """ diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index f75b5b2b4..09599cc41 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -12,7 +12,10 @@ from pymodbus.exceptions import ConnectionException from pymodbus.factory import ClientDecoder from pymodbus.transaction import ModbusSocketFramer -from pymodbus.register_read_message import ReadHoldingRegistersRequest +from pymodbus.register_read_message import ( + ReadHoldingRegistersRequest, + ReadHoldingRegistersResponse, +) protocols = [ModbusTcpClientProtocol] @@ -156,3 +159,83 @@ def test_protocol_connection_made_notifies_factory(): protocol.factory = factory protocol.connection_made(transport=object()) factory.protocol_made_connection.assert_called_once_with(protocol) + + +def test_protocol_data_received_processes(): + protocol = BaseModbusAsyncClientProtocol() + protocol.framer.processIncomingPacket = mock.Mock() + data = b'\x00\x01\x00\x00\x00\x05\x07\x03\x02\x9d\xdd' + protocol._data_received(data) + + protocol.framer.processIncomingPacket.assert_called_once_with( + data, + protocol._handle_response, + unit=7, + ) + + +def test_protocol_handle_response_skips_none_handler(): + protocol = BaseModbusAsyncClientProtocol() + transaction_id = 13 + response = ReadHoldingRegistersResponse( + values=[40412], + transaction=transaction_id, + ) + protocol.transaction.getTransaction = mock.Mock(return_value=None) + + protocol._handle_response(reply=response) + + protocol.transaction.getTransaction.assert_called_once_with(response.transaction_id) + + +def test_protocol_handle_response_calls_handler(): + protocol = BaseModbusAsyncClientProtocol() + transaction_id = 13 + response = ReadHoldingRegistersResponse( + values=[40412], + transaction=transaction_id, + ) + handler = mock.Mock() + protocol.transaction.getTransaction = mock.Mock(return_value=handler) + + protocol._handle_response(reply=response) + + protocol.transaction.getTransaction.assert_called_once_with(response.transaction_id) + handler.assert_called_once_with(response) + + +@pytest.mark.trio +async def test_protocol_build_response_raises_if_not_connected(): + protocol = BaseModbusAsyncClientProtocol() + protocol._connected = False + with pytest.raises(ConnectionException): + await protocol._build_response(tid=None) + + +@pytest.mark.trio +async def test_protocol_build_response_adds_transaction(autojump_clock): + protocol = BaseModbusAsyncClientProtocol() + protocol._connected = True + protocol.transaction.addTransaction = mock.Mock() + transaction_id = 37 + with trio.move_on_after(1): + await protocol._build_response(tid=transaction_id) + + protocol.transaction.addTransaction.assert_called_once() + assert protocol.transaction.addTransaction.call_args.args[1] == transaction_id + + +@pytest.mark.trio +async def test_protocol_build_response_adds_transaction(autojump_clock): + protocol = BaseModbusAsyncClientProtocol() + protocol._connected = True + transaction_id = 37 + value = 13 + event_and_value = _EventAndValue() + event_and_value.value = value + event_and_value.event.set() + with trio.move_on_after(1): + with mock.patch('pymodbus.client.asynchronous.trio._EventAndValue', return_value=event_and_value): + result = await protocol._build_response(tid=transaction_id) + + assert result == value diff --git a/test/test_server_trio.py b/test/test_server_trio.py index 376e79d56..ec5500351 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -1,6 +1,5 @@ from unittest import mock -import contextlib import functools import logging From 546458004f5b19d11bb5a07a77d511613f104c21 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Tue, 27 Apr 2021 23:05:13 -0400 Subject: [PATCH 28/37] more tests... --- pymodbus/client/asynchronous/trio/__init__.py | 4 +- test/test_client_async_trio.py | 64 +++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 2f4040fc9..eb4662a24 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -221,12 +221,12 @@ def protocol_made_connection(self, protocol): 'callback called while connected.') -def init_tcp_client(proto_cls, host, port, **kwargs): +def init_tcp_client(host, port=502, proto_cls=None, **kwargs): """ Helper function to initialize tcp client - :param proto_cls: :param host: :param port: + :param proto_cls: :param kwargs: :return: """ diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 09599cc41..79ea87ca5 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -8,6 +8,7 @@ BaseModbusAsyncClientProtocol, ModbusTcpClientProtocol, TrioModbusTcpClient, + init_tcp_client, ) from pymodbus.exceptions import ConnectionException from pymodbus.factory import ClientDecoder @@ -16,6 +17,8 @@ ReadHoldingRegistersRequest, ReadHoldingRegistersResponse, ) + + protocols = [ModbusTcpClientProtocol] @@ -239,3 +242,64 @@ async def test_protocol_build_response_adds_transaction(autojump_clock): result = await protocol._build_response(tid=transaction_id) assert result == value + + +def test_tcp_protocol_data_received(): + protocol = ModbusTcpClientProtocol() + protocol._data_received = mock.Mock() + data = object() + protocol.data_received(data=data) + protocol._data_received.assert_called_once_with(data) + + +# @pytest.mark.trio +# async def test_tcp_client_manage_connection_is_connected(): +# client = TrioModbusTcpClient(host='127.0.0.1') +# async with client.manage_connection(): +# assert client.connected + + +async def ag(iterable): + for element in iterable: + yield element + + +@pytest.mark.trio +async def test_client_receiver_passes_on_data(): + client = TrioModbusTcpClient() + client.protocol = mock.Mock() + client.protocol.data_received = mock.Mock() + data = [1, 1, 2, 3, 5] + await client._receiver(stream=ag(data)) + client.protocol.data_received.assert_has_calls( + [mock.call(element) for element in data], + ) + + +def test_client_create_protocol(): + client = TrioModbusTcpClient(protocol_class=BaseModbusAsyncClientProtocol) + protocol = client._create_protocol() + assert isinstance(protocol, BaseModbusAsyncClientProtocol) + assert protocol.factory is client + + +def test_client_protocol_made_connection(): + client = TrioModbusTcpClient() + protocol = BaseModbusAsyncClientProtocol() + client.protocol_made_connection(protocol=protocol) + assert client.connected + assert client.protocol is protocol + + +def test_client_protocol_remade_connection_ignore(): + client = TrioModbusTcpClient() + protocol = BaseModbusAsyncClientProtocol() + client.protocol_made_connection(protocol=protocol) + client.protocol_made_connection(protocol=None) + assert client.connected + assert client.protocol is protocol + + +def test_init_tcp_client(): + client = init_tcp_client(host='127.0.0.1') + assert isinstance(client, TrioModbusTcpClient) From 343346c79b75c682acd3c577cf22b3132e0fa08e Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Tue, 27 Apr 2021 23:16:14 -0400 Subject: [PATCH 29/37] fix the tests --- pymodbus/client/asynchronous/factory/tcp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymodbus/client/asynchronous/factory/tcp.py b/pymodbus/client/asynchronous/factory/tcp.py index 2bb19b8a5..12668e2d9 100644 --- a/pymodbus/client/asynchronous/factory/tcp.py +++ b/pymodbus/client/asynchronous/factory/tcp.py @@ -119,7 +119,7 @@ def trio_factory(host="127.0.0.1", port=Defaults.Port, framer=None, """ from pymodbus.client.asynchronous.trio import init_tcp_client proto_cls = kwargs.get("proto_cls", None) - client = init_tcp_client(proto_cls, host, port) + client = init_tcp_client(proto_cls=proto_cls, host=host, port=port) return client From 12075892c810dbefd345cba68496040651c8f6bb Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Tue, 27 Apr 2021 23:45:33 -0400 Subject: [PATCH 30/37] even more tests --- test/test_client_async_trio.py | 38 ++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 79ea87ca5..fcf353584 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -51,19 +51,20 @@ def test_factory_protocol_made_connection(): assert client.connected assert client.protocol is mock.sentinel.PROTOCOL -# @pytest.mark.trio -# async def test_factory_start_success(self): -# mock_protocol_class = mock.Mock() -# client = TrioModbusTcpClient( -# protocol_class=mock_protocol_class, -# host=mock.sentinel.HOST, -# port=mock.sentinel.PORT, -# ) -# -# async with client.manage_connection(): -# # mock_loop.create_connection.assert_called_once_with(mock.ANY, mock.sentinel.HOST, mock.sentinel.PORT) -# # assert mock_async.call_count == 0 -# pass + +@pytest.mark.trio +async def test_factory_start_success(): + mock_protocol_class = mock.Mock() + client = TrioModbusTcpClient( + protocol_class=mock_protocol_class, + host=mock.sentinel.HOST, + port=mock.sentinel.PORT, + ) + + with mock.patch('trio.open_tcp_stream') as patch: + async with client.manage_connection(): + patch.assert_called_once_with(mock.sentinel.HOST, mock.sentinel.PORT) + @pytest.mark.parametrize("protocol", protocols) def testClientProtocolConnectionMade(protocol): @@ -252,11 +253,12 @@ def test_tcp_protocol_data_received(): protocol._data_received.assert_called_once_with(data) -# @pytest.mark.trio -# async def test_tcp_client_manage_connection_is_connected(): -# client = TrioModbusTcpClient(host='127.0.0.1') -# async with client.manage_connection(): -# assert client.connected +@pytest.mark.trio +async def test_tcp_client_manage_connection_is_connected(): + client = TrioModbusTcpClient(host='127.0.0.1') + with mock.patch('trio.open_tcp_stream'): + async with client.manage_connection(): + assert client.connected async def ag(iterable): From 8774d1b7b2ff70fee6807a16b6f7fe4c5d464830 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Tue, 27 Apr 2021 23:47:18 -0400 Subject: [PATCH 31/37] one more assert --- test/test_client_async_trio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index fcf353584..0269b7c61 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -76,7 +76,7 @@ def testClientProtocolConnectionMade(protocol): transport = mock.Mock() protocol.connection_made(transport) assert protocol.transport == transport - # assert protocol.connected + assert protocol._connected @pytest.mark.trio From c750233a2a26e914431b5a239f2eddf3030784a0 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 28 Apr 2021 00:25:30 -0400 Subject: [PATCH 32/37] fixup py36-37 --- pymodbus/client/asynchronous/trio/__init__.py | 7 +++++-- test/test_client_async_trio.py | 8 ++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index eb4662a24..9b33b8354 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -185,8 +185,11 @@ async def _receiver(self, stream): Process incoming raw stream data. :return: """ - async for data in stream: - self.protocol.data_received(data) + try: + async for data in stream: + self.protocol.data_received(data) + except Exception as e: + print() def stop(self): """ diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 0269b7c61..9c8fe3e39 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -52,6 +52,10 @@ def test_factory_protocol_made_connection(): assert client.protocol is mock.sentinel.PROTOCOL +async def aag(): + return trio.testing.MemoryReceiveStream() + + @pytest.mark.trio async def test_factory_start_success(): mock_protocol_class = mock.Mock() @@ -61,7 +65,7 @@ async def test_factory_start_success(): port=mock.sentinel.PORT, ) - with mock.patch('trio.open_tcp_stream') as patch: + with mock.patch('trio.open_tcp_stream', instance=True, return_value=aag()) as patch: async with client.manage_connection(): patch.assert_called_once_with(mock.sentinel.HOST, mock.sentinel.PORT) @@ -256,7 +260,7 @@ def test_tcp_protocol_data_received(): @pytest.mark.trio async def test_tcp_client_manage_connection_is_connected(): client = TrioModbusTcpClient(host='127.0.0.1') - with mock.patch('trio.open_tcp_stream'): + with mock.patch('trio.open_tcp_stream', instance=True, return_value=aag()): async with client.manage_connection(): assert client.connected From c47a533f543a06d3073af079f3b54b67d6fa4010 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 28 Apr 2021 11:07:04 -0400 Subject: [PATCH 33/37] fixup patching and mocking of trio.open_tcp_stream() --- pymodbus/client/asynchronous/trio/__init__.py | 7 ++----- test/test_client_async_trio.py | 6 +++--- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 9b33b8354..eb4662a24 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -185,11 +185,8 @@ async def _receiver(self, stream): Process incoming raw stream data. :return: """ - try: - async for data in stream: - self.protocol.data_received(data) - except Exception as e: - print() + async for data in stream: + self.protocol.data_received(data) def stop(self): """ diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 9c8fe3e39..9b10b29ea 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -52,7 +52,7 @@ def test_factory_protocol_made_connection(): assert client.protocol is mock.sentinel.PROTOCOL -async def aag(): +async def aag(*args, **kwargs): return trio.testing.MemoryReceiveStream() @@ -65,7 +65,7 @@ async def test_factory_start_success(): port=mock.sentinel.PORT, ) - with mock.patch('trio.open_tcp_stream', instance=True, return_value=aag()) as patch: + with mock.patch('trio.open_tcp_stream', new=mock.Mock(return_value=aag())) as patch: async with client.manage_connection(): patch.assert_called_once_with(mock.sentinel.HOST, mock.sentinel.PORT) @@ -260,7 +260,7 @@ def test_tcp_protocol_data_received(): @pytest.mark.trio async def test_tcp_client_manage_connection_is_connected(): client = TrioModbusTcpClient(host='127.0.0.1') - with mock.patch('trio.open_tcp_stream', instance=True, return_value=aag()): + with mock.patch('trio.open_tcp_stream', new=mock.Mock(return_value=aag())): async with client.manage_connection(): assert client.connected From 363903e5c8f14bbf99f793d787cb2f0b7dfef006 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 28 Apr 2021 11:28:42 -0400 Subject: [PATCH 34/37] use outcome so we can handle errors as well --- pymodbus/client/asynchronous/trio/__init__.py | 11 ++++++----- requirements.txt | 1 + setup.py | 3 ++- test/test_client_async_trio.py | 11 ++++++----- test/test_server_trio.py | 12 +++++------- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index eb4662a24..6389e6c67 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -6,6 +6,7 @@ import logging import async_generator +import outcome import trio from pymodbus.client.asynchronous.mixins import AsyncModbusClientMixin @@ -95,9 +96,9 @@ def _handle_response(self, reply, **kwargs): """ if reply is not None: tid = reply.transaction_id - handler = self.transaction.getTransaction(tid) - if handler: - handler(reply) + event_and_value = self.transaction.getTransaction(tid) + if event_and_value is not None: + event_and_value.set(outcome.Value(reply)) else: _logger.debug("Unrequested message: " + str(reply)) @@ -113,9 +114,9 @@ async def _build_response(self, tid): raise ConnectionException('Client is not connected') event_and_value = _EventAndValue() - self.transaction.addTransaction(event_and_value.set, tid) + self.transaction.addTransaction(event_and_value, tid) await event_and_value.event.wait() - return event_and_value.value + return event_and_value.value.unwrap() class ModbusTcpClientProtocol(BaseModbusAsyncClientProtocol): diff --git a/requirements.txt b/requirements.txt index 10770b990..2faeaa7ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,6 +33,7 @@ six>=1.15.0 # if you want to use the Trio asynchronous version, uncomment these # ------------------------------------------------------------------- # async_generator ~= 1.10 +# outcome ~= 1.1 # ------------------------------------------------------------------- # if you want to build the documentation, uncomment these diff --git a/setup.py b/setup.py index 8575b2b5b..723f455d2 100644 --- a/setup.py +++ b/setup.py @@ -109,8 +109,9 @@ ], 'trio': [ - 'trio ~= 0.17.0', 'async_generator ~= 1.10', + 'outcome ~= 1.1', + 'trio ~= 0.17.0', ], 'repl:python_version <= "2.7"': [ 'click>=7.0', diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 9b10b29ea..783c8e7b6 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -2,6 +2,7 @@ import pytest import trio +import outcome from pymodbus.client.asynchronous.trio import ( _EventAndValue, @@ -203,13 +204,14 @@ def test_protocol_handle_response_calls_handler(): values=[40412], transaction=transaction_id, ) - handler = mock.Mock() - protocol.transaction.getTransaction = mock.Mock(return_value=handler) + event_and_value = _EventAndValue() + event_and_value.set = mock.Mock() + protocol.transaction.getTransaction = mock.Mock(return_value=event_and_value) protocol._handle_response(reply=response) protocol.transaction.getTransaction.assert_called_once_with(response.transaction_id) - handler.assert_called_once_with(response) + event_and_value.set.assert_called_once_with(outcome.Value(response)) @pytest.mark.trio @@ -240,8 +242,7 @@ async def test_protocol_build_response_adds_transaction(autojump_clock): transaction_id = 37 value = 13 event_and_value = _EventAndValue() - event_and_value.value = value - event_and_value.event.set() + event_and_value.set(outcome.Value(value)) with trio.move_on_after(1): with mock.patch('pymodbus.client.asynchronous.trio._EventAndValue', return_value=event_and_value): result = await protocol._build_response(tid=transaction_id) diff --git a/test/test_server_trio.py b/test/test_server_trio.py index ec5500351..5694240ce 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -103,10 +103,9 @@ async def trio_tcp_client_fixture(trio_tcp_server): @pytest.mark.trio -async def test_read_holding_registers(trio_tcp_client, trio_tcp_server): +async def test_read_holding_registers(trio_tcp_client, trio_tcp_server, autojump_clock): address = 12 value = 40413 - # TODO: learn what fx is about... trio_tcp_server.context[0].setValues( fx=3, address=address, @@ -122,7 +121,7 @@ async def test_read_holding_registers(trio_tcp_client, trio_tcp_server): @pytest.mark.trio -async def test_write_holding_registers(trio_tcp_client, trio_tcp_server): +async def test_write_holding_registers(trio_tcp_client, trio_tcp_server, autojump_clock): address = 12 value = 40413 @@ -133,7 +132,6 @@ async def test_write_holding_registers(trio_tcp_client, trio_tcp_server): ) assert isinstance(response, WriteMultipleRegistersResponse) - # TODO: learn what fx is about... server_values = trio_tcp_server.context[0].getValues( fx=3, address=address, @@ -143,7 +141,7 @@ async def test_write_holding_registers(trio_tcp_client, trio_tcp_server): @pytest.mark.trio -async def test_large_count_excepts(trio_tcp_client): +async def test_large_count_excepts(trio_tcp_client, autojump_clock): response = await trio_tcp_client.read_holding_registers( address=0, count=300, @@ -153,7 +151,7 @@ async def test_large_count_excepts(trio_tcp_client): @pytest.mark.trio -async def test_invalid_client_excepts_gateway_no_response(trio_tcp_client): +async def test_invalid_client_excepts_gateway_no_response(trio_tcp_client, autojump_clock): response = await trio_tcp_client.read_holding_registers( address=0, count=1, @@ -198,7 +196,7 @@ async def test_times_out_when_broadcast_enabled_and_no_contexts(trio_tcp_client) @pytest.mark.trio -async def test_logs_server_response_send(trio_tcp_client, caplog): +async def test_logs_server_response_send(trio_tcp_client, caplog, autojump_clock): with caplog.at_level(logging.DEBUG): await trio_tcp_client.read_holding_registers(address=0, count=1) From f08b86c01254642284947684cb6523a90ef99baf Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 28 Apr 2021 11:42:34 -0400 Subject: [PATCH 35/37] handle protocol connection lost --- pymodbus/client/asynchronous/trio/__init__.py | 17 ++++++++++++++++- test/test_client_async_trio.py | 11 +++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index 6389e6c67..e5ec64648 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -73,7 +73,22 @@ def connection_made(self, transport): if self.factory: self.factory.protocol_made_connection(self) - # TODO: _connectionLost looks like functionality to have somewhere + def _connectionLost(self, reason): + """ + Called upon a client disconnect + + :param reason: The reason for the disconnect + """ + _logger.debug( + "Client disconnected from modbus server: %s" % reason) + self._connected = False + for tid in list(self.transaction): + event_and_value = self.transaction.getTransaction(tid) + if event_and_value.event.is_set(): + continue + event_and_value.set(outcome.Error(ConnectionException( + 'Connection lost during request', + ))) def _data_received(self, data): """ diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 783c8e7b6..3a85bd482 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -170,6 +170,17 @@ def test_protocol_connection_made_notifies_factory(): factory.protocol_made_connection.assert_called_once_with(protocol) +def test_protocol_connection_lost(): + protocol = BaseModbusAsyncClientProtocol() + tid = 3 + event_and_value = _EventAndValue() + protocol.transaction.addTransaction(request=event_and_value, tid=tid) + protocol._connectionLost('') + assert event_and_value.event.is_set() + with pytest.raises(ConnectionException): + event_and_value.value.unwrap() + + def test_protocol_data_received_processes(): protocol = BaseModbusAsyncClientProtocol() protocol.framer.processIncomingPacket = mock.Mock() From c6db32657dcbac6e66c2edbba31c4642e679afb9 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 28 Apr 2021 13:15:27 -0400 Subject: [PATCH 36/37] explain the +1s --- test/test_server_trio.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/test_server_trio.py b/test/test_server_trio.py index 5694240ce..5e4c5bac9 100755 --- a/test/test_server_trio.py +++ b/test/test_server_trio.py @@ -111,8 +111,8 @@ async def test_read_holding_registers(trio_tcp_client, trio_tcp_server, autojump address=address, values=[value - 5, value, value + 5], ) - # TODO: is the +1 good? seems related to ModbusSlaveContext.zero_mode probably response = await trio_tcp_client.read_holding_registers( + # plus one for since the address is the beginning of the three values address=address + 1, count=1, ) @@ -125,8 +125,8 @@ async def test_write_holding_registers(trio_tcp_client, trio_tcp_server, autojum address = 12 value = 40413 - # TODO: is the +1 good? seems related to ModbusSlaveContext.zero_mode probably response = await trio_tcp_client.write_registers( + # plus one for since the address is the beginning of the three values address=address + 1, values=[value], ) @@ -344,7 +344,7 @@ async def test_incoming_closes_response_send_channel(trio_server_context): argnames=['data_blocks'], argvalues=[ [[sample_read_data]], - # TODO: can the framer actually handle this? + # TODO: enable when the framer can handle fragmentation # [[sample_read_data[:5], sample_read_data[5:]]], # [[bytes([byte]) for byte in sample_read_data]], ], From bcef8b76a50ac6f4fb589eda84c379af440e675d Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 28 Apr 2021 13:30:46 -0400 Subject: [PATCH 37/37] use a lock for trio client sending --- pymodbus/client/asynchronous/trio/__init__.py | 7 +++++-- test/test_client_async_trio.py | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pymodbus/client/asynchronous/trio/__init__.py b/pymodbus/client/asynchronous/trio/__init__.py index e5ec64648..3ad2bb96f 100644 --- a/pymodbus/client/asynchronous/trio/__init__.py +++ b/pymodbus/client/asynchronous/trio/__init__.py @@ -39,6 +39,7 @@ class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin): #: Factory that created this instance. factory = None transport = None + send_lock = None def _build_packet(self, request): request.transaction_id = self.transaction.getNextTID() @@ -52,8 +53,8 @@ async def execute(self, request=None): """ packet = self._build_packet(request=request) _logger.debug("send: " + hexlify_packets(packet)) - # TODO: should we retry on trio.BusyResourceError? - await self.transport.send_all(packet) + async with self.send_lock: + await self.transport.send_all(packet) with trio.fail_after(seconds=1): response = await self._build_response(request.transaction_id) return response @@ -142,6 +143,7 @@ class ModbusTcpClientProtocol(BaseModbusAsyncClientProtocol): #: Factory that created this instance. factory = None transport = None + send_lock = None def data_received(self, data): """ @@ -168,6 +170,7 @@ def __init__(self, host=None, port=502, protocol_class=None): """ #: Protocol used to talk to modbus device. self.protocol_class = protocol_class or ModbusTcpClientProtocol + self.protocol_class.send_lock = trio.Lock() #: Current protocol instance. self.protocol = None diff --git a/test/test_client_async_trio.py b/test/test_client_async_trio.py index 3a85bd482..e438bf62c 100644 --- a/test/test_client_async_trio.py +++ b/test/test_client_async_trio.py @@ -137,6 +137,7 @@ async def anoop(): @pytest.mark.trio async def test_protocol_execute_sends(): protocol = BaseModbusAsyncClientProtocol() + protocol.send_lock = trio.Lock() transport = mock.Mock() transport.send_all = mock.Mock(return_value=anoop()) protocol.transport = transport