diff --git a/test/message/test_ascii.py b/test/message/test_ascii.py index 7addbffe04..5e46c258bc 100644 --- a/test/message/test_ascii.py +++ b/test/message/test_ascii.py @@ -14,21 +14,6 @@ def prepare_frame(): return MessageAscii([1], False) - def test_check_LRC(self): - """Test check_LRC.""" - data = b'\x12\x34\x23\x45\x34\x56\x45\x67' - assert MessageAscii.check_LRC(data, 0x1C) - - def test_check_noLRC(self): - """Test check_LRC.""" - data = b'\x12\x34\x23\x45\x34\x56\x45\x67' - assert not MessageAscii.check_LRC(data, 0x0C) - - def test_compute_LRC(self): - """Test compute_LRC.""" - data = b'\x12\x34\x23\x45\x34\x56\x45\x67' - assert MessageAscii.compute_LRC(data) == 0x1c - def test_roundtrip_LRC(self): """Test combined compute/check LRC.""" data = b'\x12\x34\x23\x45\x34\x56\x45\x67' diff --git a/test/message/test_message.py b/test/message/test_message.py index 436c5a5363..dce85fb105 100644 --- a/test/message/test_message.py +++ b/test/message/test_message.py @@ -5,6 +5,8 @@ import pytest from pymodbus.message import MessageType +from pymodbus.message.ascii import MessageAscii +from pymodbus.message.rtu import MessageRTU from pymodbus.transport import CommParams @@ -96,3 +98,22 @@ async def test_encode(self, msg, data, dev_id, tid, res_data): """Test decode method in all types.""" t_data = msg.msg_handle.encode(data, dev_id, tid) assert res_data == t_data + + + @pytest.mark.parametrize( + ("func", "lrc", "expect"), + [(MessageAscii.check_LRC, 0x1c, True), + (MessageAscii.check_LRC, 0x0c, False), + (MessageAscii.compute_LRC, None, 0x1c), + (MessageRTU.check_CRC, 0xE2DB, True), + (MessageRTU.check_CRC, 0xDBE2, False), + (MessageRTU.compute_CRC, None, 0xE2DB), + ] + ) + def test_LRC_CRC(self, func, lrc, expect): + """Test check_LRC.""" + data = b'\x12\x34\x23\x45\x34\x56\x45\x67' + if lrc: + assert expect == func(data, lrc) + else: + assert expect == func(data) diff --git a/test/message/test_rtu.py b/test/message/test_rtu.py index b31f72fcdc..4ae1244d44 100644 --- a/test/message/test_rtu.py +++ b/test/message/test_rtu.py @@ -13,28 +13,12 @@ def prepare_frame(): """Return message object.""" return MessageRTU([1], False) - def test_crc16_table(self): """Test the crc16 table is prefilled.""" assert len(MessageRTU.crc16_table) == 256 assert isinstance(MessageRTU.crc16_table[0], int) assert isinstance(MessageRTU.crc16_table[255], int) - def test_check_CRC(self): - """Test check_CRC.""" - data = b'\x12\x34\x23\x45\x34\x56\x45\x67' - assert MessageRTU.check_CRC(data, 0xE2DB) - - def test_check_noCRC(self): - """Test check_CRC.""" - data = b'\x12\x34\x23\x45\x34\x56\x45\x67' - assert not MessageRTU.check_CRC(data, 0xDBE2) - - def test_compute_CRC(self): - """Test compute_CRC.""" - data = b'\x12\x34\x23\x45\x34\x56\x45\x67' - assert MessageRTU.compute_CRC(data) == 0xE2DB - def test_roundtrip_CRC(self): """Test combined compute/check CRC.""" data = b'\x12\x34\x23\x45\x34\x56\x45\x67' diff --git a/test/message/to_do_framers_py b/test/message/to_do_framers.py similarity index 100% rename from test/message/to_do_framers_py rename to test/message/to_do_framers.py diff --git a/test/message/to_do_server_multidrop_py b/test/message/to_do_server_multidrop.py similarity index 100% rename from test/message/to_do_server_multidrop_py rename to test/message/to_do_server_multidrop.py diff --git a/test/message/to_do_transaction_py b/test/message/to_do_transaction.py similarity index 57% rename from test/message/to_do_transaction_py rename to test/message/to_do_transaction.py index 48a182560a..233730dd12 100755 --- a/test/message/to_do_transaction_py +++ b/test/message/to_do_transaction.py @@ -1,12 +1,8 @@ """Test transaction.""" -from binascii import a2b_hex from itertools import count from unittest import mock -import pytest - from pymodbus.exceptions import ( - InvalidMessageReceivedException, ModbusIOException, ) from pymodbus.factory import ServerDecoder @@ -239,107 +235,128 @@ class Request: # pylint: disable=too-few-public-methods # ----------------------------------------------------------------------- # # TCP tests # ----------------------------------------------------------------------- # - @pytest.mark.skip() def test_tcp_framer_transaction_ready(self): """Test a tcp frame transaction.""" - msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - assert not self._tcp.isFrameReady() - assert not self._tcp.checkFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg, callback, [1]) self._tcp._buffer = msg # pylint: disable=protected-access - assert self._tcp.isFrameReady() - assert self._tcp.checkFrame() - self._tcp.advanceFrame() - assert not self._tcp.isFrameReady() - assert not self._tcp.checkFrame() - # assert self._ascii.getFrame() == b"" - - @pytest.mark.skip() + def test_tcp_framer_transaction_full(self): """Test a full tcp frame transaction.""" - msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - self._tcp._buffer = msg # pylint: disable=protected-access - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg[7:] - self._tcp.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + assert result.function_code.to_bytes(1,'big') + result.encode() == msg[7:] - @pytest.mark.skip() def test_tcp_framer_transaction_half(self): """Test a half completed tcp frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg1 = b"\x00\x01\x12\x34\x00" - msg2 = b"\x04\xff\x02\x12\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == b"" - self._tcp._buffer += msg2 - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg2[2:] - self._tcp.advanceFrame() - - @pytest.mark.skip() + msg2 = b"\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg2[2:] + def test_tcp_framer_transaction_half2(self): """Test a half completed tcp frame transaction.""" - msg1 = b"\x00\x01\x12\x34\x00\x04\xff" - msg2 = b"\x02\x12\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == b"" - self._tcp._buffer += msg2 - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert msg2 == result - self._tcp.advanceFrame() - - @pytest.mark.skip() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg1 = b"\x00\x01\x12\x34\x00\x06\xff" + msg2 = b"\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg2 + def test_tcp_framer_transaction_half3(self): """Test a half completed tcp frame transaction.""" - msg1 = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12" - msg2 = b"\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg1[7:] - self._tcp._buffer += msg2 - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg1[7:] + msg2 - self._tcp.advanceFrame() - - @pytest.mark.skip() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg1 = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00" + msg2 = b"\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg1[7:] + msg2 + def test_tcp_framer_transaction_short(self): """Test that we can get back on track after an invalid message.""" - msg1 = b"\x99\x99\x99\x99\x00\x01\x00\x01" - msg2 = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == b"" - self._tcp.advanceFrame() - self._tcp._buffer += msg2 - assert len(self._tcp._buffer) == 10 # pylint: disable=protected-access - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg2[7:] - self._tcp.advanceFrame() - - @pytest.mark.skip() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + # msg1 = b"\x99\x99\x99\x99\x00\x01\x00\x17" + msg1 = b'' + msg2 = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg2[7:] + def test_tcp_framer_populate(self): """Test a tcp frame packet build.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + expected = ModbusRequest() expected.transaction_id = 0x0001 expected.protocol_id = 0x1234 expected.slave_id = 0xFF - msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - self._tcp._buffer = msg # pylint: disable=protected-access - assert self._tcp.checkFrame() - actual = ModbusRequest() - self._tcp.populateResult(actual) - for name in ("transaction_id", "protocol_id", "slave_id"): - assert getattr(expected, name) == getattr(actual, name) - self._tcp.advanceFrame() + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + # assert self._tcp.checkFrame() + # actual = ModbusRequest() + # self._tcp.populateResult(actual) + # for name in ("transaction_id", "protocol_id", "slave_id"): + # assert getattr(expected, name) == getattr(actual, name) def test_tcp_framer_packet(self): """Test a tcp frame packet build.""" @@ -358,63 +375,68 @@ def test_tcp_framer_packet(self): # ----------------------------------------------------------------------- # # TLS tests # ----------------------------------------------------------------------- # - @pytest.mark.skip() def test_framer_tls_framer_transaction_ready(self): """Test a tls frame transaction.""" - msg = b"\x01\x12\x34\x00\x08" - assert not self._tls.isFrameReady() - assert not self._tls.checkFrame() - self._tls._buffer = msg # pylint: disable=protected-access - assert self._tls.isFrameReady() - assert self._tls.checkFrame() - self._tls.advanceFrame() - assert not self._tls.isFrameReady() - assert not self._tls.checkFrame() - assert self._tls.getFrame() == b"" - - @pytest.mark.skip() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg[0:4], callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg[4:], callback, [0, 1]) + assert result + def test_framer_tls_framer_transaction_full(self): """Test a full tls frame transaction.""" - msg = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg # pylint: disable=protected-access - assert self._tls.checkFrame() - result = self._tls.getFrame() - assert result == msg[0:] - self._tls.advanceFrame() - - @pytest.mark.skip() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + assert result + def test_framer_tls_framer_transaction_half(self): """Test a half completed tls frame transaction.""" - msg1 = b"" - msg2 = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg1 # pylint: disable=protected-access - assert not self._tls.checkFrame() - result = self._tls.getFrame() - assert result == b"" - self._tls._buffer += msg2 - assert self._tls.checkFrame() - result = self._tls.getFrame() - assert result == msg2[0:] - self._tls.advanceFrame() - - @pytest.mark.skip() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg[0:8], callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg[8:], callback, [0, 1]) + assert result + def test_framer_tls_framer_transaction_short(self): """Test that we can get back on track after an invalid message.""" - msg1 = b"" - msg2 = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg1 # pylint: disable=protected-access - assert not self._tls.checkFrame() - result = self._tls.getFrame() - assert result == b"" - self._tls.advanceFrame() - self._tls._buffer = msg2 # pylint: disable=protected-access - assert len(self._tls._buffer) == 5 # pylint: disable=protected-access - assert self._tls.checkFrame() - result = self._tls.getFrame() - assert result == msg2[0:] - self._tls.advanceFrame() - - @pytest.mark.skip() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg[0:2], callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg[2:], callback, [0, 1]) + assert result + def test_framer_tls_framer_decode(self): """Testmessage decoding.""" msg1 = b"" @@ -423,12 +445,10 @@ def test_framer_tls_framer_decode(self): assert not result result = self._tls.decode_data(msg2) assert result == {"fcode": 1} - self._tls.advanceFrame() - @pytest.mark.skip() def test_framer_tls_incoming_packet(self): """Framer tls incoming packet.""" - msg = b"\x01\x12\x34\x00\x08" + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" slave = 0x01 msg_result = None @@ -439,60 +459,59 @@ def mock_callback(result): msg_result = result.encode() - self._tls.isFrameReady = mock.MagicMock(return_value=False) - self._tls.processIncomingPacket(msg, mock_callback, slave) - assert msg == self._tls._buffer # pylint: disable=protected-access - self._tls.advanceFrame() - - self._tls.isFrameReady = mock.MagicMock(return_value=True) - x = mock.MagicMock(return_value=False) - self._tls._validate_slave_id = x # pylint: disable=protected-access - self._tls.processIncomingPacket(msg, mock_callback, slave) - assert not self._tls._buffer # pylint: disable=protected-access - self._tls.advanceFrame() - x = mock.MagicMock(return_value=True) - self._tls._validate_slave_id = x # pylint: disable=protected-access self._tls.processIncomingPacket(msg, mock_callback, slave) - assert msg[1:] == msg_result - self._tls.advanceFrame() + # assert msg == msg_result + + # self._tls.isFrameReady = mock.MagicMock(return_value=True) + # x = mock.MagicMock(return_value=False) + # self._tls._validate_slave_id = x + # self._tls.processIncomingPacket(msg, mock_callback, slave) + # assert not self._tls._buffer + # self._tls.advanceFrame() + # x = mock.MagicMock(return_value=True) + # self._tls._validate_slave_id = x + # self._tls.processIncomingPacket(msg, mock_callback, slave) + # assert msg[1:] == msg_result + # self._tls.advanceFrame() - @pytest.mark.skip() def test_framer_tls_process(self): """Framer tls process.""" + # class MockResult: + # """Mock result.""" - class MockResult: # pylint: disable=too-few-public-methods - """Mock result.""" - - def __init__(self, code): - """Init.""" - self.function_code = code + # def __init__(self, code): + # """Init.""" + # self.function_code = code - def mock_callback(_arg): - """Mock callback.""" + # def mock_callback(_arg): + # """Mock callback.""" - self._tls.decoder.decode = mock.MagicMock(return_value=None) - with pytest.raises(ModbusIOException): - self._tls._process(mock_callback) # pylint: disable=protected-access + # self._tls.decoder.decode = mock.MagicMock(return_value=None) + # with pytest.raises(ModbusIOException): + # self._tls._process(mock_callback) - result = MockResult(0x01) - self._tls.decoder.decode = mock.MagicMock(return_value=result) - with pytest.raises(InvalidMessageReceivedException): - self._tls._process( # pylint: disable=protected-access - mock_callback, error=True - ) - self._tls._process(mock_callback) # pylint: disable=protected-access - assert not self._tls._buffer # pylint: disable=protected-access + # result = MockResult(0x01) + # self._tls.decoder.decode = mock.MagicMock(return_value=result) + # with pytest.raises(InvalidMessageReceivedException): + # self._tls._process( + # mock_callback, error=True + # ) + # self._tls._process(mock_callback) + # assert not self._tls._buffer - @pytest.mark.skip() def test_framer_tls_framer_populate(self): """Test a tls frame packet build.""" - ModbusRequest() - msg = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg # pylint: disable=protected-access - assert self._tls.checkFrame() - actual = ModbusRequest() - self._tls.populateResult(actual) - self._tls.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + assert result def test_framer_tls_framer_packet(self): """Test a tls frame packet build.""" @@ -508,59 +527,68 @@ def test_framer_tls_framer_packet(self): # ----------------------------------------------------------------------- # # RTU tests # ----------------------------------------------------------------------- # - @pytest.mark.skip() def test_rtu_framer_transaction_ready(self): """Test if the checks for a complete frame work.""" - assert not self._rtu.isFrameReady() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] - self._rtu._buffer = msg_parts[0] # pylint: disable=protected-access - assert not self._rtu.isFrameReady() - assert not self._rtu.checkFrame() - - self._rtu._buffer += msg_parts[1] - assert self._rtu.isFrameReady() - assert self._rtu.checkFrame() + self._rtu.processIncomingPacket(msg_parts[0], callback, [0, 1]) + assert not result + self._rtu.processIncomingPacket(msg_parts[1], callback, [0, 1]) + assert result - @pytest.mark.skip() def test_rtu_framer_transaction_full(self): """Test a full rtu frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - stripped_msg = msg[1:-2] - self._rtu._buffer = msg # pylint: disable=protected-access - assert self._rtu.checkFrame() - result = self._rtu.getFrame() - assert stripped_msg == result - self._rtu.advanceFrame() - - @pytest.mark.skip() + self._rtu.processIncomingPacket(msg, callback, [0, 1]) + assert result + def test_rtu_framer_transaction_half(self): """Test a half completed rtu frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] - stripped_msg = b"".join(msg_parts)[1:-2] - self._rtu._buffer = msg_parts[0] # pylint: disable=protected-access - assert not self._rtu.checkFrame() - self._rtu._buffer += msg_parts[1] - assert self._rtu.isFrameReady() - assert self._rtu.checkFrame() - result = self._rtu.getFrame() - assert stripped_msg == result - self._rtu.advanceFrame() - - @pytest.mark.skip() + self._rtu.processIncomingPacket(msg_parts[0], callback, [0, 1]) + assert not result + self._rtu.processIncomingPacket(msg_parts[1], callback, [0, 1]) + assert result + def test_rtu_framer_populate(self): """Test a rtu frame packet build.""" - request = ModbusRequest() - msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - self._rtu._buffer = msg # pylint: disable=protected-access - self._rtu.populateHeader() - self._rtu.populateResult(request) + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) header_dict = self._rtu._header # pylint: disable=protected-access assert len(msg) == header_dict["len"] assert int(msg[0]) == header_dict["uid"] assert msg[-2:] == header_dict["crc"] - assert not request.slave_id def test_rtu_framer_packet(self): """Test a rtu frame packet build.""" @@ -574,103 +602,96 @@ def test_rtu_framer_packet(self): assert expected == actual ModbusRequest.encode = old_encode - @pytest.mark.skip() def test_rtu_decode_exception(self): """Test that the RTU framer can decode errors.""" - message = b"\x00\x90\x02\x9c\x01" - self._rtu._buffer = message # pylint: disable=protected-access - result = self._rtu.checkFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x90\x02\x9c\x01" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) assert result - @pytest.mark.skip() def test_process(self): """Test process.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data - class MockResult: # pylint: disable=too-few-public-methods - """Mock result.""" - - def __init__(self, code): - self.function_code = code - - def mock_callback(_arg): - """Mock callback.""" - - mock_result = MockResult(code=0) - self._rtu.getFrame = mock.MagicMock() - self._rtu.decoder = mock.MagicMock() - self._rtu.decoder.decode = mock.MagicMock(return_value=mock_result) - self._rtu.populateResult = mock.MagicMock() - self._rtu.advanceFrame = mock.MagicMock() - - self._rtu._process(mock_callback) # pylint: disable=protected-access - self._rtu.populateResult.assert_called_with(mock_result) - self._rtu.advanceFrame.assert_called_with() - assert self._rtu.advanceFrame.called - - # Check errors - self._rtu.decoder.decode = mock.MagicMock(return_value=None) - with pytest.raises(ModbusIOException): - self._rtu._process(mock_callback) # pylint: disable=protected-access + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) + assert result - @pytest.mark.skip() def test_rtu_process_incoming_packets(self): """Test rtu process incoming packets.""" - mock_data = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - slave = 0x00 + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data - def mock_callback(): - """Mock callback.""" - - self._rtu._buffer = mock.MagicMock() # pylint: disable=protected-access - self._rtu._process = mock.MagicMock() # pylint: disable=protected-access - self._rtu.isFrameReady = mock.MagicMock(return_value=False) - self._rtu._buffer = mock_data # pylint: disable=protected-access + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + slave = 0x00 - self._rtu.processIncomingPacket(mock_data, mock_callback, slave) + self._rtu.processIncomingPacket(msg, callback, slave) + assert result # ----------------------------------------------------------------------- # # ASCII tests # ----------------------------------------------------------------------- # - @pytest.mark.skip() def test_ascii_framer_transaction_ready(self): """Test a ascii frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = b":F7031389000A60\r\n" - assert not self._ascii.isFrameReady() - assert not self._ascii.checkFrame() - self._ascii._buffer = msg # pylint: disable=protected-access - assert self._ascii.isFrameReady() - assert self._ascii.checkFrame() - self._ascii.advanceFrame() - assert not self._ascii.isFrameReady() - assert not self._ascii.checkFrame() - assert not self._ascii.getFrame() - - @pytest.mark.skip() + self._ascii.processIncomingPacket(msg, callback, [0,1]) + assert result + def test_ascii_framer_transaction_full(self): """Test a full ascii frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = b"sss:F7031389000A60\r\n" - pack = a2b_hex(msg[6:-4]) - self._ascii._buffer = msg # pylint: disable=protected-access - assert self._ascii.checkFrame() - result = self._ascii.getFrame() - assert pack == result - self._ascii.advanceFrame() - - @pytest.mark.skip() + self._ascii.processIncomingPacket(msg, callback, [0,1]) + assert result + def test_ascii_framer_transaction_half(self): """Test a half completed ascii frame transaction.""" - msg1 = b"sss:F7031389" - msg2 = b"000A60\r\n" - pack = a2b_hex(msg1[6:] + msg2[:-4]) - self._ascii._buffer = msg1 # pylint: disable=protected-access - assert not self._ascii.checkFrame() - result = self._ascii.getFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg_parts = (b"sss:F7031389", b"000A60\r\n") + self._ascii.processIncomingPacket(msg_parts[0], callback, [0,1]) assert not result - self._ascii._buffer += msg2 - assert self._ascii.checkFrame() - result = self._ascii.getFrame() - assert pack == result - self._ascii.advanceFrame() + self._ascii.processIncomingPacket(msg_parts[1], callback, [0,1]) + assert result def test_ascii_framer_populate(self): """Test a ascii frame packet build.""" @@ -692,62 +713,64 @@ def test_ascii_framer_packet(self): def test_ascii_process_incoming_packets(self): """Test ascii process incoming packet.""" - mock_data = b":F7031389000A60\r\n" - slave = 0x00 - - def mock_callback(_mock_data, *_args, **_kwargs): - """Mock callback.""" - - self._ascii.processIncomingPacket(mock_data, mock_callback, slave) + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data - # Test failure: - self._ascii.checkFrame = mock.MagicMock(return_value=False) - self._ascii.processIncomingPacket(mock_data, mock_callback, slave) + msg = b":F7031389000A60\r\n" + self._ascii.processIncomingPacket(msg, callback, [0,1]) + assert result # ----------------------------------------------------------------------- # # Binary tests # ----------------------------------------------------------------------- # - @pytest.mark.skip() def test_binary_framer_transaction_ready(self): """Test a binary frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = TEST_MESSAGE - assert not self._binary.isFrameReady() - assert not self._binary.checkFrame() - self._binary._buffer = msg # pylint: disable=protected-access - assert self._binary.isFrameReady() - assert self._binary.checkFrame() - self._binary.advanceFrame() - assert not self._binary.isFrameReady() - assert not self._binary.checkFrame() - assert not self._binary.getFrame() - - @pytest.mark.skip() + self._binary.processIncomingPacket(msg, callback, [0,1]) + assert result + def test_binary_framer_transaction_full(self): """Test a full binary frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = TEST_MESSAGE - pack = msg[2:-3] - self._binary._buffer = msg # pylint: disable=protected-access - assert self._binary.checkFrame() - result = self._binary.getFrame() - assert pack == result - self._binary.advanceFrame() - - @pytest.mark.skip() + self._binary.processIncomingPacket(msg, callback, [0,1]) + assert result + def test_binary_framer_transaction_half(self): """Test a half completed binary frame transaction.""" - msg1 = b"\x7b\x01\x03\x00" - msg2 = b"\x00\x00\x05\x85\xC9\x7d" - pack = msg1[2:] + msg2[:-3] - self._binary._buffer = msg1 # pylint: disable=protected-access - assert not self._binary.checkFrame() - result = self._binary.getFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg_parts = (b"\x7b\x01\x03\x00", b"\x00\x00\x05\x85\xC9\x7d") + self._binary.processIncomingPacket(msg_parts[0], callback, [0,1]) assert not result - self._binary._buffer += msg2 - - assert self._binary.checkFrame() - result = self._binary.getFrame() - assert pack == result - self._binary.advanceFrame() + self._binary.processIncomingPacket(msg_parts[1], callback, [0,1]) + assert result def test_binary_framer_populate(self): """Test a binary frame packet build."""