diff --git a/pymodbus/framer/ascii_framer.py b/pymodbus/framer/ascii_framer.py index d0eaad1e9..957ef94f8 100644 --- a/pymodbus/framer/ascii_framer.py +++ b/pymodbus/framer/ascii_framer.py @@ -45,6 +45,9 @@ def __init__(self, decoder, client=None): # ----------------------------------------------------------------------- # # Private Helper Functions # ----------------------------------------------------------------------- # + def _process(self, callback, error=False): + """Process incoming packets irrespective error condition.""" + def decode_data(self, data): """Decode data.""" if len(data) > 1: diff --git a/pymodbus/framer/binary_framer.py b/pymodbus/framer/binary_framer.py index 123933a22..2ebb17c68 100644 --- a/pymodbus/framer/binary_framer.py +++ b/pymodbus/framer/binary_framer.py @@ -56,6 +56,9 @@ def __init__(self, decoder, client=None): # ----------------------------------------------------------------------- # # Private Helper Functions # ----------------------------------------------------------------------- # + def _process(self, callback, error=False): + """Process incoming packets irrespective error condition.""" + def decode_data(self, data): """Decode data.""" if len(data) > self._hsize: diff --git a/test/test_framers.py b/test/test_framers.py index 6e24a17e6..e3580cea0 100644 --- a/test/test_framers.py +++ b/test/test_framers.py @@ -35,12 +35,6 @@ def fixture_ascii_framer(): return ModbusAsciiFramer(ClientDecoder()) -# @pytest.fixture -# def binary_framer(): -# """Binary framer.""" -# return ModbusBinaryFramer(ClientDecoder()) - - @pytest.mark.parametrize( "framer", [ @@ -98,14 +92,6 @@ def test_framer_initialization(framer): ] -@pytest.mark.parametrize("data", [(b"", {}), (b"abcd", {"fcode": 98, "slave": 97})]) -def test_decode_data(rtu_framer, data): - """Test decode data.""" - data, expected = data - decoded = rtu_framer.decode_data(data) - assert decoded == expected - - @pytest.mark.parametrize( "data", [ @@ -301,12 +287,6 @@ def test_rtu_incoming_packet(rtu_framer, data): assert mock_reset.call_count == (1 if reset_called else 0) -def test_build_packet(rtu_framer): - """Test build packet.""" - message = ReadCoilsRequest(1, 10) - assert rtu_framer.buildPacket(message) == TEST_MESSAGE - - def test_send_packet(rtu_framer): """Test send packet.""" message = TEST_MESSAGE @@ -403,3 +383,60 @@ def _handle_response(_reply): framer = ModbusSocketFramer(ClientDecoder()) framer.processIncomingPacket(message, _handle_response, slave=0) assert response_ok, "Response is valid, but not accepted" + +# ---- 100% coverage +@pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':00010001000AF4\r\n',), + (ModbusBinaryFramer, b'{\x00\x01\x00\x01\x00\n\xec\x1c}',), + (ModbusRtuFramer, b"\x00\x01\x00\x01\x00\n\xec\x1c",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x00\x01\x00\x01\x00\n',), + ] +) +def test_build_packet(framer, message): + """Test build packet.""" + test_framer = framer(ClientDecoder()) + request = ReadCoilsRequest(1, 10) + assert test_framer.buildPacket(request) == message + + +@pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':00010001000AF4\r\n',), + (ModbusBinaryFramer, b'{\x00\x01\x00\x01\x00\n\xec\x1c}',), + (ModbusRtuFramer, b"\x00\x01\x00\x01\x00\n\xec\x1c",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x00\x01\x00\x01\x00\n',), + ] +) +def test_processincomingpacket(framer, message): + """Test processIncomingPacket.""" + test_framer = framer(ClientDecoder()) + with mock.patch.object( + framer, + "_process", + wraps=test_framer._process, # pylint: disable=protected-access + ), mock.patch.object( + test_framer, "resetFrame", wraps=test_framer.resetFrame + ): + test_framer.processIncomingPacket(message, mock.Mock(), 0x01) + +@pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':61620001000AF4\r\n',), + (ModbusBinaryFramer, b'{\x61\x62\x00\x01\x00\n\xec\x1c}',), + (ModbusRtuFramer, b"\x61\x62\x00\x01\x00\n\xec\x1c",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x61\x62\x00\x01\x00\n',), + ] +) +@pytest.mark.parametrize("expected", [{"fcode": 98, "slave": 97}]) +def test_decode_data(framer, message, expected): + """Test decode data.""" + test_framer = framer(ClientDecoder()) + decoded = test_framer.decode_data(b'') + assert decoded == {} + decoded = test_framer.decode_data(message) + assert decoded["fcode"] == expected["fcode"] + assert decoded["slave"] == expected["slave"]