From 51a421ba7ff8411855efe241fd418f3703dcf9a2 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Tue, 26 Nov 2024 09:41:14 +0100 Subject: [PATCH] Final. --- pymodbus/server/async_io.py | 27 +++++++++------------------ pymodbus/transaction/transaction.py | 12 +++++++++++- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/pymodbus/server/async_io.py b/pymodbus/server/async_io.py index ee8903989..40d837613 100644 --- a/pymodbus/server/async_io.py +++ b/pymodbus/server/async_io.py @@ -9,7 +9,7 @@ from pymodbus.datastore import ModbusServerContext from pymodbus.device import ModbusControlBlock, ModbusDeviceIdentification -from pymodbus.exceptions import ModbusException, NoSuchSlaveException +from pymodbus.exceptions import NoSuchSlaveException from pymodbus.framer import FRAMER_NAME_TO_CLASS, FramerType from pymodbus.logging import Log from pymodbus.pdu import DecodePDU @@ -43,7 +43,6 @@ def __init__(self, owner): self.server = owner self.framer = self.server.framer(self.server.decoder) self.running = False - self.receive_queue: asyncio.Queue = asyncio.Queue() self.handler_task = None # coroutine to be run on asyncio loop self.databuffer = b'' self.loop = asyncio.get_running_loop() @@ -125,7 +124,14 @@ async def handle(self) -> None: """ while self.running: try: - pdu, *addr = await self.receive_queue.get() + pdu, *addr, exc = await self.server_execute() + if exc: + pdu = ExceptionResponse( + 40, + exception_code=ExceptionResponse.ILLEGAL_FUNCTION + ) + self.server_send(pdu, 0) + continue await self.server_async_execute(pdu, *addr) except asyncio.CancelledError: # catch and ignore cancellation errors @@ -190,21 +196,6 @@ def server_send(self, message, addr, **kwargs): pdu = self.framer.buildFrame(message) self.send(pdu, addr=addr) - def callback_data(self, data: bytes, addr: tuple | None = ()) -> int: - """Handle received data.""" - try: - used_len, pdu = self.framer.processIncomingFrame(data) - except ModbusException: - pdu = ExceptionResponse( - 40, - exception_code=ExceptionResponse.ILLEGAL_FUNCTION - ) - self.server_send(pdu, 0) - return len(self.databuffer) - if pdu: - self.receive_queue.put_nowait((pdu, addr)) - return used_len - class ModbusBaseServer(ModbusProtocol): """Common functionality for all server classes.""" diff --git a/pymodbus/transaction/transaction.py b/pymodbus/transaction/transaction.py index 51a395e49..5533ab835 100644 --- a/pymodbus/transaction/transaction.py +++ b/pymodbus/transaction/transaction.py @@ -149,6 +149,16 @@ async def execute(self, no_response_expected: bool, request: ModbusPDU) -> Modbu self.response_future = asyncio.Future() return None + async def server_execute(self) -> tuple[ModbusPDU, int, Exception]: + """Wait for request. + + Used in server, with an instance for each connection, therefore + there are NO concurrency. + """ + pdu, addr, exc = await asyncio.wait_for(self.response_future, None) + self.response_future = asyncio.Future() + return pdu, addr, exc + def callback_new_connection(self): """Call when listener receive new connection request.""" @@ -168,7 +178,6 @@ def callback_disconnected(self, exc: Exception | None) -> None: def callback_data(self, data: bytes, addr: tuple | None = None) -> int: """Handle received data.""" - _ = (addr) if self.trace_recv_packet: data = self.trace_recv_packet(data) # pylint: disable=not-callable try: @@ -176,6 +185,7 @@ def callback_data(self, data: bytes, addr: tuple | None = None) -> int: except ModbusIOException as exc: if self.is_server: self.response_future.set_result((None, addr, exc)) + return len(data) raise exc if pdu: if self.trace_recv_pdu: