diff --git a/pymodbus/framer/ascii_framer.py b/pymodbus/framer/ascii_framer.py index ec6a6e0576..17b6302cf0 100644 --- a/pymodbus/framer/ascii_framer.py +++ b/pymodbus/framer/ascii_framer.py @@ -38,8 +38,6 @@ def __init__(self, decoder, client=None): :param decoder: The decoder implementation to use """ super().__init__(decoder, client) - self._buffer = b"" - self._header = {"lrc": "0000", "len": 0, "uid": 0x00} self._hsize = 0x02 self._start = b":" self._end = b"\r\n" @@ -117,18 +115,6 @@ def getFrame(self): return a2b_hex(buffer) return b"" - def resetFrame(self): - """Reset the entire message frame. - - This allows us to skip ovver errors that may be in the stream. - It is hard to know if we are simply out of sync or if there is - an error in the stream as we have no way to check the start or - end of the message (python just doesn't have the resolution to - check for millisecond delays). - """ - self._buffer = b"" - self._header = {"lrc": "0000", "len": 0, "uid": 0x00} - def populateResult(self, result): """Populate the modbus result header. diff --git a/pymodbus/framer/base.py b/pymodbus/framer/base.py index 63f0819391..33d4215b03 100644 --- a/pymodbus/framer/base.py +++ b/pymodbus/framer/base.py @@ -3,6 +3,7 @@ from typing import Any, Dict, Union from pymodbus.factory import ClientDecoder, ServerDecoder +from pymodbus.logging import Log # Unit ID, Function Code @@ -32,7 +33,15 @@ def __init__( """ self.decoder = decoder self.client = client - self._header: Dict[str, Any] = {} + self._header: Dict[str, Any] = { + "lrc": "0000", + "len": 0, + "uid": 0x00, + "tid": 0, + "pid": 0, + "crc": b"\x00\x00", + } + self._buffer = b"" def _validate_slave_id(self, slaves: list, single: bool) -> bool: """Validate if the received data is valid for the client. @@ -66,3 +75,25 @@ def recvPacket(self, size): :return: """ return self.client.recv(size) + + def resetFrame(self): + """Reset the entire message frame. + + This allows us to skip ovver errors that may be in the stream. + It is hard to know if we are simply out of sync or if there is + an error in the stream as we have no way to check the start or + end of the message (python just doesn't have the resolution to + check for millisecond delays). + """ + Log.debug( + "Resetting frame - Current Frame in buffer - {}", self._buffer, ":hex" + ) + self._buffer = b"" + self._header = { + "lrc": "0000", + "crc": b"\x00\x00", + "len": 0, + "uid": 0x00, + "pid": 0, + "tid": 0, + } diff --git a/pymodbus/framer/binary_framer.py b/pymodbus/framer/binary_framer.py index d065d9fb4d..87ffca6274 100644 --- a/pymodbus/framer/binary_framer.py +++ b/pymodbus/framer/binary_framer.py @@ -47,8 +47,7 @@ def __init__(self, decoder, client=None): :param decoder: The decoder implementation to use """ super().__init__(decoder, client) - self._buffer = b"" - self._header = {"crc": 0x0000, "len": 0, "uid": 0x00} + # self._header.update({"crc": 0x0000}) self._hsize = 0x01 self._start = b"\x7b" # { self._end = b"\x7d" # } @@ -209,17 +208,5 @@ def _preflight(self, data): array.append(item) return bytes(array) - def resetFrame(self): - """Reset the entire message frame. - - This allows us to skip ovver errors that may be in the stream. - It is hard to know if we are simply out of sync or if there is - an error in the stream as we have no way to check the start or - end of the message (python just doesn't have the resolution to - check for millisecond delays). - """ - self._buffer = b"" - self._header = {"crc": 0x0000, "len": 0, "uid": 0x00} - # __END__ diff --git a/pymodbus/framer/rtu_framer.py b/pymodbus/framer/rtu_framer.py index 16254f68a4..1b97423858 100644 --- a/pymodbus/framer/rtu_framer.py +++ b/pymodbus/framer/rtu_framer.py @@ -59,8 +59,6 @@ def __init__(self, decoder, client=None): :param decoder: The decoder factory implementation to use """ super().__init__(decoder, client) - self._buffer = b"" - self._header = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"} self._hsize = 0x01 self._end = b"\x0d\x0a" self._min_frame_size = 4 @@ -115,11 +113,9 @@ def resetFrame(self): end of the message (python just doesn't have the resolution to check for millisecond delays). """ - Log.debug( - "Resetting frame - Current Frame in buffer - {}", self._buffer, ":hex" - ) - # self._buffer = b"" - self._header = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"} + x = self._buffer + super().resetFrame() + self._buffer = x def isFrameReady(self): """Check if we should continue decode logic. diff --git a/pymodbus/framer/socket_framer.py b/pymodbus/framer/socket_framer.py index 043d2b0d6f..8b36eb847c 100644 --- a/pymodbus/framer/socket_framer.py +++ b/pymodbus/framer/socket_framer.py @@ -42,8 +42,6 @@ def __init__(self, decoder, client=None): :param decoder: The decoder factory implementation to use """ super().__init__(decoder, client) - self._buffer = b"" - self._header = {"tid": 0, "pid": 0, "len": 0, "uid": 0} self._hsize = 0x07 # ----------------------------------------------------------------------- # @@ -187,18 +185,6 @@ def _process(self, callback, tid, error=False): else: callback(result) # defer or push to a thread? - def resetFrame(self): - """Reset the entire message frame. - - This allows us to skip ovver errors that may be in the stream. - It is hard to know if we are simply out of sync or if there is - an error in the stream as we have no way to check the start or - end of the message (python just doesn't have the resolution to - check for millisecond delays). - """ - self._buffer = b"" - self._header = {"tid": 0, "pid": 0, "len": 0, "uid": 0} - def getRawFrame(self): """Return the complete buffer.""" return self._buffer diff --git a/pymodbus/framer/tls_framer.py b/pymodbus/framer/tls_framer.py index 252bedb0b6..169ef31baf 100644 --- a/pymodbus/framer/tls_framer.py +++ b/pymodbus/framer/tls_framer.py @@ -34,8 +34,6 @@ def __init__(self, decoder, client=None): :param decoder: The decoder factory implementation to use """ super().__init__(decoder, client) - self._buffer = b"" - self._header = {} self._hsize = 0x0 # ----------------------------------------------------------------------- # @@ -148,17 +146,6 @@ def _process(self, callback, error=False): self.advanceFrame() callback(result) # defer or push to a thread? - def resetFrame(self): - """Reset the entire message frame. - - This allows us to skip ovver errors that may be in the stream. - It is hard to know if we are simply out of sync or if there is - an error in the stream as we have no way to check the start or - end of the message (python just doesn't have the resolution to - check for millisecond delays). - """ - self._buffer = b"" - def getRawFrame(self): """Return the complete buffer.""" return self._buffer diff --git a/pymodbus/other_message.py b/pymodbus/other_message.py index 377464da0d..28115766c4 100644 --- a/pymodbus/other_message.py +++ b/pymodbus/other_message.py @@ -88,7 +88,7 @@ def __init__(self, status=0x00, **kwargs): :param status: The status response to report """ ModbusResponse.__init__(self, **kwargs) - self.status = status + self.status = status if status < 256 else 255 def encode(self): """Encode the response. diff --git a/pymodbus/server/async_io.py b/pymodbus/server/async_io.py index c1dbe0942c..2285ef85e2 100644 --- a/pymodbus/server/async_io.py +++ b/pymodbus/server/async_io.py @@ -124,6 +124,33 @@ def callback_disconnected(self, call_exc: Exception) -> None: traceback.format_exc(), ) + async def inner_handle(self): + """Handle handler.""" + slaves = self.server.context.slaves() + # this is an asyncio.Queue await, it will never fail + data = await self._recv_() + if isinstance(data, tuple): + # addr is populated when talking over UDP + data, *addr = data + else: + addr = (None,) # empty tuple + + # if broadcast is enabled make sure to + # process requests to address 0 + if self.server.broadcast_enable: # pragma: no cover + if 0 not in slaves: + slaves.append(0) + + Log.debug("Handling data: {}", data, ":hex") + + single = self.server.context.single + self.framer.processIncomingPacket( + data=data, + callback=lambda x: self.execute(x, *addr), + slave=slaves, + single=single, + ) + async def handle(self): """Return Asyncio coroutine which represents a single conversation. @@ -145,31 +172,7 @@ async def handle(self): reset_frame = False while self.running: try: - slaves = self.server.context.slaves() - # this is an asyncio.Queue await, it will never fail - data = await self._recv_() - if isinstance(data, tuple): - # addr is populated when talking over UDP - data, *addr = data - else: - addr = (None,) # empty tuple - - # if broadcast is enabled make sure to - # process requests to address 0 - if self.server.broadcast_enable: # pragma: no cover - if 0 not in slaves: - slaves.append(0) - - Log.debug("Handling data: {}", data, ":hex") - - single = self.server.context.single - self.framer.processIncomingPacket( - data=data, - callback=lambda x: self.execute(x, *addr), - slave=slaves, - single=single, - ) - + await self.inner_handle() except asyncio.CancelledError: # catch and ignore cancellation errors if self.running: @@ -251,7 +254,6 @@ def __send(msg, *addr): if kwargs.get("skip_encoding", False): __send(message, *addr) elif message.should_respond: - # self.server.control.Counter.BusMessage += 1 pdu = self.framer.buildPacket(message) __send(pdu, *addr) else: @@ -394,10 +396,12 @@ async def server_close(self): active_connecions = self.local_active_connections.copy() for k_item, v_item in active_connecions.items(): Log.warning("aborting active session {}", k_item) - v_item.transport.close() - await asyncio.sleep(0.1) - v_item.handler_task.cancel() - await v_item.handler_task + if v_item.transport: + v_item.transport.close() + await asyncio.sleep(0.1) + if v_item.handler_task: + v_item.handler_task.cancel() + await v_item.handler_task self.local_active_connections = {} self.transport_close() diff --git a/test/test_framers.py b/test/test_framers.py index 7d85724134..cdb9bdc71f 100644 --- a/test/test_framers.py +++ b/test/test_framers.py @@ -55,15 +55,21 @@ def test_framer_initialization(framer): assert framer.decoder == decoder if isinstance(framer, ModbusAsciiFramer): assert framer._header == { # pylint: disable=protected-access + "tid": 0, + "pid": 0, "lrc": "0000", "len": 0, "uid": 0x00, + "crc": b"\x00\x00", } assert framer._hsize == 0x02 # pylint: disable=protected-access assert framer._start == b":" # pylint: disable=protected-access assert framer._end == b"\r\n" # pylint: disable=protected-access elif isinstance(framer, ModbusRtuFramer): assert framer._header == { # pylint: disable=protected-access + "tid": 0, + "pid": 0, + "lrc": "0000", "uid": 0x00, "len": 0, "crc": b"\x00\x00", @@ -73,7 +79,10 @@ def test_framer_initialization(framer): assert framer._min_frame_size == 4 # pylint: disable=protected-access else: assert framer._header == { # pylint: disable=protected-access - "crc": 0x0000, + "tid": 0, + "pid": 0, + "lrc": "0000", + "crc": b"\x00\x00", "len": 0, "uid": 0x00, } @@ -143,9 +152,12 @@ def test_rtu_reset_framer(rtu_framer, data): rtu_framer._buffer = data # pylint: disable=protected-access rtu_framer.resetFrame() assert rtu_framer._header == { # pylint: disable=protected-access - "uid": 0x00, - "len": 0, + "lrc": "0000", "crc": b"\x00\x00", + "len": 0, + "uid": 0x00, + "pid": 0, + "tid": 0, } @@ -190,11 +202,25 @@ def test_rtu_populate_header_fail(rtu_framer, data): [ ( b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", - {"crc": b"\x49\xAD", "uid": 17, "len": 11}, + { + "crc": b"\x49\xAD", + "uid": 17, + "len": 11, + "lrc": "0000", + "tid": 0, + "pid": 0, + }, ), ( b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", - {"crc": b"\x49\xAD", "uid": 17, "len": 11}, + { + "crc": b"\x49\xAD", + "uid": 17, + "len": 11, + "lrc": "0000", + "tid": 0, + "pid": 0, + }, ), ], )