From 7c4a310e10abfc17b42330c3474758b67e50dda8 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Wed, 6 Mar 2024 17:31:33 +0100 Subject: [PATCH] update message tests (incorporate all old tests). (#2088) --- test/message/test_ascii.py | 21 - test/message/test_message.py | 129 +++++- test/message/test_rtu.py | 70 --- test/message/test_socket.py | 15 - test/message/test_tls.py | 13 - test/message/to_do_framers.py | 505 -------------------- test/message/to_do_transaction.py | 744 ------------------------------ 7 files changed, 126 insertions(+), 1371 deletions(-) delete mode 100644 test/message/to_do_framers.py delete mode 100755 test/message/to_do_transaction.py diff --git a/test/message/test_ascii.py b/test/message/test_ascii.py index 5e46c258b..1235e646e 100644 --- a/test/message/test_ascii.py +++ b/test/message/test_ascii.py @@ -14,12 +14,6 @@ def prepare_frame(): return MessageAscii([1], False) - def test_roundtrip_LRC(self): - """Test combined compute/check LRC.""" - data = b'\x12\x34\x23\x45\x34\x56\x45\x67' - assert MessageAscii.compute_LRC(data) == 0x1c - assert MessageAscii.check_LRC(data, 0x1C) - @pytest.mark.parametrize( ("packet", "used_len", "res_id", "res"), [ @@ -43,21 +37,6 @@ def test_decode(self, frame, packet, used_len, res_id, res): assert not tid assert dev_id == res_id - @pytest.mark.parametrize( - ("data", "dev_id", "res_msg"), - [ - (b'\x01\x05\x04\x00\x17', 1, b':010105040017DE\r\n'), - (b'\x03\x07\x06\x00\x73', 2, b':0203070600737B\r\n'), - (b'\x08\x00\x01', 3, b':03080001F4\r\n'), - (b'\x84\x01', 2, b':02840179\r\n'), - ], - ) - def test_encode(self, frame, data, dev_id, res_msg): - """Test encode.""" - msg = frame.encode(data, dev_id, 0) - assert res_msg == msg - assert dev_id == int(msg[1:3], 16) - @pytest.mark.parametrize( ("data", "dev_id", "res_msg"), [ diff --git a/test/message/test_message.py b/test/message/test_message.py index ef5a3809a..68dee5d37 100644 --- a/test/message/test_message.py +++ b/test/message/test_message.py @@ -116,8 +116,27 @@ def test_LRC_CRC(self, func, lrc, expect): data = b'\x12\x34\x23\x45\x34\x56\x45\x67' assert expect == func(data, lrc) if lrc else func(data) + def test_roundtrip_LRC(self): + """Test combined compute/check LRC.""" + data = b'\x12\x34\x23\x45\x34\x56\x45\x67' + assert MessageAscii.compute_LRC(data) == 0x1c + assert MessageAscii.check_LRC(data, 0x1C) + + def test_crc16_table(self): + """Test the crc16 table is prefilled.""" + assert len(MessageRTU.crc16_table) == 256 + assert isinstance(MessageRTU.crc16_table[0], int) + assert isinstance(MessageRTU.crc16_table[255], int) + + def test_roundtrip_CRC(self): + """Test combined compute/check CRC.""" + data = b'\x12\x34\x23\x45\x34\x56\x45\x67' + assert MessageRTU.compute_CRC(data) == 0xE2DB + assert MessageRTU.check_CRC(data, 0xE2DB) -class TestMessages: # pylint: disable=too-few-public-methods + + +class TestMessages: """Test message classes.""" @pytest.mark.parametrize( @@ -198,10 +217,114 @@ class TestMessages: # pylint: disable=too-few-public-methods def test_encode(self, frame, frame_expected, data, dev_id, tid, inx1, inx2, inx3): """Test encode method.""" if frame != MessageSocket and tid: - return + pytest.skip("Not supported") if frame == MessageTLS and (tid or dev_id): - return + pytest.skip("Not supported") frame_obj = frame(None, True) expected = frame_expected[inx1 + inx2 + inx3] encoded_data = frame_obj.encode(data, dev_id, tid) assert encoded_data == expected + + @pytest.mark.parametrize( + ("msg_type", "data", "dev_id", "tid", "expected"), + [ + (MessageType.ASCII, b':0003007C00027F\r\n', 0, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.ASCII, b':000304008D008EDE\r\n', 0, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.ASCII, b':0083027B\r\n', 0, 0, b'\x83\x02',), # Exception + (MessageType.ASCII, b':1103007C00026E\r\n', 17, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.ASCII, b':110304008D008ECD\r\n', 17, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.ASCII, b':1183026A\r\n', 17, 0, b'\x83\x02',), # Exception + (MessageType.ASCII, b':FF03007C000280\r\n', 255, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.ASCII, b':FF0304008D008EDF\r\n', 255, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.ASCII, b':FF83027C\r\n', 255, 0, b'\x83\x02',), # Exception + (MessageType.RTU, b'\x00\x03\x00\x7c\x00\x02\x04\x02', 0, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.RTU, b'\x00\x03\x04\x00\x8d\x00\x8e\xfa\xbc', 0, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.RTU, b'\x00\x83\x02\x91\x31', 0, 0, b'\x83\x02',), # Exception + (MessageType.RTU, b'\x11\x03\x00\x7c\x00\x02\x07\x43', 17, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.RTU, b'\x11\x03\x04\x00\x8d\x00\x8e\xfb\xbd', 17, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.RTU, b'\x11\x83\x02\xc1\x34', 17, 0, b'\x83\x02',), # Exception + (MessageType.RTU, b'\xff\x03\x00|\x00\x02\x10\x0d', 255, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.RTU, b'\xff\x03\x04\x00\x8d\x00\x8e\xf5\xb3', 255, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.RTU, b'\xff\x83\x02\xa1\x01', 255, 0, b'\x83\x02',), # Exception + (MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x06\x00\x03\x00\x7c\x00\x02', 0, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x07\x00\x03\x04\x00\x8d\x00\x8e', 0, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x03\x00\x83\x02', 0, 0, b'\x83\x02',), # Exception + (MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x06\x11\x03\x00\x7c\x00\x02', 17, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x07\x11\x03\x04\x00\x8d\x00\x8e', 17, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x03\x11\x83\x02', 17, 0, b'\x83\x02',), # Exception + (MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x06\xff\x03\x00\x7c\x00\x02', 255, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x07\xff\x03\x04\x00\x8d\x00\x8e', 255, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x03\xff\x83\x02', 255, 0, b'\x83\x02',), # Exception + (MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x06\x00\x03\x00\x7c\x00\x02', 0, 3077, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x07\x00\x03\x04\x00\x8d\x00\x8e', 0, 3077, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x03\x00\x83\x02', 0, 3077, b'\x83\x02',), # Exception + (MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x06\x11\x03\x00\x7c\x00\x02', 17, 3077, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x07\x11\x03\x04\x00\x8d\x00\x8e', 17, 3077, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x03\x11\x83\x02', 17, 3077, b'\x83\x02',), # Exception + (MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x06\xff\x03\x00\x7c\x00\x02', 255, 3077, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x07\xff\x03\x04\x00\x8d\x00\x8e', 255, 3077, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x03\xff\x83\x02', 255, 3077, b'\x83\x02',), # Exception + (MessageType.TLS, b'\x03\x00\x7c\x00\x02', 0, 0, b"\x03\x00\x7c\x00\x02",), # Request + (MessageType.TLS, b'\x03\x04\x00\x8d\x00\x8e', 0, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response + (MessageType.TLS, b'\x83\x02', 0, 0, b'\x83\x02',), # Exception + ] + ) + @pytest.mark.parametrize( + ("split"), + [ + "no", + "half", + "single", + ] + ) + async def test_decode(self, dummy_message, msg_type, data, dev_id, tid, expected, split): + """Test encode method.""" + if msg_type == MessageType.RTU: + pytest.skip("Waiting on implementation!") + if msg_type == MessageType.TLS and split != "no": + pytest.skip("Not supported.") + frame = dummy_message( + msg_type, + CommParams(), + False, + [1], + ) + frame.callback_request_response = mock.Mock() + if split == "no": + used_len = frame.callback_data(data) + + elif split == "half": + split_len = int(len(data) / 2) + assert not frame.callback_data(data[0:split_len]) + frame.callback_request_response.assert_not_called() + used_len = frame.callback_data(data) + else: + last = len(data) + for i in range(0, last -1): + assert not frame.callback_data(data[0:i+1]) + frame.callback_request_response.assert_not_called() + used_len = frame.callback_data(data) + assert used_len == len(data) + frame.callback_request_response.assert_called_with(expected, dev_id, tid) + + @pytest.mark.parametrize( + ("frame", "data", "exp_len"), + [ + (MessageAscii, b':0003007C00017F\r\n', 17), # bad crc + # (MessageAscii, b'abc:0003007C00027F\r\n', 3), # garble in front + # (MessageAscii, b':0003007C00017F\r\nabc', 17), # bad crc, garble after + # (MessageAscii, b':0003007C00017F\r\n:0003', 17), # part second message + (MessageRTU, b'\x00\x83\x02\x91\x31', 0), # bad crc + # (MessageRTU, b'\x00\x83\x02\x91\x31', 0), # garble in front + # (MessageRTU, b'\x00\x83\x02\x91\x31', 0), # garble after + # (MessageRTU, b'\x00\x83\x02\x91\x31', 0), # part second message + ] + ) + async def test_decode_bad_crc(self, frame, data, exp_len): + """Test encode method.""" + if frame == MessageRTU: + pytest.skip("Waiting for implementation.") + frame_obj = frame(None, True) + used_len, _, _, data = frame_obj.decode(data) + assert used_len == exp_len + assert not data diff --git a/test/message/test_rtu.py b/test/message/test_rtu.py index 4ae1244d4..0eb8e5a59 100644 --- a/test/message/test_rtu.py +++ b/test/message/test_rtu.py @@ -13,76 +13,6 @@ def prepare_frame(): """Return message object.""" return MessageRTU([1], False) - def test_crc16_table(self): - """Test the crc16 table is prefilled.""" - assert len(MessageRTU.crc16_table) == 256 - assert isinstance(MessageRTU.crc16_table[0], int) - assert isinstance(MessageRTU.crc16_table[255], int) - - def test_roundtrip_CRC(self): - """Test combined compute/check CRC.""" - data = b'\x12\x34\x23\x45\x34\x56\x45\x67' - assert MessageRTU.compute_CRC(data) == 0xE2DB - assert MessageRTU.check_CRC(data, 0xE2DB) - - # b"\x02\x01\x01\x00Q\xcc" - # b"\x01\x01\x03\x01\x00\n\xed\x89" - # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x43" - # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD" - - # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03" # good frame + part of next frame - - # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC" # invalid frame CRC - # b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC" # bad crc - # b"\x61\x62\x00\x01\x00\n\xec\x1c" # bad function code - # b"\x01\x03\x03\x01\x00\n\x94\x49" # Not ok - - # test frame ready - # (b"", False), - # (b"\x11", False), - # (b"\x11\x03", False), - # (b"\x11\x03\x06", False), - # (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", False), - # (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", True), - # (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\xAB\xCD", True), - - - @pytest.mark.parametrize( - ("packet", "used_len", "res_id", "res"), - [ - (b':010100010001FC\r\n', 17, 1, b'\x01\x00\x01\x00\x01'), - (b':00010001000AF4\r\n', 17, 0, b'\x01\x00\x01\x00\x0a'), - (b':01010001000AF3\r\n', 17, 1, b'\x01\x00\x01\x00\x0a'), - (b':61620001000A32\r\n', 17, 97, b'\x62\x00\x01\x00\x0a'), - (b':01270001000ACD\r\n', 17, 1, b'\x27\x00\x01\x00\x0a'), - (b':010100', 0, 0, b''), # short frame - (b':00010001000AF4', 0, 0, b''), - (b'abc:00010001000AF4', 3, 0, b''), # garble before frame - (b'abc00010001000AF4', 17, 0, b''), # only garble - (b':01010001000A00\r\n', 17, 0, b''), - ], - ) - def xtest_decode(self, frame, packet, used_len, res_id, res): - """Test decode.""" - res_len, tid, dev_id, data = frame.decode(packet) - assert res_len == used_len - assert data == res - assert not tid - assert dev_id == res_id - - @pytest.mark.parametrize( - ("data", "dev_id", "res_msg"), - [ - (b'\x01\x01\x00', 2, b'\x02\x01\x01\x00\x51\xcc'), - (b'\x03\x06\xAE\x41\x56\x52\x43\x40', 17, b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD'), - (b'\x01\x03\x01\x00\x0a', 1, b'\x01\x01\x03\x01\x00\x0a\xed\x89'), - ], - ) - def test_encode(self, frame, data, dev_id, res_msg): - """Test encode.""" - msg = frame.encode(data, dev_id, 0) - assert res_msg == msg - assert dev_id == int(msg[0]) @pytest.mark.parametrize( ("data", "dev_id", "res_msg"), diff --git a/test/message/test_socket.py b/test/message/test_socket.py index 4543fa3dc..1c4e930ce 100644 --- a/test/message/test_socket.py +++ b/test/message/test_socket.py @@ -33,21 +33,6 @@ def test_decode(self, frame, packet, used_len, res_id, res_tid, res): assert res_tid == tid assert dev_id == res_id - @pytest.mark.parametrize( - ("data", "dev_id", "tid", "res_msg"), - [ - (b'\x01\x05\x04\x00\x17', 7, 5, b'\x00\x05\x00\x00\x00\x06\x07\x01\x05\x04\x00\x17'), - (b'\x03\x07\x06\x00\x73', 2, 9, b'\x00\x09\x00\x00\x00\x06\x02\x03\x07\x06\x00\x73'), - (b'\x08\x00\x01', 3, 6, b'\x00\x06\x00\x00\x00\x04\x03\x08\x00\x01'), - (b'\x84\x01', 4, 8, b'\x00\x08\x00\x00\x00\x03\x04\x84\x01'), - ], - ) - def test_encode(self, frame, data, dev_id, tid, res_msg): - """Test encode.""" - msg = frame.encode(data, dev_id, tid) - assert res_msg == msg - assert dev_id == int(msg[6]) - assert tid == int.from_bytes(msg[0:2], 'big') @pytest.mark.parametrize( ("data", "dev_id", "tid", "res_msg"), diff --git a/test/message/test_tls.py b/test/message/test_tls.py index 966683119..d9140c2cb 100644 --- a/test/message/test_tls.py +++ b/test/message/test_tls.py @@ -30,19 +30,6 @@ def test_decode(self, frame, packet, used_len,): assert not tid assert not dev_id - @pytest.mark.parametrize( - ("data"), - [ - (b'\x01\x05\x04\x00\x17'), - (b'\x03\x07\x06\x00\x73'), - (b'\x08\x00\x01'), - (b'\x84\x01'), - ], - ) - def test_encode(self, frame, data): - """Test encode.""" - msg = frame.encode(data, 0, 0) - assert data == msg @pytest.mark.parametrize( ("data"), diff --git a/test/message/to_do_framers.py b/test/message/to_do_framers.py deleted file mode 100644 index d93b6c081..000000000 --- a/test/message/to_do_framers.py +++ /dev/null @@ -1,505 +0,0 @@ -"""Test framers.""" -from unittest import mock - -import pytest - -from pymodbus import Framer -from pymodbus.client.base import ModbusBaseClient -from pymodbus.exceptions import ModbusIOException -from pymodbus.factory import ClientDecoder -from pymodbus.framer import ( - ModbusAsciiFramer, - ModbusBinaryFramer, - ModbusRtuFramer, - ModbusSocketFramer, -) -from pymodbus.transport import CommType -from pymodbus.utilities import ModbusTransactionState - - -BASE_PORT = 6600 - - -TEST_MESSAGE = b"\x00\x01\x00\x01\x00\n\xec\x1c" - - -class TestFramers: - """Test framers.""" - - slaves = [2, 17] - - @staticmethod - @pytest.fixture(name="rtu_framer") - def fixture_rtu_framer(): - """RTU framer.""" - return ModbusRtuFramer(ClientDecoder()) - - @staticmethod - @pytest.fixture(name="ascii_framer") - def fixture_ascii_framer(): - """Ascii framer.""" - return ModbusAsciiFramer(ClientDecoder()) - - - @pytest.mark.parametrize( - "framer", - [ - ModbusRtuFramer, - ModbusAsciiFramer, - ModbusBinaryFramer, - ], -) - def test_framer_initialization(self, framer): - """Test framer initialization.""" - decoder = ClientDecoder() - framer = framer(decoder) - assert framer.client is None - assert framer._buffer == b"" # pylint: disable=protected-access - assert framer.decoder == decoder - if isinstance(framer, ModbusAsciiFramer): - assert framer._header == { # pylint: disable=protected-access - "tid": 0, - "pid": 0, - "lrc": "0000", - "len": 0, - "uid": 0x00, - "crc": b"\x00\x00", - } - assert framer._hsize == 0x02 # pylint: disable=protected-access - assert framer._start == b":" # pylint: disable=protected-access - assert framer._end == b"\r\n" # pylint: disable=protected-access - elif isinstance(framer, ModbusRtuFramer): - assert framer._header == { # pylint: disable=protected-access - "tid": 0, - "pid": 0, - "lrc": "0000", - "uid": 0x00, - "len": 0, - "crc": b"\x00\x00", - } - assert framer._hsize == 0x01 # pylint: disable=protected-access - assert framer._end == b"\x0d\x0a" # pylint: disable=protected-access - assert framer._min_frame_size == 4 # pylint: disable=protected-access - else: - assert framer._header == { # pylint: disable=protected-access - "tid": 0, - "pid": 0, - "lrc": "0000", - "crc": b"\x00\x00", - "len": 0, - "uid": 0x00, - } - assert framer._hsize == 0x01 # pylint: disable=protected-access - assert framer._start == b"\x7b" # pylint: disable=protected-access - assert framer._end == b"\x7d" # pylint: disable=protected-access - assert framer._repeat == [ # pylint: disable=protected-access - b"}"[0], - b"{"[0], - ] - - - @pytest.mark.parametrize( - ("data", "expected"), - [ - (b"", 0), - (b"\x02\x01\x01\x00Q\xcc", 1), - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", 1), # valid frame - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC", 0), # invalid frame CRC - ], - ) - def test_check_frame(self, rtu_framer, data, expected): - """Test check frame.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - rtu_framer.processIncomingPacket(data, callback, self.slaves) - assert count == expected - - - @pytest.mark.parametrize( - ("data", "header", "res"), - [ - (b"", {"uid": 0x00, "len": 0, "crc": b"\x00\x00"}, 0), - (b"abcd", {"uid": 0x00, "len": 2, "crc": b"\x00\x00"}, 0), - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x12\x03", # real case, frame size is 11 - {"uid": 0x00, "len": 11, "crc": b"\x00\x00"}, - 1, - ), - ], - ) - def test_rtu_advance_framer(self, rtu_framer, data, header, res): - """Test rtu advance framer.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - rtu_framer._header = header # pylint: disable=protected-access - rtu_framer.processIncomingPacket(data, callback, self.slaves) - assert count == res - - - @pytest.mark.parametrize("data", [b"", b"abcd"]) - def test_rtu_reset_framer(self, rtu_framer, data): - """Test rtu reset framer.""" - rtu_framer._buffer = data # pylint: disable=protected-access - rtu_framer.resetFrame() - assert rtu_framer._header == { # pylint: disable=protected-access - "lrc": "0000", - "crc": b"\x00\x00", - "len": 0, - "uid": 0x00, - "pid": 0, - "tid": 0, - } - - - @pytest.mark.parametrize( - ("data", "expected"), - [ - (b"", 0), - (b"\x11", 0), - (b"\x11\x03", 0), - (b"\x11\x03\x06", 0), - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", 0), - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", 1), - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\xAB\xCD", 1), - ], - ) - def test_is_frame_ready(self, rtu_framer, data, expected): - """Test is frame ready.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - rtu_framer.processIncomingPacket(data, callback, self.slaves) - assert count == expected - - - @pytest.mark.parametrize( - "data", - [ - b"", - b"\x11", - b"\x11\x03", - b"\x11\x03\x06", - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x43", - ], - ) - def test_rtu_populate_header_fail(self, rtu_framer, data): - """Test rtu populate header fail.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - rtu_framer.processIncomingPacket(data, callback, self.slaves) - assert not count - - - @pytest.mark.parametrize( - ("data", "header"), - [ - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", - { - "crc": b"\x49\xAD", - "uid": 17, - "len": 11, - "tid": 17, - }, - ), - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", - { - "crc": b"\x49\xAD", - "uid": 17, - "len": 11, - "tid": 17, - }, - ), - ], - ) - def test_rtu_populate_header(self, rtu_framer, data, header): - """Test rtu populate header.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - rtu_framer.processIncomingPacket(data, callback, self.slaves) - assert rtu_framer._header == header # pylint: disable=protected-access - - - def test_get_frame(self, rtu_framer): - """Test get frame.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - data = b"\x02\x01\x01\x00Q\xcc" - rtu_framer.processIncomingPacket(data, callback, self.slaves) - assert count - assert result.function_code.to_bytes(1,'big') + result.encode() == b"\x01\x01\x00" - - - def test_populate_result(self, rtu_framer): - """Test populate result.""" - rtu_framer._header["uid"] = 255 # pylint: disable=protected-access - result = mock.Mock() - rtu_framer.populateResult(result) - assert result.slave_id == 255 - - - @pytest.mark.parametrize( - ("data", "slaves", "reset_called", "cb_called"), - [ - (b"\x11", [17], 0, 0), # not complete frame - (b"\x11\x03", [17], 0, 0), # not complete frame - (b"\x11\x03\x06", [17], 0, 0), # not complete frame - (b"\x11\x03\x06\xAE\x41\x56\x52\x43", [17], 0, 0), # not complete frame - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40", - [17], - 0, - 0, - ), # not complete frame - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", - [17], - 0, - 0, - ), # not complete frame - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC", [17], 1, 0), # bad crc - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", - [17], - 0, - 1, - ), # good frame - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", - [16], - 0, - 0, - ), # incorrect slave id - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", [17], 0, 1), - # good frame + part of next frame - ], - ) - def test_rtu_incoming_packet(self, rtu_framer, data, slaves, reset_called, cb_called): - """Test rtu process incoming packet.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - with mock.patch.object( - rtu_framer, "resetFrame", wraps=rtu_framer.resetFrame - ) as mock_reset: - rtu_framer.processIncomingPacket(data, callback, slaves) - assert count == cb_called - assert mock_reset.call_count == reset_called - - - async def test_send_packet(self, rtu_framer): - """Test send packet.""" - message = TEST_MESSAGE - client = ModbusBaseClient( - Framer.ASCII, - host="localhost", - port=BASE_PORT + 1, - CommType=CommType.TCP, - ) - client.state = ModbusTransactionState.TRANSACTION_COMPLETE - client.silent_interval = 1 - client.last_frame_end = 1 - client.comm_params.timeout_connect = 0.25 - client.idle_time = mock.Mock(return_value=1) - client.send = mock.Mock(return_value=len(message)) - rtu_framer.client = client - assert rtu_framer.sendPacket(message) == len(message) - client.state = ModbusTransactionState.PROCESSING_REPLY - assert rtu_framer.sendPacket(message) == len(message) - - - def test_recv_packet(self, rtu_framer): - """Test receive packet.""" - message = TEST_MESSAGE - client = mock.Mock() - client.recv.return_value = message - rtu_framer.client = client - assert rtu_framer.recvPacket(len(message)) == message - - def test_process(self, rtu_framer): - """Test process.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - data = TEST_MESSAGE - rtu_framer.processIncomingPacket(data, callback, self.slaves) - assert not count - - @pytest.mark.parametrize(("slaves", "res"), [([16], 0), ([17], 1)]) - def test_validate__slave_id(self,rtu_framer, slaves, res): - """Test validate slave.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - data = b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x12\x03" - rtu_framer.processIncomingPacket(data, callback, slaves) - assert count == res - - @pytest.mark.parametrize("data", [b":010100010001FC\r\n", b""]) - def test_decode_ascii_data(self, ascii_framer, data): - """Test decode ascii.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - ascii_framer.processIncomingPacket(data, callback, [1]) - if result: - assert result.slave_id == 1 - assert result.function_code == 1 - else: - assert not result - - def test_recv_split_packet(self): - """Test receive packet.""" - response_ok = False - - def _handle_response(_reply): - """Handle response.""" - nonlocal response_ok - response_ok = True - - message = bytearray(b"\x00\x01\x00\x00\x00\x0b\x01\x03\x08\x00\xb5\x12\x2f\x37\x21\x00\x03") - for i in range(0, len(message)): - part1 = message[:i] - part2 = message[i:] - response_ok = False - framer = ModbusSocketFramer(ClientDecoder()) - if i: - framer.processIncomingPacket(part1, _handle_response, slave=0) - assert not response_ok, "Response should not be accepted" - framer.processIncomingPacket(part2, _handle_response, slave=0) - assert response_ok, "Response is valid, but not accepted" - - - def test_recv_socket_exception_packet(self): - """Test receive packet.""" - response_ok = False - - def _handle_response(_reply): - """Handle response.""" - nonlocal response_ok - response_ok = True - - message = bytearray(b"\x00\x02\x00\x00\x00\x03\x01\x84\x02") - response_ok = False - framer = ModbusSocketFramer(ClientDecoder()) - framer.processIncomingPacket(message, _handle_response, slave=0) - assert response_ok, "Response is valid, but not accepted" - - message = bytearray(b"\x00\x01\x00\x00\x00\x0b\x01\x03\x08\x00\xb5\x12\x2f\x37\x21\x00\x03") - response_ok = False - framer = ModbusSocketFramer(ClientDecoder()) - framer.processIncomingPacket(message, _handle_response, slave=0) - assert response_ok, "Response is valid, but not accepted" - - # ---- 100% coverage - - @pytest.mark.parametrize( - ("framer", "message"), - [ - (ModbusAsciiFramer, b':01010001000AF3\r\n',), - (ModbusBinaryFramer, b'A{\x01\x01\x00\x01\x00\n\xed\xcd}',), - (ModbusRtuFramer, b"\x01\x01\x03\x01\x00\n\xed\x89",), - (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x01\x01\x00\x01\x00\n',), - ] - ) - @pytest.mark.parametrize(("slave"), [0x01, 0x02]) - def test_processincomingpacket_ok(self, framer, message, slave): - """Test processIncomingPacket.""" - test_framer = framer(ClientDecoder()) - test_framer.processIncomingPacket(message, mock.Mock(), slave) - - - @pytest.mark.parametrize( - ("framer", "message"), - [ - (ModbusAsciiFramer, b':01270001000ACD\r\n',), - (ModbusBinaryFramer, b'{\x01\x1a\x00\x01\x00\n\x89\xcf}',), - (ModbusRtuFramer, b"\x01\x03\x03\x01\x00\n\x94\x49",), - (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x01\x27\x00\x01\x00\n',), - ] - ) - def test_processincomingpacket_not_ok(self, framer, message): - """Test processIncomingPacket.""" - test_framer = framer(ClientDecoder()) - with pytest.raises(ModbusIOException): - test_framer.processIncomingPacket(message, mock.Mock(), 0x01) - - @pytest.mark.parametrize( - ("framer", "message"), - [ - (ModbusAsciiFramer, b':61620001000AF4\r\n',), - (ModbusBinaryFramer, b'{\x61\x62\x00\x01\x00\n\xec\x1c}',), - (ModbusRtuFramer, b"\x61\x62\x00\x01\x00\n\xec\x1c",), - (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x61\x62\x00\x01\x00\n',), - ] - ) - @pytest.mark.parametrize("expected", [{"fcode": 98, "slave": 97}]) - def test_decode_data(self, framer, message, expected): - """Test decode data.""" - test_framer = framer(ClientDecoder()) - decoded = test_framer.decode_data(b'') - assert decoded == {} - decoded = test_framer.decode_data(message) - assert decoded["fcode"] == expected["fcode"] - assert decoded["slave"] == expected["slave"] - - def test_binary_framer_preflight(self): - """Test binary framer _preflight.""" - test_framer = ModbusBinaryFramer(ClientDecoder()) - assert test_framer._preflight(b'A{B}C') == b'A{{B}}C' # pylint: disable=protected-access diff --git a/test/message/to_do_transaction.py b/test/message/to_do_transaction.py deleted file mode 100755 index 36a2c3df0..000000000 --- a/test/message/to_do_transaction.py +++ /dev/null @@ -1,744 +0,0 @@ -"""Test transaction.""" -from itertools import count -from unittest import mock - -from pymodbus.exceptions import ( - ModbusIOException, -) -from pymodbus.factory import ServerDecoder -from pymodbus.pdu import ModbusRequest -from pymodbus.transaction import ( - ModbusAsciiFramer, - ModbusBinaryFramer, - ModbusRtuFramer, - ModbusSocketFramer, - ModbusTlsFramer, - ModbusTransactionManager, -) - - -TEST_MESSAGE = b"\x7b\x01\x03\x00\x00\x00\x05\x85\xC9\x7d" - - -class TestTransaction: # pylint: disable=too-many-public-methods - """Unittest for the pymodbus.transaction module.""" - - client = None - decoder = None - _tcp = None - _tls = None - _rtu = None - _ascii = None - _binary = None - _manager = None - _tm = None - - # ----------------------------------------------------------------------- # - # Test Construction - # ----------------------------------------------------------------------- # - def setup_method(self): - """Set up the test environment.""" - self.client = None - self.decoder = ServerDecoder() - self._tcp = ModbusSocketFramer(decoder=self.decoder, client=None) - self._tls = ModbusTlsFramer(decoder=self.decoder, client=None) - self._rtu = ModbusRtuFramer(decoder=self.decoder, client=None) - self._ascii = ModbusAsciiFramer(decoder=self.decoder, client=None) - self._binary = ModbusBinaryFramer(decoder=self.decoder, client=None) - self._manager = ModbusTransactionManager(self.client) - - # ----------------------------------------------------------------------- # - # Modbus transaction manager - # ----------------------------------------------------------------------- # - - def test_calculate_expected_response_length(self): - """Test calculate expected response length.""" - self._manager.client = mock.MagicMock() - self._manager.client.framer = mock.MagicMock() - self._manager._set_adu_size() # pylint: disable=protected-access - assert not self._manager._calculate_response_length( # pylint: disable=protected-access - 0 - ) - self._manager.base_adu_size = 10 - assert ( - self._manager._calculate_response_length(5) # pylint: disable=protected-access - == 15 - ) - - def test_calculate_exception_length(self): - """Test calculate exception length.""" - for framer, exception_length in ( - ("ascii", 11), - ("binary", 7), - ("rtu", 5), - ("tcp", 9), - ("tls", 2), - ("dummy", None), - ): - self._manager.client = mock.MagicMock() - if framer == "ascii": - self._manager.client.framer = self._ascii - elif framer == "binary": - self._manager.client.framer = self._binary - elif framer == "rtu": - self._manager.client.framer = self._rtu - elif framer == "tcp": - self._manager.client.framer = self._tcp - elif framer == "tls": - self._manager.client.framer = self._tls - else: - self._manager.client.framer = mock.MagicMock() - - self._manager._set_adu_size() # pylint: disable=protected-access - assert ( - self._manager._calculate_exception_length() # pylint: disable=protected-access - == exception_length - ) - - @mock.patch("pymodbus.transaction.time") - def test_execute(self, mock_time): - """Test execute.""" - mock_time.time.side_effect = count() - - client = mock.MagicMock() - client.framer = self._ascii - client.framer._buffer = b"deadbeef" # pylint: disable=protected-access - client.framer.processIncomingPacket = mock.MagicMock() - client.framer.processIncomingPacket.return_value = None - client.framer.buildPacket = mock.MagicMock() - client.framer.buildPacket.return_value = b"deadbeef" - client.framer.sendPacket = mock.MagicMock() - client.framer.sendPacket.return_value = len(b"deadbeef") - client.framer.decode_data = mock.MagicMock() - client.framer.decode_data.return_value = { - "slave": 1, - "fcode": 222, - "length": 27, - } - request = mock.MagicMock() - request.get_response_pdu_size.return_value = 10 - request.slave_id = 1 - request.function_code = 222 - trans = ModbusTransactionManager(client) - trans._recv = mock.MagicMock( # pylint: disable=protected-access - return_value=b"abcdef" - ) - assert trans.retries == 3 - assert not trans.retry_on_empty - - trans.getTransaction = mock.MagicMock() - trans.getTransaction.return_value = "response" - response = trans.execute(request) - assert response == "response" - # No response - trans._recv = mock.MagicMock( # pylint: disable=protected-access - return_value=b"abcdef" - ) - trans.transactions = {} - trans.getTransaction = mock.MagicMock() - trans.getTransaction.return_value = None - response = trans.execute(request) - assert isinstance(response, ModbusIOException) - - # No response with retries - trans.retry_on_empty = True - trans._recv = mock.MagicMock( # pylint: disable=protected-access - side_effect=iter([b"", b"abcdef"]) - ) - response = trans.execute(request) - assert isinstance(response, ModbusIOException) - - # wrong handle_local_echo - trans._recv = mock.MagicMock( # pylint: disable=protected-access - side_effect=iter([b"abcdef", b"deadbe", b"123456"]) - ) - client.comm_params.handle_local_echo = True - trans.retry_on_empty = False - trans.retry_on_invalid = False - assert trans.execute(request).message == "[Input/Output] Wrong local echo" - client.comm_params.handle_local_echo = False - - # retry on invalid response - trans.retry_on_invalid = True - trans._recv = mock.MagicMock( # pylint: disable=protected-access - side_effect=iter([b"", b"abcdef", b"deadbe", b"123456"]) - ) - response = trans.execute(request) - assert isinstance(response, ModbusIOException) - - # Unable to decode response - trans._recv = mock.MagicMock( # pylint: disable=protected-access - side_effect=ModbusIOException() - ) - client.framer.processIncomingPacket.side_effect = mock.MagicMock( - side_effect=ModbusIOException() - ) - assert isinstance(trans.execute(request), ModbusIOException) - - # Broadcast - client.params.broadcast_enable = True - request.slave_id = 0 - response = trans.execute(request) - assert response == b"Broadcast write sent - no response expected" - - # Broadcast w/ Local echo - client.comm_params.handle_local_echo = True - client.params.broadcast_enable = True - recv = mock.MagicMock(return_value=b"deadbeef") - trans._recv = recv # pylint: disable=protected-access - request.slave_id = 0 - response = trans.execute(request) - assert response == b"Broadcast write sent - no response expected" - recv.assert_called_once_with(8, False) - client.comm_params.handle_local_echo = False - - def test_transaction_manager_tid(self): - """Test the transaction manager TID.""" - for tid in range(1, self._manager.getNextTID() + 10): - assert tid + 1 == self._manager.getNextTID() - self._manager.reset() - assert self._manager.getNextTID() == 1 - - def test_get_transaction_manager_transaction(self): - """Test the getting a transaction from the transaction manager.""" - - class Request: # pylint: disable=too-few-public-methods - """Request.""" - - self._manager.reset() - handle = Request() - handle.transaction_id = ( # pylint: disable=attribute-defined-outside-init - self._manager.getNextTID() - ) - handle.message = b"testing" # pylint: disable=attribute-defined-outside-init - self._manager.addTransaction(handle) - result = self._manager.getTransaction(handle.transaction_id) - assert handle.message == result.message - - def test_delete_transaction_manager_transaction(self): - """Test deleting a transaction from the dict transaction manager.""" - - class Request: # pylint: disable=too-few-public-methods - """Request.""" - - self._manager.reset() - handle = Request() - handle.transaction_id = ( # pylint: disable=attribute-defined-outside-init - self._manager.getNextTID() - ) - handle.message = b"testing" # pylint: disable=attribute-defined-outside-init - - self._manager.addTransaction(handle) - self._manager.delTransaction(handle.transaction_id) - assert not self._manager.getTransaction(handle.transaction_id) - - # ----------------------------------------------------------------------- # - # TCP tests - # ----------------------------------------------------------------------- # - def test_tcp_framer_transaction_ready(self): - """Test a tcp frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" - self._tcp.processIncomingPacket(msg, callback, [1]) - self._tcp._buffer = msg # pylint: disable=protected-access - - def test_tcp_framer_transaction_full(self): - """Test a full tcp frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" - self._tcp.processIncomingPacket(msg, callback, [0, 1]) - assert result.function_code.to_bytes(1,'big') + result.encode() == msg[7:] - - def test_tcp_framer_transaction_half(self): - """Test a half completed tcp frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg1 = b"\x00\x01\x12\x34\x00" - msg2 = b"\x06\xff\x02\x01\x02\x00\x08" - self._tcp.processIncomingPacket(msg1, callback, [0, 1]) - assert not result - self._tcp.processIncomingPacket(msg2, callback, [0, 1]) - assert result - assert result.function_code.to_bytes(1,'big') + result.encode() == msg2[2:] - - def test_tcp_framer_transaction_half2(self): - """Test a half completed tcp frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg1 = b"\x00\x01\x12\x34\x00\x06\xff" - msg2 = b"\x02\x01\x02\x00\x08" - self._tcp.processIncomingPacket(msg1, callback, [0, 1]) - assert not result - self._tcp.processIncomingPacket(msg2, callback, [0, 1]) - assert result - assert result.function_code.to_bytes(1,'big') + result.encode() == msg2 - - def test_tcp_framer_transaction_half3(self): - """Test a half completed tcp frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg1 = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00" - msg2 = b"\x08" - self._tcp.processIncomingPacket(msg1, callback, [0, 1]) - assert not result - self._tcp.processIncomingPacket(msg2, callback, [0, 1]) - assert result - assert result.function_code.to_bytes(1,'big') + result.encode() == msg1[7:] + msg2 - - def test_tcp_framer_transaction_short(self): - """Test that we can get back on track after an invalid message.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - # msg1 = b"\x99\x99\x99\x99\x00\x01\x00\x17" - msg1 = b'' - msg2 = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" - self._tcp.processIncomingPacket(msg1, callback, [0, 1]) - assert not result - self._tcp.processIncomingPacket(msg2, callback, [0, 1]) - assert result - assert result.function_code.to_bytes(1,'big') + result.encode() == msg2[7:] - - def test_tcp_framer_populate(self): - """Test a tcp frame packet build.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - expected = ModbusRequest() - expected.transaction_id = 0x0001 - expected.protocol_id = 0x1234 - expected.slave_id = 0xFF - msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" - self._tcp.processIncomingPacket(msg, callback, [0, 1]) - # assert self._tcp.checkFrame() - # actual = ModbusRequest() - # self._tcp.populateResult(actual) - # for name in ("transaction_id", "protocol_id", "slave_id"): - # assert getattr(expected, name) == getattr(actual, name) - - # ----------------------------------------------------------------------- # - # TLS tests - # ----------------------------------------------------------------------- # - def test_framer_tls_framer_transaction_ready(self): - """Test a tls frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" - self._tcp.processIncomingPacket(msg[0:4], callback, [0, 1]) - assert not result - self._tcp.processIncomingPacket(msg[4:], callback, [0, 1]) - assert result - - def test_framer_tls_framer_transaction_full(self): - """Test a full tls frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" - self._tcp.processIncomingPacket(msg, callback, [0, 1]) - assert result - - def test_framer_tls_framer_transaction_half(self): - """Test a half completed tls frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" - self._tcp.processIncomingPacket(msg[0:8], callback, [0, 1]) - assert not result - self._tcp.processIncomingPacket(msg[8:], callback, [0, 1]) - assert result - - def test_framer_tls_framer_transaction_short(self): - """Test that we can get back on track after an invalid message.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" - self._tcp.processIncomingPacket(msg[0:2], callback, [0, 1]) - assert not result - self._tcp.processIncomingPacket(msg[2:], callback, [0, 1]) - assert result - - def test_framer_tls_framer_decode(self): - """Testmessage decoding.""" - msg1 = b"" - msg2 = b"\x01\x12\x34\x00\x08" - result = self._tls.decode_data(msg1) - assert not result - result = self._tls.decode_data(msg2) - assert result == {"fcode": 1} - - def test_framer_tls_incoming_packet(self): - """Framer tls incoming packet.""" - msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" - - slave = 0x01 - msg_result = None - - def mock_callback(result): - """Mock callback.""" - nonlocal msg_result - - msg_result = result.encode() - - self._tls.processIncomingPacket(msg, mock_callback, slave) - # assert msg == msg_result - - # self._tls.isFrameReady = mock.MagicMock(return_value=True) - # x = mock.MagicMock(return_value=False) - # self._tls._validate_slave_id = x - # self._tls.processIncomingPacket(msg, mock_callback, slave) - # assert not self._tls._buffer - # self._tls.advanceFrame() - # x = mock.MagicMock(return_value=True) - # self._tls._validate_slave_id = x - # self._tls.processIncomingPacket(msg, mock_callback, slave) - # assert msg[1:] == msg_result - # self._tls.advanceFrame() - - def test_framer_tls_process(self): - """Framer tls process.""" - # class MockResult: - # """Mock result.""" - - # def __init__(self, code): - # """Init.""" - # self.function_code = code - - # def mock_callback(_arg): - # """Mock callback.""" - - # self._tls.decoder.decode = mock.MagicMock(return_value=None) - # with pytest.raises(ModbusIOException): - # self._tls._process(mock_callback) - - # result = MockResult(0x01) - # self._tls.decoder.decode = mock.MagicMock(return_value=result) - # with pytest.raises(InvalidMessageReceivedException): - # self._tls._process( - # mock_callback, error=True - # ) - # self._tls._process(mock_callback) - # assert not self._tls._buffer - - def test_framer_tls_framer_populate(self): - """Test a tls frame packet build.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" - self._tcp.processIncomingPacket(msg, callback, [0, 1]) - assert result - - # ----------------------------------------------------------------------- # - # RTU tests - # ----------------------------------------------------------------------- # - def test_rtu_framer_transaction_ready(self): - """Test if the checks for a complete frame work.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] - self._rtu.processIncomingPacket(msg_parts[0], callback, [0, 1]) - assert not result - self._rtu.processIncomingPacket(msg_parts[1], callback, [0, 1]) - assert result - - def test_rtu_framer_transaction_full(self): - """Test a full rtu frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - self._rtu.processIncomingPacket(msg, callback, [0, 1]) - assert result - - def test_rtu_framer_transaction_half(self): - """Test a half completed rtu frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] - self._rtu.processIncomingPacket(msg_parts[0], callback, [0, 1]) - assert not result - self._rtu.processIncomingPacket(msg_parts[1], callback, [0, 1]) - assert result - - def test_rtu_framer_populate(self): - """Test a rtu frame packet build.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - self._rtu.processIncomingPacket(msg, callback, [0, 1]) - header_dict = self._rtu._header # pylint: disable=protected-access - assert len(msg) == header_dict["len"] - assert int(msg[0]) == header_dict["uid"] - assert msg[-2:] == header_dict["crc"] - - def test_rtu_decode_exception(self): - """Test that the RTU framer can decode errors.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x90\x02\x9c\x01" - self._rtu.processIncomingPacket(msg, callback, [0, 1]) - assert result - - def test_process(self): - """Test process.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - self._rtu.processIncomingPacket(msg, callback, [0, 1]) - assert result - - def test_rtu_process_incoming_packets(self): - """Test rtu process incoming packets.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - slave = 0x00 - - self._rtu.processIncomingPacket(msg, callback, slave) - assert result - - # ----------------------------------------------------------------------- # - # ASCII tests - # ----------------------------------------------------------------------- # - def test_ascii_framer_transaction_ready(self): - """Test a ascii frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b":F7031389000A60\r\n" - self._ascii.processIncomingPacket(msg, callback, [0,1]) - assert result - - def test_ascii_framer_transaction_full(self): - """Test a full ascii frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b"sss:F7031389000A60\r\n" - self._ascii.processIncomingPacket(msg, callback, [0,1]) - assert result - - def test_ascii_framer_transaction_half(self): - """Test a half completed ascii frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg_parts = (b"sss:F7031389", b"000A60\r\n") - self._ascii.processIncomingPacket(msg_parts[0], callback, [0,1]) - assert not result - self._ascii.processIncomingPacket(msg_parts[1], callback, [0,1]) - assert result - - def test_ascii_framer_populate(self): - """Test a ascii frame packet build.""" - request = ModbusRequest() - self._ascii.populateResult(request) - assert not request.slave_id - - def test_ascii_process_incoming_packets(self): - """Test ascii process incoming packet.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = b":F7031389000A60\r\n" - self._ascii.processIncomingPacket(msg, callback, [0,1]) - assert result - - # ----------------------------------------------------------------------- # - # Binary tests - # ----------------------------------------------------------------------- # - def test_binary_framer_transaction_ready(self): - """Test a binary frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = TEST_MESSAGE - self._binary.processIncomingPacket(msg, callback, [0,1]) - assert result - - def test_binary_framer_transaction_full(self): - """Test a full binary frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg = TEST_MESSAGE - self._binary.processIncomingPacket(msg, callback, [0,1]) - assert result - - def test_binary_framer_transaction_half(self): - """Test a half completed binary frame transaction.""" - count = 0 - result = None - def callback(data): - """Simulate callback.""" - nonlocal count, result - count += 1 - result = data - - msg_parts = (b"\x7b\x01\x03\x00", b"\x00\x00\x05\x85\xC9\x7d") - self._binary.processIncomingPacket(msg_parts[0], callback, [0,1]) - assert not result - self._binary.processIncomingPacket(msg_parts[1], callback, [0,1]) - assert result - - def test_binary_framer_populate(self): - """Test a binary frame packet build.""" - request = ModbusRequest() - self._binary.populateResult(request) - assert not request.slave_id - - def test_binary_process_incoming_packet(self): - """Test binary process incoming packet.""" - mock_data = TEST_MESSAGE - slave = 0x00 - - def mock_callback(_mock_data): - pass - - self._binary.processIncomingPacket(mock_data, mock_callback, slave) - - # Test failure: - self._binary.checkFrame = mock.MagicMock(return_value=False) - self._binary.processIncomingPacket(mock_data, mock_callback, slave)