From 29d3558a21fbc530c425655e33302405e416b6dc Mon Sep 17 00:00:00 2001 From: jan iversen Date: Mon, 26 Feb 2024 12:26:55 +0100 Subject: [PATCH 1/4] integrate message.encode() into framer.buildPacket. --- pymodbus/message/rtu.py | 56 +- pyproject.toml | 6 +- test/message/generator.py | 46 ++ test/message/test_ascii.py | 19 +- test/message/test_message.py | 109 +++ test/message/test_rtu.py | 101 +++ .../{to_do_framers_py => to_do_framers.py} | 16 - ...multidrop_py => to_do_server_multidrop.py} | 0 test/message/to_do_transaction.py | 744 +++++++++++++++++ test/message/to_do_transaction_py | 782 ------------------ test/test_transaction.py | 4 +- 11 files changed, 1062 insertions(+), 821 deletions(-) create mode 100755 test/message/generator.py create mode 100644 test/message/test_rtu.py rename test/message/{to_do_framers_py => to_do_framers.py} (96%) rename test/message/{to_do_server_multidrop_py => to_do_server_multidrop.py} (100%) create mode 100755 test/message/to_do_transaction.py delete mode 100755 test/message/to_do_transaction_py diff --git a/pymodbus/message/rtu.py b/pymodbus/message/rtu.py index fb3df4feb..d7abc1099 100644 --- a/pymodbus/message/rtu.py +++ b/pymodbus/message/rtu.py @@ -40,10 +40,62 @@ class MessageRTU(MessageBase): neither when receiving nor when sending. """ + @classmethod + def generate_crc16_table(cls) -> list[int]: + """Generate a crc16 lookup table. + + .. note:: This will only be generated once + """ + result = [] + for byte in range(256): + crc = 0x0000 + for _ in range(8): + if (byte ^ crc) & 0x0001: + crc = (crc >> 1) ^ 0xA001 + else: + crc >>= 1 + byte >>= 1 + result.append(crc) + return result + crc16_table: list[int] = [0] + def decode(self, _data: bytes) -> tuple[int, int, int, bytes]: """Decode message.""" return 0, 0, 0, b'' - def encode(self, _data: bytes, _device_id: int, _tid: int) -> bytes: + def encode(self, data: bytes, device_id: int, _tid: int) -> bytes: """Decode message.""" - return b'' + packet = device_id.to_bytes(1,'big') + data + return packet + MessageRTU.compute_CRC(packet).to_bytes(2,'big') + + @classmethod + def check_CRC(cls, data: bytes, check: int) -> bool: + """Check if the data matches the passed in CRC. + + :param data: The data to create a crc16 of + :param check: The CRC to validate + :returns: True if matched, False otherwise + """ + return cls.compute_CRC(data) == check + + @classmethod + def compute_CRC(cls, data: bytes) -> int: + """Compute a crc16 on the passed in bytes. + + For modbus, this is only used on the binary serial protocols (in this + case RTU). + + The difference between modbus's crc16 and a normal crc16 + is that modbus starts the crc value out at 0xffff. + + :param data: The data to create a crc16 of + :returns: The calculated CRC + """ + crc = 0xFFFF + for data_byte in data: + idx = cls.crc16_table[(crc ^ int(data_byte)) & 0xFF] + crc = ((crc >> 8) & 0xFF) ^ idx + swapped = ((crc << 8) & 0xFF00) | ((crc >> 8) & 0x00FF) + return swapped + +MessageRTU.crc16_table = MessageRTU.generate_crc16_table() diff --git a/pyproject.toml b/pyproject.toml index 695f47983..de992bef0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -225,7 +225,11 @@ source = [ "pymodbus/", "test/", ] -omit = ["examples/contrib/"] +omit = [ + "examples/contrib/", + "test/message/to_do*", + "test/message/generator.py", + ] branch = true [tool.coverage.report] diff --git a/test/message/generator.py b/test/message/generator.py new file mode 100755 index 000000000..038f0d2b6 --- /dev/null +++ b/test/message/generator.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +"""Build framer encode responses.""" + +from pymodbus.factory import ClientDecoder, ServerDecoder +from pymodbus.framer import ( + ModbusAsciiFramer, + ModbusRtuFramer, + ModbusSocketFramer, + ModbusTlsFramer, +) +from pymodbus.pdu import ModbusExceptions as merror +from pymodbus.register_read_message import ( + ReadHoldingRegistersRequest, + ReadHoldingRegistersResponse, +) + + +def set_calls(): + """Define calls.""" + for framer in (ModbusAsciiFramer, ModbusRtuFramer, ModbusSocketFramer, ModbusTlsFramer): + print(f"framer --> {framer}") + for dev_id in (0, 17, 255): + print(f" dev_id --> {dev_id}") + for tid in (0, 3077): + print(f" tid --> {tid}") + client = framer(ClientDecoder()) + request = ReadHoldingRegistersRequest(124, 2, dev_id) + request.transaction_id = tid + result = client.buildPacket(request) + print(f" request --> {result}") + print(f" request --> {result.hex()}") + server = framer(ServerDecoder()) + response = ReadHoldingRegistersResponse([141,142]) + response.slave_id = dev_id + response.transaction_id = tid + result = server.buildPacket(response) + print(f" response --> {result}") + print(f" response --> {result.hex()}") + exception = request.doException(merror.IllegalAddress) + exception.transaction_id = tid + exception.slave_id = dev_id + result = server.buildPacket(exception) + print(f" exception --> {result}") + print(f" exception --> {result.hex()}") + +set_calls() diff --git a/test/message/test_ascii.py b/test/message/test_ascii.py index 61fe91c24..5e46c258b 100644 --- a/test/message/test_ascii.py +++ b/test/message/test_ascii.py @@ -1,6 +1,4 @@ """Test transport.""" -import struct - import pytest from pymodbus.message.ascii import MessageAscii @@ -16,24 +14,9 @@ def prepare_frame(): return MessageAscii([1], False) - def test_check_LRC(self): - """Test check_LRC.""" - data = struct.pack(">HHHH", 0x1234, 0x2345, 0x3456, 0x4567) - assert MessageAscii.check_LRC(data, 0x1C) - - def test_check_noLRC(self): - """Test check_LRC.""" - data = struct.pack(">HHHH", 0x1234, 0x2345, 0x3456, 0x4567) - assert not MessageAscii.check_LRC(data, 0x0C) - - def test_compute_LRC(self): - """Test compute_LRC.""" - data = struct.pack(">HHHH", 0x1234, 0x2345, 0x3456, 0x4567) - assert MessageAscii.compute_LRC(data) == 0x1c - def test_roundtrip_LRC(self): """Test combined compute/check LRC.""" - data = struct.pack(">HHHH", 0x1234, 0x2345, 0x3456, 0x4567) + data = b'\x12\x34\x23\x45\x34\x56\x45\x67' assert MessageAscii.compute_LRC(data) == 0x1c assert MessageAscii.check_LRC(data, 0x1C) diff --git a/test/message/test_message.py b/test/message/test_message.py index 436c5a536..ef5a3809a 100644 --- a/test/message/test_message.py +++ b/test/message/test_message.py @@ -5,6 +5,10 @@ import pytest from pymodbus.message import MessageType +from pymodbus.message.ascii import MessageAscii +from pymodbus.message.rtu import MessageRTU +from pymodbus.message.socket import MessageSocket +from pymodbus.message.tls import MessageTLS from pymodbus.transport import CommParams @@ -96,3 +100,108 @@ async def test_encode(self, msg, data, dev_id, tid, res_data): """Test decode method in all types.""" t_data = msg.msg_handle.encode(data, dev_id, tid) assert res_data == t_data + + @pytest.mark.parametrize( + ("func", "lrc", "expect"), + [(MessageAscii.check_LRC, 0x1c, True), + (MessageAscii.check_LRC, 0x0c, False), + (MessageAscii.compute_LRC, None, 0x1c), + (MessageRTU.check_CRC, 0xE2DB, True), + (MessageRTU.check_CRC, 0xDBE2, False), + (MessageRTU.compute_CRC, None, 0xE2DB), + ] + ) + def test_LRC_CRC(self, func, lrc, expect): + """Test check_LRC.""" + data = b'\x12\x34\x23\x45\x34\x56\x45\x67' + assert expect == func(data, lrc) if lrc else func(data) + + +class TestMessages: # pylint: disable=too-few-public-methods + """Test message classes.""" + + @pytest.mark.parametrize( + ("frame", "frame_expected"), + [ + (MessageAscii, [ + b':0003007C00027F\r\n', + b':000304008D008EDE\r\n', + b':0083027B\r\n', + b':1103007C00026E\r\n', + b':110304008D008ECD\r\n', + b':1183026A\r\n', + b':FF03007C000280\r\n', + b':FF0304008D008EDF\r\n', + b':FF83027C\r\n', + ]), + (MessageRTU, [ + b'\x00\x03\x00\x7c\x00\x02\x04\x02', + b'\x00\x03\x04\x00\x8d\x00\x8e\xfa\xbc', + b'\x00\x83\x02\x91\x31', + b'\x11\x03\x00\x7c\x00\x02\x07\x43', + b'\x11\x03\x04\x00\x8d\x00\x8e\xfb\xbd', + b'\x11\x83\x02\xc1\x34', + b'\xff\x03\x00|\x00\x02\x10\x0d', + b'\xff\x03\x04\x00\x8d\x00\x8e\xf5\xb3', + b'\xff\x83\x02\xa1\x01', + ]), + (MessageSocket, [ + b'\x00\x00\x00\x00\x00\x06\x00\x03\x00\x7c\x00\x02', + b'\x00\x00\x00\x00\x00\x07\x00\x03\x04\x00\x8d\x00\x8e', + b'\x00\x00\x00\x00\x00\x03\x00\x83\x02', + b'\x00\x00\x00\x00\x00\x06\x11\x03\x00\x7c\x00\x02', + b'\x00\x00\x00\x00\x00\x07\x11\x03\x04\x00\x8d\x00\x8e', + b'\x00\x00\x00\x00\x00\x03\x11\x83\x02', + b'\x00\x00\x00\x00\x00\x06\xff\x03\x00\x7c\x00\x02', + b'\x00\x00\x00\x00\x00\x07\xff\x03\x04\x00\x8d\x00\x8e', + b'\x00\x00\x00\x00\x00\x03\xff\x83\x02', + b'\x0c\x05\x00\x00\x00\x06\x00\x03\x00\x7c\x00\x02', + b'\x0c\x05\x00\x00\x00\x07\x00\x03\x04\x00\x8d\x00\x8e', + b'\x0c\x05\x00\x00\x00\x03\x00\x83\x02', + b'\x0c\x05\x00\x00\x00\x06\x11\x03\x00\x7c\x00\x02', + b'\x0c\x05\x00\x00\x00\x07\x11\x03\x04\x00\x8d\x00\x8e', + b'\x0c\x05\x00\x00\x00\x03\x11\x83\x02', + b'\x0c\x05\x00\x00\x00\x06\xff\x03\x00\x7c\x00\x02', + b'\x0c\x05\x00\x00\x00\x07\xff\x03\x04\x00\x8d\x00\x8e', + b'\x0c\x05\x00\x00\x00\x03\xff\x83\x02', + ]), + (MessageTLS, [ + b'\x03\x00\x7c\x00\x02', + b'\x03\x04\x00\x8d\x00\x8e', + b'\x83\x02', + ]), + ] + ) + @pytest.mark.parametrize( + ("inx1", "data"), + [ + (0, b"\x03\x00\x7c\x00\x02",), # Request + (1, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (2, b'\x83\x02',), # Exception + ] + ) + @pytest.mark.parametrize( + ("inx2", "dev_id"), + [ + (0, 0), + (3, 17), + (6, 255), + ] + ) + @pytest.mark.parametrize( + ("inx3", "tid"), + [ + (0, 0), + (9, 3077), + ] + ) + def test_encode(self, frame, frame_expected, data, dev_id, tid, inx1, inx2, inx3): + """Test encode method.""" + if frame != MessageSocket and tid: + return + if frame == MessageTLS and (tid or dev_id): + return + frame_obj = frame(None, True) + expected = frame_expected[inx1 + inx2 + inx3] + encoded_data = frame_obj.encode(data, dev_id, tid) + assert encoded_data == expected diff --git a/test/message/test_rtu.py b/test/message/test_rtu.py new file mode 100644 index 000000000..4ae1244d4 --- /dev/null +++ b/test/message/test_rtu.py @@ -0,0 +1,101 @@ +"""Test transport.""" +import pytest + +from pymodbus.message.rtu import MessageRTU + + +class TestMessageRTU: + """Test message module.""" + + @staticmethod + @pytest.fixture(name="frame") + def prepare_frame(): + """Return message object.""" + return MessageRTU([1], False) + + def test_crc16_table(self): + """Test the crc16 table is prefilled.""" + assert len(MessageRTU.crc16_table) == 256 + assert isinstance(MessageRTU.crc16_table[0], int) + assert isinstance(MessageRTU.crc16_table[255], int) + + def test_roundtrip_CRC(self): + """Test combined compute/check CRC.""" + data = b'\x12\x34\x23\x45\x34\x56\x45\x67' + assert MessageRTU.compute_CRC(data) == 0xE2DB + assert MessageRTU.check_CRC(data, 0xE2DB) + + # b"\x02\x01\x01\x00Q\xcc" + # b"\x01\x01\x03\x01\x00\n\xed\x89" + # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x43" + # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD" + + # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03" # good frame + part of next frame + + # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC" # invalid frame CRC + # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC" # bad crc + # b"\x61\x62\x00\x01\x00\n\xec\x1c" # bad function code + # b"\x01\x03\x03\x01\x00\n\x94\x49" # Not ok + + # test frame ready + # (b"", False), + # (b"\x11", False), + # (b"\x11\x03", False), + # (b"\x11\x03\x06", False), + # (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", False), + # (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", True), + # (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\xAB\xCD", True), + + + @pytest.mark.parametrize( + ("packet", "used_len", "res_id", "res"), + [ + (b':010100010001FC\r\n', 17, 1, b'\x01\x00\x01\x00\x01'), + (b':00010001000AF4\r\n', 17, 0, b'\x01\x00\x01\x00\x0a'), + (b':01010001000AF3\r\n', 17, 1, b'\x01\x00\x01\x00\x0a'), + (b':61620001000A32\r\n', 17, 97, b'\x62\x00\x01\x00\x0a'), + (b':01270001000ACD\r\n', 17, 1, b'\x27\x00\x01\x00\x0a'), + (b':010100', 0, 0, b''), # short frame + (b':00010001000AF4', 0, 0, b''), + (b'abc:00010001000AF4', 3, 0, b''), # garble before frame + (b'abc00010001000AF4', 17, 0, b''), # only garble + (b':01010001000A00\r\n', 17, 0, b''), + ], + ) + def xtest_decode(self, frame, packet, used_len, res_id, res): + """Test decode.""" + res_len, tid, dev_id, data = frame.decode(packet) + assert res_len == used_len + assert data == res + assert not tid + assert dev_id == res_id + + @pytest.mark.parametrize( + ("data", "dev_id", "res_msg"), + [ + (b'\x01\x01\x00', 2, b'\x02\x01\x01\x00\x51\xcc'), + (b'\x03\x06\xAE\x41\x56\x52\x43\x40', 17, b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD'), + (b'\x01\x03\x01\x00\x0a', 1, b'\x01\x01\x03\x01\x00\x0a\xed\x89'), + ], + ) + def test_encode(self, frame, data, dev_id, res_msg): + """Test encode.""" + msg = frame.encode(data, dev_id, 0) + assert res_msg == msg + assert dev_id == int(msg[0]) + + @pytest.mark.parametrize( + ("data", "dev_id", "res_msg"), + [ + (b'\x01\x01\x00', 2, b'\x02\x01\x01\x00\x51\xcc'), + (b'\x03\x06\xAE\x41\x56\x52\x43\x40', 17, b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD'), + (b'\x01\x03\x01\x00\x0a', 1, b'\x01\x01\x03\x01\x00\x0a\xed\x89'), + ], + ) + def xtest_roundtrip(self, frame, data, dev_id, res_msg): + """Test encode.""" + msg = frame.encode(data, dev_id, 0) + res_len, _, res_id, res_data = frame.decode(msg) + assert data == res_data + assert dev_id == res_id + assert res_len == len(res_msg) diff --git a/test/message/to_do_framers_py b/test/message/to_do_framers.py similarity index 96% rename from test/message/to_do_framers_py rename to test/message/to_do_framers.py index 0eafac228..d93b6c081 100644 --- a/test/message/to_do_framers_py +++ b/test/message/to_do_framers.py @@ -4,7 +4,6 @@ import pytest from pymodbus import Framer -from pymodbus.bit_read_message import ReadCoilsRequest from pymodbus.client.base import ModbusBaseClient from pymodbus.exceptions import ModbusIOException from pymodbus.factory import ClientDecoder @@ -449,21 +448,6 @@ def _handle_response(_reply): assert response_ok, "Response is valid, but not accepted" # ---- 100% coverage - @pytest.mark.parametrize( - ("framer", "message"), - [ - (ModbusAsciiFramer, b':00010001000AF4\r\n',), - (ModbusBinaryFramer, b'{\x00\x01\x00\x01\x00\n\xec\x1c}',), - (ModbusRtuFramer, b"\x00\x01\x00\x01\x00\n\xec\x1c",), - (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x00\x01\x00\x01\x00\n',), - ] - ) - def test_build_packet(self, framer, message): - """Test build packet.""" - test_framer = framer(ClientDecoder()) - request = ReadCoilsRequest(1, 10) - assert test_framer.buildPacket(request) == message - @pytest.mark.parametrize( ("framer", "message"), diff --git a/test/message/to_do_server_multidrop_py b/test/message/to_do_server_multidrop.py similarity index 100% rename from test/message/to_do_server_multidrop_py rename to test/message/to_do_server_multidrop.py diff --git a/test/message/to_do_transaction.py b/test/message/to_do_transaction.py new file mode 100755 index 000000000..36a2c3df0 --- /dev/null +++ b/test/message/to_do_transaction.py @@ -0,0 +1,744 @@ +"""Test transaction.""" +from itertools import count +from unittest import mock + +from pymodbus.exceptions import ( + ModbusIOException, +) +from pymodbus.factory import ServerDecoder +from pymodbus.pdu import ModbusRequest +from pymodbus.transaction import ( + ModbusAsciiFramer, + ModbusBinaryFramer, + ModbusRtuFramer, + ModbusSocketFramer, + ModbusTlsFramer, + ModbusTransactionManager, +) + + +TEST_MESSAGE = b"\x7b\x01\x03\x00\x00\x00\x05\x85\xC9\x7d" + + +class TestTransaction: # pylint: disable=too-many-public-methods + """Unittest for the pymodbus.transaction module.""" + + client = None + decoder = None + _tcp = None + _tls = None + _rtu = None + _ascii = None + _binary = None + _manager = None + _tm = None + + # ----------------------------------------------------------------------- # + # Test Construction + # ----------------------------------------------------------------------- # + def setup_method(self): + """Set up the test environment.""" + self.client = None + self.decoder = ServerDecoder() + self._tcp = ModbusSocketFramer(decoder=self.decoder, client=None) + self._tls = ModbusTlsFramer(decoder=self.decoder, client=None) + self._rtu = ModbusRtuFramer(decoder=self.decoder, client=None) + self._ascii = ModbusAsciiFramer(decoder=self.decoder, client=None) + self._binary = ModbusBinaryFramer(decoder=self.decoder, client=None) + self._manager = ModbusTransactionManager(self.client) + + # ----------------------------------------------------------------------- # + # Modbus transaction manager + # ----------------------------------------------------------------------- # + + def test_calculate_expected_response_length(self): + """Test calculate expected response length.""" + self._manager.client = mock.MagicMock() + self._manager.client.framer = mock.MagicMock() + self._manager._set_adu_size() # pylint: disable=protected-access + assert not self._manager._calculate_response_length( # pylint: disable=protected-access + 0 + ) + self._manager.base_adu_size = 10 + assert ( + self._manager._calculate_response_length(5) # pylint: disable=protected-access + == 15 + ) + + def test_calculate_exception_length(self): + """Test calculate exception length.""" + for framer, exception_length in ( + ("ascii", 11), + ("binary", 7), + ("rtu", 5), + ("tcp", 9), + ("tls", 2), + ("dummy", None), + ): + self._manager.client = mock.MagicMock() + if framer == "ascii": + self._manager.client.framer = self._ascii + elif framer == "binary": + self._manager.client.framer = self._binary + elif framer == "rtu": + self._manager.client.framer = self._rtu + elif framer == "tcp": + self._manager.client.framer = self._tcp + elif framer == "tls": + self._manager.client.framer = self._tls + else: + self._manager.client.framer = mock.MagicMock() + + self._manager._set_adu_size() # pylint: disable=protected-access + assert ( + self._manager._calculate_exception_length() # pylint: disable=protected-access + == exception_length + ) + + @mock.patch("pymodbus.transaction.time") + def test_execute(self, mock_time): + """Test execute.""" + mock_time.time.side_effect = count() + + client = mock.MagicMock() + client.framer = self._ascii + client.framer._buffer = b"deadbeef" # pylint: disable=protected-access + client.framer.processIncomingPacket = mock.MagicMock() + client.framer.processIncomingPacket.return_value = None + client.framer.buildPacket = mock.MagicMock() + client.framer.buildPacket.return_value = b"deadbeef" + client.framer.sendPacket = mock.MagicMock() + client.framer.sendPacket.return_value = len(b"deadbeef") + client.framer.decode_data = mock.MagicMock() + client.framer.decode_data.return_value = { + "slave": 1, + "fcode": 222, + "length": 27, + } + request = mock.MagicMock() + request.get_response_pdu_size.return_value = 10 + request.slave_id = 1 + request.function_code = 222 + trans = ModbusTransactionManager(client) + trans._recv = mock.MagicMock( # pylint: disable=protected-access + return_value=b"abcdef" + ) + assert trans.retries == 3 + assert not trans.retry_on_empty + + trans.getTransaction = mock.MagicMock() + trans.getTransaction.return_value = "response" + response = trans.execute(request) + assert response == "response" + # No response + trans._recv = mock.MagicMock( # pylint: disable=protected-access + return_value=b"abcdef" + ) + trans.transactions = {} + trans.getTransaction = mock.MagicMock() + trans.getTransaction.return_value = None + response = trans.execute(request) + assert isinstance(response, ModbusIOException) + + # No response with retries + trans.retry_on_empty = True + trans._recv = mock.MagicMock( # pylint: disable=protected-access + side_effect=iter([b"", b"abcdef"]) + ) + response = trans.execute(request) + assert isinstance(response, ModbusIOException) + + # wrong handle_local_echo + trans._recv = mock.MagicMock( # pylint: disable=protected-access + side_effect=iter([b"abcdef", b"deadbe", b"123456"]) + ) + client.comm_params.handle_local_echo = True + trans.retry_on_empty = False + trans.retry_on_invalid = False + assert trans.execute(request).message == "[Input/Output] Wrong local echo" + client.comm_params.handle_local_echo = False + + # retry on invalid response + trans.retry_on_invalid = True + trans._recv = mock.MagicMock( # pylint: disable=protected-access + side_effect=iter([b"", b"abcdef", b"deadbe", b"123456"]) + ) + response = trans.execute(request) + assert isinstance(response, ModbusIOException) + + # Unable to decode response + trans._recv = mock.MagicMock( # pylint: disable=protected-access + side_effect=ModbusIOException() + ) + client.framer.processIncomingPacket.side_effect = mock.MagicMock( + side_effect=ModbusIOException() + ) + assert isinstance(trans.execute(request), ModbusIOException) + + # Broadcast + client.params.broadcast_enable = True + request.slave_id = 0 + response = trans.execute(request) + assert response == b"Broadcast write sent - no response expected" + + # Broadcast w/ Local echo + client.comm_params.handle_local_echo = True + client.params.broadcast_enable = True + recv = mock.MagicMock(return_value=b"deadbeef") + trans._recv = recv # pylint: disable=protected-access + request.slave_id = 0 + response = trans.execute(request) + assert response == b"Broadcast write sent - no response expected" + recv.assert_called_once_with(8, False) + client.comm_params.handle_local_echo = False + + def test_transaction_manager_tid(self): + """Test the transaction manager TID.""" + for tid in range(1, self._manager.getNextTID() + 10): + assert tid + 1 == self._manager.getNextTID() + self._manager.reset() + assert self._manager.getNextTID() == 1 + + def test_get_transaction_manager_transaction(self): + """Test the getting a transaction from the transaction manager.""" + + class Request: # pylint: disable=too-few-public-methods + """Request.""" + + self._manager.reset() + handle = Request() + handle.transaction_id = ( # pylint: disable=attribute-defined-outside-init + self._manager.getNextTID() + ) + handle.message = b"testing" # pylint: disable=attribute-defined-outside-init + self._manager.addTransaction(handle) + result = self._manager.getTransaction(handle.transaction_id) + assert handle.message == result.message + + def test_delete_transaction_manager_transaction(self): + """Test deleting a transaction from the dict transaction manager.""" + + class Request: # pylint: disable=too-few-public-methods + """Request.""" + + self._manager.reset() + handle = Request() + handle.transaction_id = ( # pylint: disable=attribute-defined-outside-init + self._manager.getNextTID() + ) + handle.message = b"testing" # pylint: disable=attribute-defined-outside-init + + self._manager.addTransaction(handle) + self._manager.delTransaction(handle.transaction_id) + assert not self._manager.getTransaction(handle.transaction_id) + + # ----------------------------------------------------------------------- # + # TCP tests + # ----------------------------------------------------------------------- # + def test_tcp_framer_transaction_ready(self): + """Test a tcp frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg, callback, [1]) + self._tcp._buffer = msg # pylint: disable=protected-access + + def test_tcp_framer_transaction_full(self): + """Test a full tcp frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + assert result.function_code.to_bytes(1,'big') + result.encode() == msg[7:] + + def test_tcp_framer_transaction_half(self): + """Test a half completed tcp frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg1 = b"\x00\x01\x12\x34\x00" + msg2 = b"\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg2[2:] + + def test_tcp_framer_transaction_half2(self): + """Test a half completed tcp frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg1 = b"\x00\x01\x12\x34\x00\x06\xff" + msg2 = b"\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg2 + + def test_tcp_framer_transaction_half3(self): + """Test a half completed tcp frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg1 = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00" + msg2 = b"\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg1[7:] + msg2 + + def test_tcp_framer_transaction_short(self): + """Test that we can get back on track after an invalid message.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + # msg1 = b"\x99\x99\x99\x99\x00\x01\x00\x17" + msg1 = b'' + msg2 = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg2[7:] + + def test_tcp_framer_populate(self): + """Test a tcp frame packet build.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + expected = ModbusRequest() + expected.transaction_id = 0x0001 + expected.protocol_id = 0x1234 + expected.slave_id = 0xFF + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + # assert self._tcp.checkFrame() + # actual = ModbusRequest() + # self._tcp.populateResult(actual) + # for name in ("transaction_id", "protocol_id", "slave_id"): + # assert getattr(expected, name) == getattr(actual, name) + + # ----------------------------------------------------------------------- # + # TLS tests + # ----------------------------------------------------------------------- # + def test_framer_tls_framer_transaction_ready(self): + """Test a tls frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg[0:4], callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg[4:], callback, [0, 1]) + assert result + + def test_framer_tls_framer_transaction_full(self): + """Test a full tls frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + assert result + + def test_framer_tls_framer_transaction_half(self): + """Test a half completed tls frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg[0:8], callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg[8:], callback, [0, 1]) + assert result + + def test_framer_tls_framer_transaction_short(self): + """Test that we can get back on track after an invalid message.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg[0:2], callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg[2:], callback, [0, 1]) + assert result + + def test_framer_tls_framer_decode(self): + """Testmessage decoding.""" + msg1 = b"" + msg2 = b"\x01\x12\x34\x00\x08" + result = self._tls.decode_data(msg1) + assert not result + result = self._tls.decode_data(msg2) + assert result == {"fcode": 1} + + def test_framer_tls_incoming_packet(self): + """Framer tls incoming packet.""" + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + + slave = 0x01 + msg_result = None + + def mock_callback(result): + """Mock callback.""" + nonlocal msg_result + + msg_result = result.encode() + + self._tls.processIncomingPacket(msg, mock_callback, slave) + # assert msg == msg_result + + # self._tls.isFrameReady = mock.MagicMock(return_value=True) + # x = mock.MagicMock(return_value=False) + # self._tls._validate_slave_id = x + # self._tls.processIncomingPacket(msg, mock_callback, slave) + # assert not self._tls._buffer + # self._tls.advanceFrame() + # x = mock.MagicMock(return_value=True) + # self._tls._validate_slave_id = x + # self._tls.processIncomingPacket(msg, mock_callback, slave) + # assert msg[1:] == msg_result + # self._tls.advanceFrame() + + def test_framer_tls_process(self): + """Framer tls process.""" + # class MockResult: + # """Mock result.""" + + # def __init__(self, code): + # """Init.""" + # self.function_code = code + + # def mock_callback(_arg): + # """Mock callback.""" + + # self._tls.decoder.decode = mock.MagicMock(return_value=None) + # with pytest.raises(ModbusIOException): + # self._tls._process(mock_callback) + + # result = MockResult(0x01) + # self._tls.decoder.decode = mock.MagicMock(return_value=result) + # with pytest.raises(InvalidMessageReceivedException): + # self._tls._process( + # mock_callback, error=True + # ) + # self._tls._process(mock_callback) + # assert not self._tls._buffer + + def test_framer_tls_framer_populate(self): + """Test a tls frame packet build.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + assert result + + # ----------------------------------------------------------------------- # + # RTU tests + # ----------------------------------------------------------------------- # + def test_rtu_framer_transaction_ready(self): + """Test if the checks for a complete frame work.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] + self._rtu.processIncomingPacket(msg_parts[0], callback, [0, 1]) + assert not result + self._rtu.processIncomingPacket(msg_parts[1], callback, [0, 1]) + assert result + + def test_rtu_framer_transaction_full(self): + """Test a full rtu frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) + assert result + + def test_rtu_framer_transaction_half(self): + """Test a half completed rtu frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] + self._rtu.processIncomingPacket(msg_parts[0], callback, [0, 1]) + assert not result + self._rtu.processIncomingPacket(msg_parts[1], callback, [0, 1]) + assert result + + def test_rtu_framer_populate(self): + """Test a rtu frame packet build.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) + header_dict = self._rtu._header # pylint: disable=protected-access + assert len(msg) == header_dict["len"] + assert int(msg[0]) == header_dict["uid"] + assert msg[-2:] == header_dict["crc"] + + def test_rtu_decode_exception(self): + """Test that the RTU framer can decode errors.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x90\x02\x9c\x01" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) + assert result + + def test_process(self): + """Test process.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) + assert result + + def test_rtu_process_incoming_packets(self): + """Test rtu process incoming packets.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + slave = 0x00 + + self._rtu.processIncomingPacket(msg, callback, slave) + assert result + + # ----------------------------------------------------------------------- # + # ASCII tests + # ----------------------------------------------------------------------- # + def test_ascii_framer_transaction_ready(self): + """Test a ascii frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b":F7031389000A60\r\n" + self._ascii.processIncomingPacket(msg, callback, [0,1]) + assert result + + def test_ascii_framer_transaction_full(self): + """Test a full ascii frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"sss:F7031389000A60\r\n" + self._ascii.processIncomingPacket(msg, callback, [0,1]) + assert result + + def test_ascii_framer_transaction_half(self): + """Test a half completed ascii frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg_parts = (b"sss:F7031389", b"000A60\r\n") + self._ascii.processIncomingPacket(msg_parts[0], callback, [0,1]) + assert not result + self._ascii.processIncomingPacket(msg_parts[1], callback, [0,1]) + assert result + + def test_ascii_framer_populate(self): + """Test a ascii frame packet build.""" + request = ModbusRequest() + self._ascii.populateResult(request) + assert not request.slave_id + + def test_ascii_process_incoming_packets(self): + """Test ascii process incoming packet.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b":F7031389000A60\r\n" + self._ascii.processIncomingPacket(msg, callback, [0,1]) + assert result + + # ----------------------------------------------------------------------- # + # Binary tests + # ----------------------------------------------------------------------- # + def test_binary_framer_transaction_ready(self): + """Test a binary frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = TEST_MESSAGE + self._binary.processIncomingPacket(msg, callback, [0,1]) + assert result + + def test_binary_framer_transaction_full(self): + """Test a full binary frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = TEST_MESSAGE + self._binary.processIncomingPacket(msg, callback, [0,1]) + assert result + + def test_binary_framer_transaction_half(self): + """Test a half completed binary frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg_parts = (b"\x7b\x01\x03\x00", b"\x00\x00\x05\x85\xC9\x7d") + self._binary.processIncomingPacket(msg_parts[0], callback, [0,1]) + assert not result + self._binary.processIncomingPacket(msg_parts[1], callback, [0,1]) + assert result + + def test_binary_framer_populate(self): + """Test a binary frame packet build.""" + request = ModbusRequest() + self._binary.populateResult(request) + assert not request.slave_id + + def test_binary_process_incoming_packet(self): + """Test binary process incoming packet.""" + mock_data = TEST_MESSAGE + slave = 0x00 + + def mock_callback(_mock_data): + pass + + self._binary.processIncomingPacket(mock_data, mock_callback, slave) + + # Test failure: + self._binary.checkFrame = mock.MagicMock(return_value=False) + self._binary.processIncomingPacket(mock_data, mock_callback, slave) diff --git a/test/message/to_do_transaction_py b/test/message/to_do_transaction_py deleted file mode 100755 index 48a182560..000000000 --- a/test/message/to_do_transaction_py +++ /dev/null @@ -1,782 +0,0 @@ -"""Test transaction.""" -from binascii import a2b_hex -from itertools import count -from unittest import mock - -import pytest - -from pymodbus.exceptions import ( - InvalidMessageReceivedException, - ModbusIOException, -) -from pymodbus.factory import ServerDecoder -from pymodbus.pdu import ModbusRequest -from pymodbus.transaction import ( - ModbusAsciiFramer, - ModbusBinaryFramer, - ModbusRtuFramer, - ModbusSocketFramer, - ModbusTlsFramer, - ModbusTransactionManager, -) - - -TEST_MESSAGE = b"\x7b\x01\x03\x00\x00\x00\x05\x85\xC9\x7d" - - -class TestTransaction: # pylint: disable=too-many-public-methods - """Unittest for the pymodbus.transaction module.""" - - client = None - decoder = None - _tcp = None - _tls = None - _rtu = None - _ascii = None - _binary = None - _manager = None - _tm = None - - # ----------------------------------------------------------------------- # - # Test Construction - # ----------------------------------------------------------------------- # - def setup_method(self): - """Set up the test environment.""" - self.client = None - self.decoder = ServerDecoder() - self._tcp = ModbusSocketFramer(decoder=self.decoder, client=None) - self._tls = ModbusTlsFramer(decoder=self.decoder, client=None) - self._rtu = ModbusRtuFramer(decoder=self.decoder, client=None) - self._ascii = ModbusAsciiFramer(decoder=self.decoder, client=None) - self._binary = ModbusBinaryFramer(decoder=self.decoder, client=None) - self._manager = ModbusTransactionManager(self.client) - - # ----------------------------------------------------------------------- # - # Modbus transaction manager - # ----------------------------------------------------------------------- # - - def test_calculate_expected_response_length(self): - """Test calculate expected response length.""" - self._manager.client = mock.MagicMock() - self._manager.client.framer = mock.MagicMock() - self._manager._set_adu_size() # pylint: disable=protected-access - assert not self._manager._calculate_response_length( # pylint: disable=protected-access - 0 - ) - self._manager.base_adu_size = 10 - assert ( - self._manager._calculate_response_length(5) # pylint: disable=protected-access - == 15 - ) - - def test_calculate_exception_length(self): - """Test calculate exception length.""" - for framer, exception_length in ( - ("ascii", 11), - ("binary", 7), - ("rtu", 5), - ("tcp", 9), - ("tls", 2), - ("dummy", None), - ): - self._manager.client = mock.MagicMock() - if framer == "ascii": - self._manager.client.framer = self._ascii - elif framer == "binary": - self._manager.client.framer = self._binary - elif framer == "rtu": - self._manager.client.framer = self._rtu - elif framer == "tcp": - self._manager.client.framer = self._tcp - elif framer == "tls": - self._manager.client.framer = self._tls - else: - self._manager.client.framer = mock.MagicMock() - - self._manager._set_adu_size() # pylint: disable=protected-access - assert ( - self._manager._calculate_exception_length() # pylint: disable=protected-access - == exception_length - ) - - @mock.patch("pymodbus.transaction.time") - def test_execute(self, mock_time): - """Test execute.""" - mock_time.time.side_effect = count() - - client = mock.MagicMock() - client.framer = self._ascii - client.framer._buffer = b"deadbeef" # pylint: disable=protected-access - client.framer.processIncomingPacket = mock.MagicMock() - client.framer.processIncomingPacket.return_value = None - client.framer.buildPacket = mock.MagicMock() - client.framer.buildPacket.return_value = b"deadbeef" - client.framer.sendPacket = mock.MagicMock() - client.framer.sendPacket.return_value = len(b"deadbeef") - client.framer.decode_data = mock.MagicMock() - client.framer.decode_data.return_value = { - "slave": 1, - "fcode": 222, - "length": 27, - } - request = mock.MagicMock() - request.get_response_pdu_size.return_value = 10 - request.slave_id = 1 - request.function_code = 222 - trans = ModbusTransactionManager(client) - trans._recv = mock.MagicMock( # pylint: disable=protected-access - return_value=b"abcdef" - ) - assert trans.retries == 3 - assert not trans.retry_on_empty - - trans.getTransaction = mock.MagicMock() - trans.getTransaction.return_value = "response" - response = trans.execute(request) - assert response == "response" - # No response - trans._recv = mock.MagicMock( # pylint: disable=protected-access - return_value=b"abcdef" - ) - trans.transactions = {} - trans.getTransaction = mock.MagicMock() - trans.getTransaction.return_value = None - response = trans.execute(request) - assert isinstance(response, ModbusIOException) - - # No response with retries - trans.retry_on_empty = True - trans._recv = mock.MagicMock( # pylint: disable=protected-access - side_effect=iter([b"", b"abcdef"]) - ) - response = trans.execute(request) - assert isinstance(response, ModbusIOException) - - # wrong handle_local_echo - trans._recv = mock.MagicMock( # pylint: disable=protected-access - side_effect=iter([b"abcdef", b"deadbe", b"123456"]) - ) - client.comm_params.handle_local_echo = True - trans.retry_on_empty = False - trans.retry_on_invalid = False - assert trans.execute(request).message == "[Input/Output] Wrong local echo" - client.comm_params.handle_local_echo = False - - # retry on invalid response - trans.retry_on_invalid = True - trans._recv = mock.MagicMock( # pylint: disable=protected-access - side_effect=iter([b"", b"abcdef", b"deadbe", b"123456"]) - ) - response = trans.execute(request) - assert isinstance(response, ModbusIOException) - - # Unable to decode response - trans._recv = mock.MagicMock( # pylint: disable=protected-access - side_effect=ModbusIOException() - ) - client.framer.processIncomingPacket.side_effect = mock.MagicMock( - side_effect=ModbusIOException() - ) - assert isinstance(trans.execute(request), ModbusIOException) - - # Broadcast - client.params.broadcast_enable = True - request.slave_id = 0 - response = trans.execute(request) - assert response == b"Broadcast write sent - no response expected" - - # Broadcast w/ Local echo - client.comm_params.handle_local_echo = True - client.params.broadcast_enable = True - recv = mock.MagicMock(return_value=b"deadbeef") - trans._recv = recv # pylint: disable=protected-access - request.slave_id = 0 - response = trans.execute(request) - assert response == b"Broadcast write sent - no response expected" - recv.assert_called_once_with(8, False) - client.comm_params.handle_local_echo = False - - def test_transaction_manager_tid(self): - """Test the transaction manager TID.""" - for tid in range(1, self._manager.getNextTID() + 10): - assert tid + 1 == self._manager.getNextTID() - self._manager.reset() - assert self._manager.getNextTID() == 1 - - def test_get_transaction_manager_transaction(self): - """Test the getting a transaction from the transaction manager.""" - - class Request: # pylint: disable=too-few-public-methods - """Request.""" - - self._manager.reset() - handle = Request() - handle.transaction_id = ( # pylint: disable=attribute-defined-outside-init - self._manager.getNextTID() - ) - handle.message = b"testing" # pylint: disable=attribute-defined-outside-init - self._manager.addTransaction(handle) - result = self._manager.getTransaction(handle.transaction_id) - assert handle.message == result.message - - def test_delete_transaction_manager_transaction(self): - """Test deleting a transaction from the dict transaction manager.""" - - class Request: # pylint: disable=too-few-public-methods - """Request.""" - - self._manager.reset() - handle = Request() - handle.transaction_id = ( # pylint: disable=attribute-defined-outside-init - self._manager.getNextTID() - ) - handle.message = b"testing" # pylint: disable=attribute-defined-outside-init - - self._manager.addTransaction(handle) - self._manager.delTransaction(handle.transaction_id) - assert not self._manager.getTransaction(handle.transaction_id) - - # ----------------------------------------------------------------------- # - # TCP tests - # ----------------------------------------------------------------------- # - @pytest.mark.skip() - def test_tcp_framer_transaction_ready(self): - """Test a tcp frame transaction.""" - msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - assert not self._tcp.isFrameReady() - assert not self._tcp.checkFrame() - self._tcp._buffer = msg # pylint: disable=protected-access - assert self._tcp.isFrameReady() - assert self._tcp.checkFrame() - self._tcp.advanceFrame() - assert not self._tcp.isFrameReady() - assert not self._tcp.checkFrame() - # assert self._ascii.getFrame() == b"" - - @pytest.mark.skip() - def test_tcp_framer_transaction_full(self): - """Test a full tcp frame transaction.""" - msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - self._tcp._buffer = msg # pylint: disable=protected-access - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg[7:] - self._tcp.advanceFrame() - - @pytest.mark.skip() - def test_tcp_framer_transaction_half(self): - """Test a half completed tcp frame transaction.""" - msg1 = b"\x00\x01\x12\x34\x00" - msg2 = b"\x04\xff\x02\x12\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == b"" - self._tcp._buffer += msg2 - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg2[2:] - self._tcp.advanceFrame() - - @pytest.mark.skip() - def test_tcp_framer_transaction_half2(self): - """Test a half completed tcp frame transaction.""" - msg1 = b"\x00\x01\x12\x34\x00\x04\xff" - msg2 = b"\x02\x12\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == b"" - self._tcp._buffer += msg2 - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert msg2 == result - self._tcp.advanceFrame() - - @pytest.mark.skip() - def test_tcp_framer_transaction_half3(self): - """Test a half completed tcp frame transaction.""" - msg1 = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12" - msg2 = b"\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg1[7:] - self._tcp._buffer += msg2 - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg1[7:] + msg2 - self._tcp.advanceFrame() - - @pytest.mark.skip() - def test_tcp_framer_transaction_short(self): - """Test that we can get back on track after an invalid message.""" - msg1 = b"\x99\x99\x99\x99\x00\x01\x00\x01" - msg2 = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == b"" - self._tcp.advanceFrame() - self._tcp._buffer += msg2 - assert len(self._tcp._buffer) == 10 # pylint: disable=protected-access - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg2[7:] - self._tcp.advanceFrame() - - @pytest.mark.skip() - def test_tcp_framer_populate(self): - """Test a tcp frame packet build.""" - expected = ModbusRequest() - expected.transaction_id = 0x0001 - expected.protocol_id = 0x1234 - expected.slave_id = 0xFF - msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - self._tcp._buffer = msg # pylint: disable=protected-access - assert self._tcp.checkFrame() - actual = ModbusRequest() - self._tcp.populateResult(actual) - for name in ("transaction_id", "protocol_id", "slave_id"): - assert getattr(expected, name) == getattr(actual, name) - self._tcp.advanceFrame() - - def test_tcp_framer_packet(self): - """Test a tcp frame packet build.""" - old_encode = ModbusRequest.encode - ModbusRequest.encode = lambda self: b"" - message = ModbusRequest() - message.transaction_id = 0x0001 - message.protocol_id = 0x1234 - message.slave_id = 0xFF - message.function_code = 0x01 - expected = b"\x00\x01\x12\x34\x00\x02\xff\x01" - actual = self._tcp.buildPacket(message) - assert expected == actual - ModbusRequest.encode = old_encode - - # ----------------------------------------------------------------------- # - # TLS tests - # ----------------------------------------------------------------------- # - @pytest.mark.skip() - def test_framer_tls_framer_transaction_ready(self): - """Test a tls frame transaction.""" - msg = b"\x01\x12\x34\x00\x08" - assert not self._tls.isFrameReady() - assert not self._tls.checkFrame() - self._tls._buffer = msg # pylint: disable=protected-access - assert self._tls.isFrameReady() - assert self._tls.checkFrame() - self._tls.advanceFrame() - assert not self._tls.isFrameReady() - assert not self._tls.checkFrame() - assert self._tls.getFrame() == b"" - - @pytest.mark.skip() - def test_framer_tls_framer_transaction_full(self): - """Test a full tls frame transaction.""" - msg = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg # pylint: disable=protected-access - assert self._tls.checkFrame() - result = self._tls.getFrame() - assert result == msg[0:] - self._tls.advanceFrame() - - @pytest.mark.skip() - def test_framer_tls_framer_transaction_half(self): - """Test a half completed tls frame transaction.""" - msg1 = b"" - msg2 = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg1 # pylint: disable=protected-access - assert not self._tls.checkFrame() - result = self._tls.getFrame() - assert result == b"" - self._tls._buffer += msg2 - assert self._tls.checkFrame() - result = self._tls.getFrame() - assert result == msg2[0:] - self._tls.advanceFrame() - - @pytest.mark.skip() - def test_framer_tls_framer_transaction_short(self): - """Test that we can get back on track after an invalid message.""" - msg1 = b"" - msg2 = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg1 # pylint: disable=protected-access - assert not self._tls.checkFrame() - result = self._tls.getFrame() - assert result == b"" - self._tls.advanceFrame() - self._tls._buffer = msg2 # pylint: disable=protected-access - assert len(self._tls._buffer) == 5 # pylint: disable=protected-access - assert self._tls.checkFrame() - result = self._tls.getFrame() - assert result == msg2[0:] - self._tls.advanceFrame() - - @pytest.mark.skip() - def test_framer_tls_framer_decode(self): - """Testmessage decoding.""" - msg1 = b"" - msg2 = b"\x01\x12\x34\x00\x08" - result = self._tls.decode_data(msg1) - assert not result - result = self._tls.decode_data(msg2) - assert result == {"fcode": 1} - self._tls.advanceFrame() - - @pytest.mark.skip() - def test_framer_tls_incoming_packet(self): - """Framer tls incoming packet.""" - msg = b"\x01\x12\x34\x00\x08" - - slave = 0x01 - msg_result = None - - def mock_callback(result): - """Mock callback.""" - nonlocal msg_result - - msg_result = result.encode() - - self._tls.isFrameReady = mock.MagicMock(return_value=False) - self._tls.processIncomingPacket(msg, mock_callback, slave) - assert msg == self._tls._buffer # pylint: disable=protected-access - self._tls.advanceFrame() - - self._tls.isFrameReady = mock.MagicMock(return_value=True) - x = mock.MagicMock(return_value=False) - self._tls._validate_slave_id = x # pylint: disable=protected-access - self._tls.processIncomingPacket(msg, mock_callback, slave) - assert not self._tls._buffer # pylint: disable=protected-access - self._tls.advanceFrame() - x = mock.MagicMock(return_value=True) - self._tls._validate_slave_id = x # pylint: disable=protected-access - self._tls.processIncomingPacket(msg, mock_callback, slave) - assert msg[1:] == msg_result - self._tls.advanceFrame() - - @pytest.mark.skip() - def test_framer_tls_process(self): - """Framer tls process.""" - - class MockResult: # pylint: disable=too-few-public-methods - """Mock result.""" - - def __init__(self, code): - """Init.""" - self.function_code = code - - def mock_callback(_arg): - """Mock callback.""" - - self._tls.decoder.decode = mock.MagicMock(return_value=None) - with pytest.raises(ModbusIOException): - self._tls._process(mock_callback) # pylint: disable=protected-access - - result = MockResult(0x01) - self._tls.decoder.decode = mock.MagicMock(return_value=result) - with pytest.raises(InvalidMessageReceivedException): - self._tls._process( # pylint: disable=protected-access - mock_callback, error=True - ) - self._tls._process(mock_callback) # pylint: disable=protected-access - assert not self._tls._buffer # pylint: disable=protected-access - - @pytest.mark.skip() - def test_framer_tls_framer_populate(self): - """Test a tls frame packet build.""" - ModbusRequest() - msg = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg # pylint: disable=protected-access - assert self._tls.checkFrame() - actual = ModbusRequest() - self._tls.populateResult(actual) - self._tls.advanceFrame() - - def test_framer_tls_framer_packet(self): - """Test a tls frame packet build.""" - old_encode = ModbusRequest.encode - ModbusRequest.encode = lambda self: b"" - message = ModbusRequest() - message.function_code = 0x01 - expected = b"\x01" - actual = self._tls.buildPacket(message) - assert expected == actual - ModbusRequest.encode = old_encode - - # ----------------------------------------------------------------------- # - # RTU tests - # ----------------------------------------------------------------------- # - @pytest.mark.skip() - def test_rtu_framer_transaction_ready(self): - """Test if the checks for a complete frame work.""" - assert not self._rtu.isFrameReady() - - msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] - self._rtu._buffer = msg_parts[0] # pylint: disable=protected-access - assert not self._rtu.isFrameReady() - assert not self._rtu.checkFrame() - - self._rtu._buffer += msg_parts[1] - assert self._rtu.isFrameReady() - assert self._rtu.checkFrame() - - @pytest.mark.skip() - def test_rtu_framer_transaction_full(self): - """Test a full rtu frame transaction.""" - msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - stripped_msg = msg[1:-2] - self._rtu._buffer = msg # pylint: disable=protected-access - assert self._rtu.checkFrame() - result = self._rtu.getFrame() - assert stripped_msg == result - self._rtu.advanceFrame() - - @pytest.mark.skip() - def test_rtu_framer_transaction_half(self): - """Test a half completed rtu frame transaction.""" - msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] - stripped_msg = b"".join(msg_parts)[1:-2] - self._rtu._buffer = msg_parts[0] # pylint: disable=protected-access - assert not self._rtu.checkFrame() - self._rtu._buffer += msg_parts[1] - assert self._rtu.isFrameReady() - assert self._rtu.checkFrame() - result = self._rtu.getFrame() - assert stripped_msg == result - self._rtu.advanceFrame() - - @pytest.mark.skip() - def test_rtu_framer_populate(self): - """Test a rtu frame packet build.""" - request = ModbusRequest() - msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - self._rtu._buffer = msg # pylint: disable=protected-access - self._rtu.populateHeader() - self._rtu.populateResult(request) - - header_dict = self._rtu._header # pylint: disable=protected-access - assert len(msg) == header_dict["len"] - assert int(msg[0]) == header_dict["uid"] - assert msg[-2:] == header_dict["crc"] - assert not request.slave_id - - def test_rtu_framer_packet(self): - """Test a rtu frame packet build.""" - old_encode = ModbusRequest.encode - ModbusRequest.encode = lambda self: b"" - message = ModbusRequest() - message.slave_id = 0xFF - message.function_code = 0x01 - expected = b"\xff\x01\x81\x80" # only header + CRC - no data - actual = self._rtu.buildPacket(message) - assert expected == actual - ModbusRequest.encode = old_encode - - @pytest.mark.skip() - def test_rtu_decode_exception(self): - """Test that the RTU framer can decode errors.""" - message = b"\x00\x90\x02\x9c\x01" - self._rtu._buffer = message # pylint: disable=protected-access - result = self._rtu.checkFrame() - assert result - - @pytest.mark.skip() - def test_process(self): - """Test process.""" - - class MockResult: # pylint: disable=too-few-public-methods - """Mock result.""" - - def __init__(self, code): - self.function_code = code - - def mock_callback(_arg): - """Mock callback.""" - - mock_result = MockResult(code=0) - self._rtu.getFrame = mock.MagicMock() - self._rtu.decoder = mock.MagicMock() - self._rtu.decoder.decode = mock.MagicMock(return_value=mock_result) - self._rtu.populateResult = mock.MagicMock() - self._rtu.advanceFrame = mock.MagicMock() - - self._rtu._process(mock_callback) # pylint: disable=protected-access - self._rtu.populateResult.assert_called_with(mock_result) - self._rtu.advanceFrame.assert_called_with() - assert self._rtu.advanceFrame.called - - # Check errors - self._rtu.decoder.decode = mock.MagicMock(return_value=None) - with pytest.raises(ModbusIOException): - self._rtu._process(mock_callback) # pylint: disable=protected-access - - @pytest.mark.skip() - def test_rtu_process_incoming_packets(self): - """Test rtu process incoming packets.""" - mock_data = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - slave = 0x00 - - def mock_callback(): - """Mock callback.""" - - self._rtu._buffer = mock.MagicMock() # pylint: disable=protected-access - self._rtu._process = mock.MagicMock() # pylint: disable=protected-access - self._rtu.isFrameReady = mock.MagicMock(return_value=False) - self._rtu._buffer = mock_data # pylint: disable=protected-access - - self._rtu.processIncomingPacket(mock_data, mock_callback, slave) - - # ----------------------------------------------------------------------- # - # ASCII tests - # ----------------------------------------------------------------------- # - @pytest.mark.skip() - def test_ascii_framer_transaction_ready(self): - """Test a ascii frame transaction.""" - msg = b":F7031389000A60\r\n" - assert not self._ascii.isFrameReady() - assert not self._ascii.checkFrame() - self._ascii._buffer = msg # pylint: disable=protected-access - assert self._ascii.isFrameReady() - assert self._ascii.checkFrame() - self._ascii.advanceFrame() - assert not self._ascii.isFrameReady() - assert not self._ascii.checkFrame() - assert not self._ascii.getFrame() - - @pytest.mark.skip() - def test_ascii_framer_transaction_full(self): - """Test a full ascii frame transaction.""" - msg = b"sss:F7031389000A60\r\n" - pack = a2b_hex(msg[6:-4]) - self._ascii._buffer = msg # pylint: disable=protected-access - assert self._ascii.checkFrame() - result = self._ascii.getFrame() - assert pack == result - self._ascii.advanceFrame() - - @pytest.mark.skip() - def test_ascii_framer_transaction_half(self): - """Test a half completed ascii frame transaction.""" - msg1 = b"sss:F7031389" - msg2 = b"000A60\r\n" - pack = a2b_hex(msg1[6:] + msg2[:-4]) - self._ascii._buffer = msg1 # pylint: disable=protected-access - assert not self._ascii.checkFrame() - result = self._ascii.getFrame() - assert not result - self._ascii._buffer += msg2 - assert self._ascii.checkFrame() - result = self._ascii.getFrame() - assert pack == result - self._ascii.advanceFrame() - - def test_ascii_framer_populate(self): - """Test a ascii frame packet build.""" - request = ModbusRequest() - self._ascii.populateResult(request) - assert not request.slave_id - - def test_ascii_framer_packet(self): - """Test a ascii frame packet build.""" - old_encode = ModbusRequest.encode - ModbusRequest.encode = lambda self: b"" - message = ModbusRequest() - message.slave_id = 0xFF - message.function_code = 0x01 - expected = b":FF0100\r\n" - actual = self._ascii.buildPacket(message) - assert expected == actual - ModbusRequest.encode = old_encode - - def test_ascii_process_incoming_packets(self): - """Test ascii process incoming packet.""" - mock_data = b":F7031389000A60\r\n" - slave = 0x00 - - def mock_callback(_mock_data, *_args, **_kwargs): - """Mock callback.""" - - self._ascii.processIncomingPacket(mock_data, mock_callback, slave) - - # Test failure: - self._ascii.checkFrame = mock.MagicMock(return_value=False) - self._ascii.processIncomingPacket(mock_data, mock_callback, slave) - - # ----------------------------------------------------------------------- # - # Binary tests - # ----------------------------------------------------------------------- # - @pytest.mark.skip() - def test_binary_framer_transaction_ready(self): - """Test a binary frame transaction.""" - msg = TEST_MESSAGE - assert not self._binary.isFrameReady() - assert not self._binary.checkFrame() - self._binary._buffer = msg # pylint: disable=protected-access - assert self._binary.isFrameReady() - assert self._binary.checkFrame() - self._binary.advanceFrame() - assert not self._binary.isFrameReady() - assert not self._binary.checkFrame() - assert not self._binary.getFrame() - - @pytest.mark.skip() - def test_binary_framer_transaction_full(self): - """Test a full binary frame transaction.""" - msg = TEST_MESSAGE - pack = msg[2:-3] - self._binary._buffer = msg # pylint: disable=protected-access - assert self._binary.checkFrame() - result = self._binary.getFrame() - assert pack == result - self._binary.advanceFrame() - - @pytest.mark.skip() - def test_binary_framer_transaction_half(self): - """Test a half completed binary frame transaction.""" - msg1 = b"\x7b\x01\x03\x00" - msg2 = b"\x00\x00\x05\x85\xC9\x7d" - pack = msg1[2:] + msg2[:-3] - self._binary._buffer = msg1 # pylint: disable=protected-access - assert not self._binary.checkFrame() - result = self._binary.getFrame() - assert not result - self._binary._buffer += msg2 - - assert self._binary.checkFrame() - result = self._binary.getFrame() - assert pack == result - self._binary.advanceFrame() - - def test_binary_framer_populate(self): - """Test a binary frame packet build.""" - request = ModbusRequest() - self._binary.populateResult(request) - assert not request.slave_id - - def test_binary_framer_packet(self): - """Test a binary frame packet build.""" - old_encode = ModbusRequest.encode - ModbusRequest.encode = lambda self: b"" - message = ModbusRequest() - message.slave_id = 0xFF - message.function_code = 0x01 - expected = b"\x7b\xff\x01\x81\x80\x7d" - actual = self._binary.buildPacket(message) - assert expected == actual - ModbusRequest.encode = old_encode - - def test_binary_process_incoming_packet(self): - """Test binary process incoming packet.""" - mock_data = TEST_MESSAGE - slave = 0x00 - - def mock_callback(_mock_data): - pass - - self._binary.processIncomingPacket(mock_data, mock_callback, slave) - - # Test failure: - self._binary.checkFrame = mock.MagicMock(return_value=False) - self._binary.processIncomingPacket(mock_data, mock_callback, slave) diff --git a/test/test_transaction.py b/test/test_transaction.py index 233730dd1..97a53a1a3 100755 --- a/test/test_transaction.py +++ b/test/test_transaction.py @@ -364,10 +364,10 @@ def test_tcp_framer_packet(self): ModbusRequest.encode = lambda self: b"" message = ModbusRequest() message.transaction_id = 0x0001 - message.protocol_id = 0x1234 + message.protocol_id = 0x0000 message.slave_id = 0xFF message.function_code = 0x01 - expected = b"\x00\x01\x12\x34\x00\x02\xff\x01" + expected = b"\x00\x01\x00\x00\x00\x02\xff\x01" actual = self._tcp.buildPacket(message) assert expected == actual ModbusRequest.encode = old_encode From 6b50249067669b6e219828861d21e4972212f419 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Tue, 5 Mar 2024 10:37:47 +0100 Subject: [PATCH 2/4] CRC/LRC Check integrated. --- pymodbus/framer/ascii_framer.py | 1 + pymodbus/framer/binary_framer.py | 6 ++-- pymodbus/framer/rtu_framer.py | 8 +++-- pymodbus/framer/socket_framer.py | 2 ++ pymodbus/framer/tls_framer.py | 2 ++ pymodbus/utilities.py | 53 -------------------------------- test/test_utilities.py | 6 ---- 7 files changed, 13 insertions(+), 65 deletions(-) diff --git a/pymodbus/framer/ascii_framer.py b/pymodbus/framer/ascii_framer.py index 9d68ea46a..0d999cb73 100644 --- a/pymodbus/framer/ascii_framer.py +++ b/pymodbus/framer/ascii_framer.py @@ -41,6 +41,7 @@ def __init__(self, decoder, client=None): self._hsize = 0x02 self._start = b":" self._end = b"\r\n" + self.message_handler = MessageAscii([0], True) def decode_data(self, data): """Decode data.""" diff --git a/pymodbus/framer/binary_framer.py b/pymodbus/framer/binary_framer.py index 07808a4de..ce56581f5 100644 --- a/pymodbus/framer/binary_framer.py +++ b/pymodbus/framer/binary_framer.py @@ -5,7 +5,7 @@ from pymodbus.exceptions import ModbusIOException from pymodbus.framer.base import BYTE_ORDER, FRAME_HEADER, ModbusFramer from pymodbus.logging import Log -from pymodbus.utilities import checkCRC, computeCRC +from pymodbus.message.rtu import MessageRTU BINARY_FRAME_HEADER = BYTE_ORDER + FRAME_HEADER @@ -76,7 +76,7 @@ def check_frame(self) -> bool: self._header["uid"] = struct.unpack(">B", self._buffer[1:2])[0] self._header["crc"] = struct.unpack(">H", self._buffer[end - 2 : end])[0] data = self._buffer[1 : end - 2] - return checkCRC(data, self._header["crc"]) + return MessageRTU.check_CRC(data, self._header["crc"]) return False while len(self._buffer) > 1: @@ -113,7 +113,7 @@ def buildPacket(self, message): struct.pack(BINARY_FRAME_HEADER, message.slave_id, message.function_code) + data ) - packet += struct.pack(">H", computeCRC(packet)) + packet += struct.pack(">H", MessageRTU.compute_CRC(packet)) packet = self._start + packet + self._end return packet diff --git a/pymodbus/framer/rtu_framer.py b/pymodbus/framer/rtu_framer.py index 65c36a661..a0ac06fbb 100644 --- a/pymodbus/framer/rtu_framer.py +++ b/pymodbus/framer/rtu_framer.py @@ -8,7 +8,8 @@ ) from pymodbus.framer.base import BYTE_ORDER, FRAME_HEADER, ModbusFramer from pymodbus.logging import Log -from pymodbus.utilities import ModbusTransactionState, checkCRC, computeCRC +from pymodbus.message.rtu import MessageRTU +from pymodbus.utilities import ModbusTransactionState RTU_FRAME_HEADER = BYTE_ORDER + FRAME_HEADER @@ -62,6 +63,7 @@ def __init__(self, decoder, client=None): self._end = b"\x0d\x0a" self._min_frame_size = 4 self.function_codes = decoder.lookup.keys() if decoder else {} + self.message_handler = MessageRTU([0], True) def decode_data(self, data): """Decode data.""" @@ -131,7 +133,7 @@ def check_frame(self): data = self._buffer[: frame_size - 2] crc = self._header["crc"] crc_val = (int(crc[0]) << 8) + int(crc[1]) - return checkCRC(data, crc_val) + return MessageRTU.check_CRC(data, crc_val) except (IndexError, KeyError, struct.error): return False @@ -175,7 +177,7 @@ def buildPacket(self, message): struct.pack(RTU_FRAME_HEADER, message.slave_id, message.function_code) + data ) - packet += struct.pack(">H", computeCRC(packet)) + packet += struct.pack(">H", MessageRTU.compute_CRC(packet)) # Ensure that transaction is actually the slave id for serial comms if message.slave_id: message.transaction_id = message.slave_id diff --git a/pymodbus/framer/socket_framer.py b/pymodbus/framer/socket_framer.py index dca7b3b94..9b5e5564e 100644 --- a/pymodbus/framer/socket_framer.py +++ b/pymodbus/framer/socket_framer.py @@ -7,6 +7,7 @@ ) from pymodbus.framer.base import SOCKET_FRAME_HEADER, ModbusFramer from pymodbus.logging import Log +from pymodbus.message.socket import MessageSocket # --------------------------------------------------------------------------- # @@ -42,6 +43,7 @@ def __init__(self, decoder, client=None): """ super().__init__(decoder, client) self._hsize = 0x07 + self.message_handler = MessageSocket([0], True) def decode_data(self, data): """Decode data.""" diff --git a/pymodbus/framer/tls_framer.py b/pymodbus/framer/tls_framer.py index b3430864b..f37729953 100644 --- a/pymodbus/framer/tls_framer.py +++ b/pymodbus/framer/tls_framer.py @@ -7,6 +7,7 @@ ) from pymodbus.framer.base import TLS_FRAME_HEADER, ModbusFramer from pymodbus.logging import Log +from pymodbus.message.tls import MessageTLS # --------------------------------------------------------------------------- # @@ -34,6 +35,7 @@ def __init__(self, decoder, client=None): """ super().__init__(decoder, client) self._hsize = 0x0 + self.message_encoder = MessageTLS([0], True) def decode_data(self, data): """Decode data.""" diff --git a/pymodbus/utilities.py b/pymodbus/utilities.py index 7035451bb..d73c153bb 100644 --- a/pymodbus/utilities.py +++ b/pymodbus/utilities.py @@ -10,8 +10,6 @@ "pack_bitstring", "unpack_bitstring", "default", - "computeCRC", - "checkCRC", "rtuFrameSize", ] @@ -152,57 +150,6 @@ def unpack_bitstring(data: bytes) -> list[bool]: # --------------------------------------------------------------------------- # -def __generate_crc16_table(): - """Generate a crc16 lookup table. - - .. note:: This will only be generated once - """ - result = [] - for byte in range(256): - crc = 0x0000 - for _ in range(8): - if (byte ^ crc) & 0x0001: - crc = (crc >> 1) ^ 0xA001 - else: - crc >>= 1 - byte >>= 1 - result.append(crc) - return result - - -__crc16_table = __generate_crc16_table() - - -def computeCRC(data): # pylint: disable=invalid-name - """Compute a crc16 on the passed in string. - - For modbus, this is only used on the binary serial protocols (in this - case RTU). - - The difference between modbus's crc16 and a normal crc16 - is that modbus starts the crc value out at 0xffff. - - :param data: The data to create a crc16 of - :returns: The calculated CRC - """ - crc = 0xFFFF - for data_byte in data: - idx = __crc16_table[(crc ^ int(data_byte)) & 0xFF] - crc = ((crc >> 8) & 0xFF) ^ idx - swapped = ((crc << 8) & 0xFF00) | ((crc >> 8) & 0x00FF) - return swapped - - -def checkCRC(data, check): # pylint: disable=invalid-name - """Check if the data matches the passed in CRC. - - :param data: The data to create a crc16 of - :param check: The CRC to validate - :returns: True if matched, False otherwise - """ - return computeCRC(data) == check - - def rtuFrameSize(data, byte_count_pos): # pylint: disable=invalid-name """Calculate the size of the frame based on the byte count. diff --git a/test/test_utilities.py b/test/test_utilities.py index 8af067bb8..8dcb6a5f2 100644 --- a/test/test_utilities.py +++ b/test/test_utilities.py @@ -2,7 +2,6 @@ import struct from pymodbus.utilities import ( - checkCRC, default, dict_property, pack_bitstring, @@ -91,8 +90,3 @@ def test_bit_packing(self): """Test all string <=> bit packing functions.""" assert unpack_bitstring(b"\x55") == self.bits assert pack_bitstring(self.bits) == b"\x55" - - def test_cyclic_redundancy_check(self): - """Test the cyclic redundancy check code.""" - assert checkCRC(self.data, 0xE2DB) - assert checkCRC(self.string, 0x889E) From 4b8ab6f7b1d150dd8a78e6a75cda1e03084ce819 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Tue, 5 Mar 2024 10:55:10 +0100 Subject: [PATCH 3/4] ASCII+TLS integrated. --- pymodbus/framer/ascii_framer.py | 7 ++++++- pymodbus/framer/tls_framer.py | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pymodbus/framer/ascii_framer.py b/pymodbus/framer/ascii_framer.py index 0d999cb73..b965dbc27 100644 --- a/pymodbus/framer/ascii_framer.py +++ b/pymodbus/framer/ascii_framer.py @@ -113,7 +113,12 @@ def buildPacket(self, message): packet.extend(b2a_hex(encoded)) packet.extend(f"{checksum:02x}".encode()) packet.extend(self._end) - return bytes(packet).upper() + packet = bytes(packet).upper() + + data = message.function_code.to_bytes(1,'big') + encoded + packet_new = self.message_handler.encode(data, message.slave_id, message.transaction_id) + assert packet == packet_new, "ASCII FRAMER BuildPacket failed!" + return packet # __END__ diff --git a/pymodbus/framer/tls_framer.py b/pymodbus/framer/tls_framer.py index f37729953..70cd821ce 100644 --- a/pymodbus/framer/tls_framer.py +++ b/pymodbus/framer/tls_framer.py @@ -82,6 +82,10 @@ def buildPacket(self, message): data = message.encode() packet = struct.pack(TLS_FRAME_HEADER, message.function_code) packet += data + + data_new = message.function_code.to_bytes(1,'big') + data + packet_new = self.message_encoder.encode(data_new, message.slave_id, message.transaction_id) + assert packet == packet_new, "TLS FRAMER BuildPacket failed!" return packet From bee31239bdf4d2b7aa70dacb96f3c64fc9fa1e29 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Tue, 5 Mar 2024 11:53:10 +0100 Subject: [PATCH 4/4] RTU + SOCKET. --- pymodbus/framer/rtu_framer.py | 5 +++++ pymodbus/framer/socket_framer.py | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/pymodbus/framer/rtu_framer.py b/pymodbus/framer/rtu_framer.py index a0ac06fbb..a7256ac3d 100644 --- a/pymodbus/framer/rtu_framer.py +++ b/pymodbus/framer/rtu_framer.py @@ -178,6 +178,11 @@ def buildPacket(self, message): + data ) packet += struct.pack(">H", MessageRTU.compute_CRC(packet)) + + data_new = message.function_code.to_bytes(1, 'big') + data + packet_new = self.message_handler.encode(data_new, message.slave_id, message.transaction_id) + assert packet == packet_new, "RTU FRAMER BuildPacket failed!" + # Ensure that transaction is actually the slave id for serial comms if message.slave_id: message.transaction_id = message.slave_id diff --git a/pymodbus/framer/socket_framer.py b/pymodbus/framer/socket_framer.py index 9b5e5564e..8b638392b 100644 --- a/pymodbus/framer/socket_framer.py +++ b/pymodbus/framer/socket_framer.py @@ -128,4 +128,8 @@ def buildPacket(self, message): message.function_code, ) packet += data + + data_new = message.function_code.to_bytes(1, 'big') + data + packet_new = self.message_handler.encode(data_new, message.slave_id, message.transaction_id) + assert packet == packet_new, "SOCKET FRAMER BuildPacket failed!" return packet