diff --git a/pymodbus/factory.py b/pymodbus/factory.py index e8d6b46dc..947b8735d 100644 --- a/pymodbus/factory.py +++ b/pymodbus/factory.py @@ -164,7 +164,7 @@ def getFCdict(cls): def __init__(self): """Initialize the client lookup tables.""" functions = {f.function_code for f in self.__function_table} - self.__lookup = self.getFCdict() + self.lookup = self.getFCdict() self.__sub_lookup = {f: {} for f in functions} for f in self.__sub_function_table: self.__sub_lookup[f.function_code][f.sub_function_code] = f @@ -187,7 +187,7 @@ def lookupPduClass(self, function_code): :param function_code: The function code specified in a frame. :returns: The class of the PDU that has a matching `function_code`. """ - return self.__lookup.get(function_code, ExceptionResponse) + return self.lookup.get(function_code, ExceptionResponse) def _helper(self, data): """Generate the correct request object from a valid request packet. @@ -198,12 +198,12 @@ def _helper(self, data): :returns: The decoded request or illegal function request object """ function_code = int(data[0]) - if not (request := self.__lookup.get(function_code, lambda: None)()): + if not (request := self.lookup.get(function_code, lambda: None)()): Log.debug("Factory Request[{}]", function_code) request = IllegalFunctionRequest(function_code) else: fc_string = "%s: %s" % ( # pylint: disable=consider-using-f-string - str(self.__lookup[function_code]) # pylint: disable=use-maxsplit-arg + str(self.lookup[function_code]) # pylint: disable=use-maxsplit-arg .split(".")[-1] .rstrip('">"'), function_code, @@ -230,7 +230,7 @@ def register(self, function=None): ". Class needs to be derived from " "`pymodbus.pdu.ModbusRequest` " ) - self.__lookup[function.function_code] = function + self.lookup[function.function_code] = function if hasattr(function, "sub_function_code"): if function.function_code not in self.__sub_lookup: self.__sub_lookup[function.function_code] = {} @@ -293,7 +293,7 @@ class ClientDecoder: def __init__(self): """Initialize the client lookup tables.""" functions = {f.function_code for f in self.function_table} - self.__lookup = {f.function_code: f for f in self.function_table} + self.lookup = {f.function_code: f for f in self.function_table} self.__sub_lookup = {f: {} for f in functions} for f in self.__sub_function_table: self.__sub_lookup[f.function_code][f.sub_function_code] = f @@ -304,7 +304,7 @@ def lookupPduClass(self, function_code): :param function_code: The function code specified in a frame. :returns: The class of the PDU that has a matching `function_code`. """ - return self.__lookup.get(function_code, ExceptionResponse) + return self.lookup.get(function_code, ExceptionResponse) def decode(self, message): """Decode a response packet. @@ -330,15 +330,15 @@ def _helper(self, data): :raises ModbusException: """ fc_string = function_code = int(data[0]) - if function_code in self.__lookup: + if function_code in self.lookup: fc_string = "%s: %s" % ( # pylint: disable=consider-using-f-string - str(self.__lookup[function_code]) # pylint: disable=use-maxsplit-arg + str(self.lookup[function_code]) # pylint: disable=use-maxsplit-arg .split(".")[-1] .rstrip('">"'), function_code, ) Log.debug("Factory Response[{}]", fc_string) - response = self.__lookup.get(function_code, lambda: None)() + response = self.lookup.get(function_code, lambda: None)() if function_code > 0x80: code = function_code & 0x7F # strip error portion response = ExceptionResponse(code, ecode.IllegalFunction) @@ -361,7 +361,7 @@ def register(self, function): ". Class needs to be derived from " "`pymodbus.pdu.ModbusResponse` " ) - self.__lookup[function.function_code] = function + self.lookup[function.function_code] = function if hasattr(function, "sub_function_code"): if function.function_code not in self.__sub_lookup: self.__sub_lookup[function.function_code] = {} diff --git a/pymodbus/framer/rtu_framer.py b/pymodbus/framer/rtu_framer.py index dc149a0f3..af26cf88f 100644 --- a/pymodbus/framer/rtu_framer.py +++ b/pymodbus/framer/rtu_framer.py @@ -64,6 +64,7 @@ def __init__(self, decoder, client=None): self._hsize = 0x01 self._end = b"\x0d\x0a" self._min_frame_size = 4 + self.function_codes = set(self.decoder.lookup) if self.decoder else {} # ----------------------------------------------------------------------- # # Private Helper Functions @@ -117,7 +118,7 @@ def resetFrame(self): Log.debug( "Resetting frame - Current Frame in buffer - {}", self._buffer, ":hex" ) - self._buffer = b"" + # self._buffer = b"" self._header = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"} def isFrameReady(self): @@ -191,6 +192,23 @@ def populateResult(self, result): result.slave_id = self._header["uid"] result.transaction_id = self._header["uid"] + def getFrameStart(self, slaves, broadcast, skip_cur_frame): + """Scan buffer for a relevant frame start.""" + start = 1 if skip_cur_frame else 0 + if (buf_len := len(self._buffer)) < 4: + return False + for i in range(start, buf_len - 3): # + if not broadcast and self._buffer[i] not in slaves: + continue + if self._buffer[i + 1] not in self.function_codes: + continue + if i: + self._buffer = self._buffer[i:] # remove preceding trash. + return True + if buf_len > 3: + self._buffer = self._buffer[-3:] + return False + # ----------------------------------------------------------------------- # # Public Member Functions # ----------------------------------------------------------------------- # @@ -214,25 +232,26 @@ def processIncomingPacket(self, data, callback, slave, **kwargs): """ if not isinstance(slave, (list, tuple)): slave = [slave] + broadcast = not slave[0] self.addToFrame(data) single = kwargs.get("single", False) - while True: - if self.isFrameReady(): - if self.checkFrame(): - if self._validate_slave_id(slave, single): - self._process(callback) - else: - header_txt = self._header["uid"] - Log.debug("Not a valid slave id - {}, ignoring!!", header_txt) - self.resetFrame() - break - else: - Log.debug("Frame check failed, ignoring!!") - self.resetFrame() - break - else: + skip_cur_frame = False + while self.getFrameStart(slave, broadcast, skip_cur_frame): + if not self.isFrameReady(): Log.debug("Frame - [{}] not ready", data) break + if not self.checkFrame(): + Log.debug("Frame check failed, ignoring!!") + self.resetFrame() + skip_cur_frame = True + continue + if not self._validate_slave_id(slave, single): + header_txt = self._header["uid"] + Log.debug("Not a valid slave id - {}, ignoring!!", header_txt) + self.resetFrame() + skip_cur_frame = True + continue + self._process(callback) def buildPacket(self, message): """Create a ready to send modbus packet. diff --git a/test/test_client_multidrop.py b/test/test_client_multidrop.py index 946b43138..bc145d096 100644 --- a/test/test_client_multidrop.py +++ b/test/test_client_multidrop.py @@ -30,21 +30,25 @@ def test_ok_frame(self, framer, callback): framer.processIncomingPacket(serial_event, callback, self.slaves) callback.assert_called_once() + def test_ok_2frame(self, framer, callback): + """Test ok frame.""" + serial_event = self.good_frame + self.good_frame + framer.processIncomingPacket(serial_event, callback, self.slaves) + assert callback.call_count == 2 + def test_bad_crc(self, framer, callback): """Test bad crc.""" serial_event = b"\x02\x03\x00\x01\x00}\xd4\x19" # Manually mangled crc framer.processIncomingPacket(serial_event, callback, self.slaves) callback.assert_not_called() - def test_wrong_unit(self, framer, callback): - """Test frame wrong unit""" - serial_event = ( - b"\x01\x03\x00\x01\x00}\xd4+" # Frame with good CRC but other unit id - ) + def test_wrong_id(self, framer, callback): + """Test frame wrong id""" + serial_event = b"\x01\x03\x00\x01\x00}\xd4+" # Frame with good CRC but other id framer.processIncomingPacket(serial_event, callback, self.slaves) callback.assert_not_called() - def test_big_split_response_frame_from_other_unit(self, framer, callback): + def test_big_split_response_frame_from_other_id(self, framer, callback): """Test split response.""" # This is a single *response* from device id 1 after being queried for 125 holding register values # Because the response is so long it spans several serial events @@ -70,26 +74,21 @@ def test_split_frame(self, framer, callback): framer.processIncomingPacket(serial_event, callback, self.slaves) callback.assert_called_once() - @pytest.mark.skip - def test_complete_frame_trailing_data_without_unit_id(self, framer, callback): + def test_complete_frame_trailing_data_without_id(self, framer, callback): """Test trailing data.""" - garbage = b"\x05\x04\x03" # Note the garbage doesn't contain our unit id + garbage = b"\x05\x04\x03" # without id serial_event = garbage + self.good_frame framer.processIncomingPacket(serial_event, callback, self.slaves) callback.assert_called_once() - @pytest.mark.skip - def test_complete_frame_trailing_data_with_unit_id(self, framer, callback): + def test_complete_frame_trailing_data_with_id(self, framer, callback): """Test trailing data.""" - garbage = ( - b"\x05\x04\x03\x02\x01\x00" # Note the garbage does contain our unit id - ) + garbage = b"\x05\x04\x03\x02\x01\x00" # with id serial_event = garbage + self.good_frame framer.processIncomingPacket(serial_event, callback, self.slaves) callback.assert_called_once() - @pytest.mark.skip - def test_split_frame_trailing_data_with_unit_id(self, framer, callback): + def test_split_frame_trailing_data_with_id(self, framer, callback): """Test split frame.""" garbage = b"\x05\x04\x03\x02\x01\x00" serial_events = [garbage + self.good_frame[:5], self.good_frame[5:]] @@ -97,6 +96,30 @@ def test_split_frame_trailing_data_with_unit_id(self, framer, callback): framer.processIncomingPacket(serial_event, callback, self.slaves) callback.assert_called_once() + def test_coincidental_1(self, framer, callback): + """Test conincidental.""" + garbage = b"\x02\x90\x07" + serial_events = [garbage, self.good_frame[:5], self.good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_coincidental_2(self, framer, callback): + """Test conincidental.""" + garbage = b"\x02\x10\x07" + serial_events = [garbage, self.good_frame[:5], self.good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_coincidental_3(self, framer, callback): + """Test conincidental.""" + garbage = b"\x02\x10\x07\x10" + serial_events = [garbage, self.good_frame[:5], self.good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + def test_wrapped_frame(self, framer, callback): """Test wrapped frame.""" garbage = b"\x05\x04\x03\x02\x01\x00" @@ -107,9 +130,8 @@ def test_wrapped_frame(self, framer, callback): # i.e. this probably represents a case where a command came for us, but we didn't get # to the serial buffer in time (some other co-routine or perhaps a block on the USB bus) # and the master moved on and queried another device - callback.assert_not_called() + callback.assert_called_once() - @pytest.mark.skip def test_frame_with_trailing_data(self, framer, callback): """Test trailing data.""" garbage = b"\x05\x04\x03\x02\x01\x00" @@ -117,4 +139,31 @@ def test_frame_with_trailing_data(self, framer, callback): framer.processIncomingPacket(serial_event, callback, self.slaves) # We should not respond in this case for identical reasons as test_wrapped_frame - callback.assert_not_called() + callback.assert_called_once() + + def test_getFrameStart(self, framer): + """Test getFrameStart.""" + framer_ok = b"\x02\x03\x00\x01\x00}\xd4\x18" + framer._buffer = framer_ok # pylint: disable=protected-access + assert framer.getFrameStart(self.slaves, False, False) + assert framer_ok == framer._buffer # pylint: disable=protected-access + + framer_2ok = framer_ok + framer_ok + framer._buffer = framer_2ok # pylint: disable=protected-access + assert framer.getFrameStart(self.slaves, False, False) + assert framer_2ok == framer._buffer # pylint: disable=protected-access + assert framer.getFrameStart(self.slaves, False, True) + assert framer_ok == framer._buffer # pylint: disable=protected-access + + framer._buffer = framer_ok[:2] # pylint: disable=protected-access + assert not framer.getFrameStart(self.slaves, False, False) + assert framer_ok[:2] == framer._buffer # pylint: disable=protected-access + + framer._buffer = framer_ok[:3] # pylint: disable=protected-access + assert not framer.getFrameStart(self.slaves, False, False) + assert framer_ok[:3] == framer._buffer # pylint: disable=protected-access + + framer_ok = b"\xF0\x03\x00\x01\x00}\xd4\x18" + framer._buffer = framer_ok # pylint: disable=protected-access + assert not framer.getFrameStart(self.slaves, False, False) + assert framer._buffer == framer_ok[-3:] # pylint: disable=protected-access diff --git a/test/test_framers.py b/test/test_framers.py index de2eb34e3..b7df1fff9 100644 --- a/test/test_framers.py +++ b/test/test_framers.py @@ -143,7 +143,6 @@ def test_rtu_reset_framer(rtu_framer, data): # pylint: disable=redefined-outer- "len": 0, "crc": b"\x00\x00", } - assert rtu_framer._buffer == b"" # pylint: disable=protected-access @pytest.mark.parametrize( @@ -255,7 +254,7 @@ def test_populate_result(rtu_framer): # pylint: disable=redefined-outer-name ( b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", 16, - True, + False, False, ), # incorrect slave id (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", 17, False, True), diff --git a/test/xmulti_rtu1 b/test/xmulti_rtu1 new file mode 100644 index 000000000..1d0a34a9a --- /dev/null +++ b/test/xmulti_rtu1 @@ -0,0 +1,80 @@ +import logging + +from pymodbus.framer.rtu_framer import ModbusRtuFramer +from pymodbus.utilities import checkCRC +import struct + +log = logging.getLogger(__name__) + + +class PatchedModbusRtuFramer(ModbusRtuFramer): + def isFrameIntendedForUs(self, units: list[int]): + try: + unit = self._buffer[0] + return unit in units + except IndexError as e: + return True + + def advanceToNextOccurrenceOfUnit(self, units: list[int]): + j = None + for i, b in enumerate(self._buffer): + if b in units and i > 0: + j = i + break + + if j: + self._buffer = self._buffer[j:] + else: + self._buffer = b"" + + self._header = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"} + return len(self._buffer) + + def checkFrame(self): + try: + self.populateHeader() + frame_size = self._header["len"] + + if len(self._buffer) > frame_size: + # This means there are bytes *after* a valid frame intended to us. + # If there are bytes after that means we've probably de-sychronized + # and the master has considered us as having timed out and moved on. + # That or there is somehow a random sequence of bytes which looks like + # a real command to us and magically a crc. + # In either case, we must not respond. + return False + + data = self._buffer[: frame_size - 2] + crc = self._header["crc"] + crc_val = (int(crc[0]) << 8) + int(crc[1]) + return checkCRC(data, crc_val) + except (IndexError, KeyError, struct.error): + return False + + def processIncomingPacket(self, data, callback, unit, **kwargs): + if not isinstance(unit, (list, tuple)): + unit = [unit] + self.addToFrame(data) + single = kwargs.get("single", False) + while True: + if not self.isFrameIntendedForUs(unit): + if self.advanceToNextOccurrenceOfUnit(unit) == 0: + log.info(f"โŒ Frame - [{data}] not intended for us, ignoring!!") + break + elif self.isFrameReady(): + if self.checkFrame(): + if self._validate_unit_id(unit, single): + self._process(callback) + log.info(f"โœ… Frame - [{data}] responded to!!") + else: + header_txt = self._header["uid"] + log.info(f"Not a valid unit id - {header_txt}, ignoring!!") + self.resetFrame() + break + else: + log.info("Frame check failed, ignoring!!") + if self.advanceToNextOccurrenceOfUnit(unit) == 0: + break + else: + log.info(f"Frame - [{data}] not ready") + break diff --git a/test/xmulti_rtu2 b/test/xmulti_rtu2 new file mode 100644 index 000000000..25bec00f6 --- /dev/null +++ b/test/xmulti_rtu2 @@ -0,0 +1,104 @@ +import logging +import struct + +from pymodbus.framer.rtu_framer import ModbusRtuFramer +from pymodbus.utilities import checkCRC + +log = logging.getLogger(__name__) + + +class PatchedModbusRtuFramer(ModbusRtuFramer): + def isFrameIntendedForUs(self, units: list[int]): + try: + # Always validate that the unit id and function code + unit = self._buffer[0] + fc = self._buffer[1] + + valid_unit_id = unit in units + valid_function_code = fc in [2, 1, 5, 15, 4, 3, 6, 16, 43] + + if fc in [15, 16]: + # These commands will have arbitrary payload lengths + # Further validation is beneficial + n_registers = int.from_bytes(self._buffer[4:6], byteorder="big") + n_bytes = self._buffer[6] + + expected_number_of_bytes = n_registers * 2 + + valid_number_of_registers = n_registers <= 123 + valid_number_of_bytes = n_bytes == expected_number_of_bytes + return ( + valid_unit_id + and valid_function_code + and valid_number_of_registers + and valid_number_of_bytes + ) + return valid_unit_id and valid_function_code + + except IndexError as e: + return True + + def advanceToNextOccurrenceOfUnit(self, units: list[int]): + j = None + for i, b in enumerate(self._buffer): + if b in units and i > 0: + j = i + break + + if j: + log.debug("โญ") + self._buffer = self._buffer[j:] + else: + log.debug("๐Ÿ—‘") + self._buffer = b"" + + self._header = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"} + return len(self._buffer) + + def checkFrame(self): + try: + self.populateHeader() + frame_size = self._header["len"] + + if len(self._buffer) > frame_size: + # This means there are bytes *after* a valid frame intended to us. + # If there are bytes after that means we've probably de-sychronized + # and the master has considered us as having timed out and moved on. + # That or there is somehow a random sequence of bytes which looks like + # a real command to us and magically a crc. + # In either case, we must not respond. + return False + + data = self._buffer[: frame_size - 2] + crc = self._header["crc"] + crc_val = (int(crc[0]) << 8) + int(crc[1]) + return checkCRC(data, crc_val) + except (IndexError, KeyError, struct.error): + return False + + def processIncomingPacket(self, data, callback, unit, **kwargs): + log.debug(f"๐ŸŒŸ {data.hex(':')}") + if not isinstance(unit, (list, tuple)): + unit = [unit] + self.addToFrame(data) + single = kwargs.get("single", False) + while True: + log.debug(f"๐Ÿงช {self._buffer.hex(':')}") + if not self.isFrameIntendedForUs(unit): + if self.advanceToNextOccurrenceOfUnit(unit) == 0: + break + elif self.isFrameReady(): + if self.checkFrame(): + if self._validate_unit_id(unit, single): + log.debug(f"โœ… {self._buffer.hex(':')}") + self._process(callback) + else: + self.resetFrame() + break + else: + log.debug("Frame check failed, ignoring!!") + if self.advanceToNextOccurrenceOfUnit(unit) == 0: + break + else: + log.debug(f"โŒ› {self._buffer.hex(':')}") + break diff --git a/test/xmulti_test1 b/test/xmulti_test1 new file mode 100644 index 000000000..66fdfdde0 --- /dev/null +++ b/test/xmulti_test1 @@ -0,0 +1,117 @@ +from unittest import mock + +import pytest +from pymodbus.server.async_io import ServerDecoder + +from fix_pymodbus.patched_modbus_rtu_framer import PatchedModbusRtuFramer + + +@pytest.fixture +def framer() -> PatchedModbusRtuFramer: + return PatchedModbusRtuFramer(ServerDecoder()) + + +@pytest.fixture +def callback() -> mock.Mock: + return mock.Mock() + + +expected_unit = [2] +good_frame = b"\x02\x03\x00\x01\x00}\xd4\x18" + + +def test_complete_frame(framer: PatchedModbusRtuFramer, callback: mock.Mock): + serial_event = good_frame + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_complete_frame_bad_crc(framer: PatchedModbusRtuFramer, callback: mock.Mock): + serial_event = b"\x02\x03\x00\x01\x00}\xd4\x19" # Manually mangled crc + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_not_called() + + +def test_complete_frame_wrong_unit(framer: PatchedModbusRtuFramer, callback: mock.Mock): + serial_event = ( + b"\x01\x03\x00\x01\x00}\xd4+" # Frame with good CRC but other unit id + ) + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_not_called() + + +def test_big_split_response_frame_from_other_unit( + framer: PatchedModbusRtuFramer, callback: mock.Mock +): + # This is a single *response* from device id 1 after being queried for 125 holding register values + # Because the response is so long it spans several serial events + serial_events = [ + b"\x01\x03\xfa\xc4y\xc0\x00\xc4y\xc0\x00\xc4y\xc0\x00\xc4y\xc0\x00\xc4y\xc0\x00Dz\x00\x00C\x96\x00\x00", + b"?\x05\x1e\xb8DH\x00\x00D\x96\x00\x00D\xfa\x00\x00DH\x00\x00D\x96\x00\x00D\xfa\x00\x00DH\x00", + b"\x00D\x96\x00\x00D\xfa\x00\x00B\x96\x00\x00B\xb4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00N,", + ] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_not_called() + + +def test_split_frame(framer: PatchedModbusRtuFramer, callback: mock.Mock): + serial_events = [good_frame[:5], good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_complete_frame_trailing_data_without_unit_id( + framer: PatchedModbusRtuFramer, callback: mock.Mock +): + garbage = b"\x05\x04\x03" # Note the garbage doesn't contain our unit id + serial_event = garbage + good_frame + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_complete_frame_trailing_data_with_unit_id( + framer: PatchedModbusRtuFramer, callback: mock.Mock +): + garbage = b"\x05\x04\x03\x02\x01\x00" # Note the garbage does contain our unit id + serial_event = garbage + good_frame + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_split_frame_trailing_data_with_unit_id( + framer: PatchedModbusRtuFramer, callback: mock.Mock +): + garbage = b"\x05\x04\x03\x02\x01\x00" + serial_events = [garbage + good_frame[:5], good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_wrapped_frame(framer: PatchedModbusRtuFramer, callback: mock.Mock): + garbage = b"\x05\x04\x03\x02\x01\x00" + serial_event = garbage + good_frame + garbage + framer.processIncomingPacket(serial_event, callback, expected_unit) + + # We probably should not respond in this case; in this case we've likely become desynchronized + # i.e. this probably represents a case where a command came for us, but we didn't get + # to the serial buffer in time (some other co-routine or perhaps a block on the USB bus) + # and the master moved on and queried another device + callback.assert_not_called() + + +def test_frame_with_trailing_data(framer: PatchedModbusRtuFramer, callback: mock.Mock): + garbage = b"\x05\x04\x03\x02\x01\x00" + serial_event = good_frame + garbage + framer.processIncomingPacket(serial_event, callback, expected_unit) + + # We should not respond in this case for identical reasons as test_wrapped_frame + callback.assert_not_called() diff --git a/test/xmulti_test2 b/test/xmulti_test2 new file mode 100644 index 000000000..1c584f620 --- /dev/null +++ b/test/xmulti_test2 @@ -0,0 +1,133 @@ +from unittest import mock + +import pytest +from pymodbus.server.async_io import ServerDecoder + +from fix_pymodbus.patched_modbus_rtu_framer import PatchedModbusRtuFramer + + +@pytest.fixture +def framer() -> PatchedModbusRtuFramer: + return PatchedModbusRtuFramer(ServerDecoder()) + + +@pytest.fixture +def callback() -> mock.Mock: + return mock.Mock() + + +expected_unit = [2] +good_frame = b"\x02\x03\x00\x01\x00}\xd4\x18" + + +def test_complete_frame(framer: PatchedModbusRtuFramer, callback: mock.Mock): + serial_event = good_frame + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_complete_frame_bad_crc(framer: PatchedModbusRtuFramer, callback: mock.Mock): + serial_event = b"\x02\x03\x00\x01\x00}\xd4\x19" # Manually mangled crc + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_not_called() + + +def test_complete_frame_wrong_unit(framer: PatchedModbusRtuFramer, callback: mock.Mock): + serial_event = ( + b"\x01\x03\x00\x01\x00}\xd4+" # Frame with good CRC but other unit id + ) + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_not_called() + + +def test_big_split_response_frame_from_other_unit( + framer: PatchedModbusRtuFramer, callback: mock.Mock +): + # This is a single *response* from device id 1 after being queried for 125 holding register values + # Because the response is so long it spans several serial events + serial_events = [ + b"\x01\x03\xfa\xc4y\xc0\x00\xc4y\xc0\x00\xc4y\xc0\x00\xc4y\xc0\x00\xc4y\xc0\x00Dz\x00\x00C\x96\x00\x00", + b"?\x05\x1e\xb8DH\x00\x00D\x96\x00\x00D\xfa\x00\x00DH\x00\x00D\x96\x00\x00D\xfa\x00\x00DH\x00", + b"\x00D\x96\x00\x00D\xfa\x00\x00B\x96\x00\x00B\xb4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00N,", + ] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_not_called() + + +def test_split_frame(framer: PatchedModbusRtuFramer, callback: mock.Mock): + serial_events = [good_frame[:5], good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_complete_frame_trailing_data_without_unit_id( + framer: PatchedModbusRtuFramer, callback: mock.Mock +): + garbage = b"\x05\x04\x03" # Note the garbage doesn't contain our unit id + serial_event = garbage + good_frame + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_complete_frame_trailing_data_with_unit_id( + framer: PatchedModbusRtuFramer, callback: mock.Mock +): + garbage = b"\x05\x04\x03\x02\x01\x00" # Note the garbage does contain our unit id + serial_event = garbage + good_frame + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_split_frame_trailing_data_with_unit_id( + framer: PatchedModbusRtuFramer, callback: mock.Mock +): + garbage = b"\x05\x04\x03\x02\x01\x00" + serial_events = [garbage + good_frame[:5], good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_coincidental_1(framer: PatchedModbusRtuFramer, callback: mock.Mock): + garbage = b"\x02\x14\x07" + serial_events = [garbage, good_frame[:5], good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_coincidental_2(framer: PatchedModbusRtuFramer, callback: mock.Mock): + garbage = b"\x02\x10\x07" + serial_events = [garbage, good_frame[:5], good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, expected_unit) + callback.assert_called_once() + + +def test_wrapped_frame(framer: PatchedModbusRtuFramer, callback: mock.Mock): + garbage = b"\x05\x04\x03\x02\x01\x00" + serial_event = garbage + good_frame + garbage + framer.processIncomingPacket(serial_event, callback, expected_unit) + + # We probably should not respond in this case; in this case we've likely become desynchronized + # i.e. this probably represents a case where a command came for us, but we didn't get + # to the serial buffer in time (some other co-routine or perhaps a block on the USB bus) + # and the master moved on and queried another device + callback.assert_not_called() + + +def test_frame_with_trailing_data(framer: PatchedModbusRtuFramer, callback: mock.Mock): + garbage = b"\x05\x04\x03\x02\x01\x00" + serial_event = good_frame + garbage + framer.processIncomingPacket(serial_event, callback, expected_unit) + + # We should not respond in this case for identical reasons as test_wrapped_frame + callback.assert_not_called()