diff --git a/test/conftest.py b/test/conftest.py index f1514a852f..0f7d356fe2 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,6 +1,7 @@ """Configure pytest.""" import functools import platform +from collections import deque import pytest @@ -132,6 +133,64 @@ def setblocking(self, _flag): """Set blocking.""" return None +class mockSocket2: # pylint: disable=invalid-name + """Mock socket.""" + + timeout = 2 + + def __init__(self): + """Initialize.""" + self.receive = deque() + self.send = deque() + + def mock_prepare_receive(self, msg): + """Store message.""" + self.receive.append(msg); + + def mock_read_sent(self): + self.send.popleft() + + def mock_retrieve(self, size): + """Get message.""" + if len(self.receive) == 0 or not size: + return b""; + if size >= len(self.receive[0]): + retval = self.receive.popleft() + else: + retval = self.receive[0][0:size] + self.data[0] = self.receive[0][size:-1] + return retval + + def close(self): + """Close.""" + return True + + def recv(self, size): + """Receive.""" + return self.mock_retrieve(size) + + def read(self, size): + """Read.""" + return self.mock_retrieve(size) + + def send(self, msg): + """Send.""" + self.mock_store(msg) + return len(msg) + + def recvfrom(self, size): + """Receive from.""" + return [self.mock_retrieve(size)] + + def sendto(self, msg, *_args): + """Send to.""" + self.send.append(msg) + return len(msg) + + def setblocking(self, _flag): + """Set blocking.""" + return None + def run_coroutine(coro): """Run a coroutine as top-level task by iterating through all yielded steps.""" diff --git a/test/test_client_sync.py b/test/test_client_sync.py index c76cb790d9..c211dd69d5 100755 --- a/test/test_client_sync.py +++ b/test/test_client_sync.py @@ -2,6 +2,7 @@ import ssl from itertools import count from test.conftest import mockSocket +from test.conftest import mockSocket2 from unittest import mock import pytest @@ -79,6 +80,27 @@ def test_udp_client_recv(self): assert client.recv(0) == b"" assert client.recv(4) == b"\x00" * 4 + def test_udp_client_recv_duplicate(self): + """Test the udp client receive method""" + client = ModbusUdpClient("127.0.0.1") + + client.socket = mockSocket2() + client.socket.mock_prepare_receive(b"\x00\x01\x00\x00\x00\x05\x01\x04\x02\x00\x03"); # Response 1 + reply1 = client.read_input_registers(0x820, 1, 1) + client.socket.mock_prepare_receive(b"\x00\x01\x00\x00\x00\x05\x01\x04\x02\x00\x03"); # Duplicate response 1 + client.socket.mock_prepare_receive(b"\x00\x02\x00\x00\x00\x07\x01\x04\x04\x00\x03\xf6\x3e") # Response 2 + reply2 = client.read_input_registers(0x820, 2, 1) + reply3 = client.read_input_registers(0x820, 100, 1) + + print(reply1.registers) + print(reply2.registers) + print(reply3.registers) + print(reply1.transaction_id) + print(reply2.transaction_id) + print(reply3.transaction_id) + + assert 1 == 0 + def test_udp_client_repr(self): """Test udp client representation.""" client = ModbusUdpClient("127.0.0.1")