Skip to content

Commit

Permalink
Duplicate transactions in UDP. (#1486)
Browse files Browse the repository at this point in the history
  • Loading branch information
janiversen authored Apr 13, 2023
1 parent f9febb9 commit a838255
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 39 deletions.
19 changes: 8 additions & 11 deletions pymodbus/framer/socket_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def decode_data(self, data):
}
return {}

def processIncomingPacket(self, data, callback, slave, **kwargs):
def processIncomingPacket(self, data, callback, slave, tid: int = None, **kwargs):
"""Process new packet pattern.
This takes in a new request packet, adds it to the current
Expand All @@ -148,12 +148,6 @@ def processIncomingPacket(self, data, callback, slave, **kwargs):
The processed and decoded messages are pushed to the callback
function to process and send.
:param data: The new packet data
:param callback: The function to send results to
:param slave: Process if slave id matches, ignore otherwise (could be a
list of slave ids (server) or single slave id(client/server)
:param kwargs:
"""
if not isinstance(slave, (list, tuple)):
slave = [slave]
Expand All @@ -165,7 +159,7 @@ def processIncomingPacket(self, data, callback, slave, **kwargs):
if len(self._buffer):
# Possible error ???
if self._header["len"] < 2:
self._process(callback, error=True)
self._process(callback, tid, error=True)
break
if not self.checkFrame():
Log.debug("Frame check failed, ignoring!!")
Expand All @@ -176,9 +170,9 @@ def processIncomingPacket(self, data, callback, slave, **kwargs):
Log.debug("Not a valid slave id - {}, ignoring!!", header_txt)
self.resetFrame()
continue
self._process(callback)
self._process(callback, tid)

def _process(self, callback, error=False):
def _process(self, callback, tid, error=False):
"""Process incoming packets irrespective error condition."""
data = self.getRawFrame() if error else self.getFrame()
if (result := self.decoder.decode(data)) is None:
Expand All @@ -187,7 +181,10 @@ def _process(self, callback, error=False):
raise InvalidMessageReceivedException(result)
self.populateResult(result)
self.advanceFrame()
callback(result) # defer or push to a thread?
if tid and tid != result.transaction_id:
self.resetFrame()
else:
callback(result) # defer or push to a thread?

def resetFrame(self):
"""Reset the entire message frame.
Expand Down
5 changes: 4 additions & 1 deletion pymodbus/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ def execute(self, request): # pylint: disable=too-complex
tid=request.transaction_id,
)
self.client.framer.processIncomingPacket(
response, addTransaction, request.slave_id
response,
addTransaction,
request.slave_id,
tid=request.transaction_id,
)
if not (response := self.getTransaction(request.transaction_id)):
if len(self.transactions):
Expand Down
29 changes: 2 additions & 27 deletions test/test_client_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,37 +84,12 @@ def test_udp_client_recv_duplicate(self):
test_msg = b"\x00\x01\x00\x00\x00\x05\x01\x04\x02\x00\x03"
client = ModbusUdpClient("127.0.0.1")
client.socket = mockSocket(copy_send=False)

# test normal receive
client.socket.mock_prepare_receive(test_msg)
reply_ok = client.read_input_registers(0x820, 1, 1)
assert not reply_ok.isError()
reply_timeout = client.read_input_registers(0x820, 1, 1)
assert reply_timeout.isError()
client.close()

# test duplicate receive
client = ModbusUdpClient("127.0.0.1")
client.socket = mockSocket(copy_send=False)
client.socket.mock_prepare_receive(test_msg)
client.socket.mock_prepare_receive(test_msg)
reply_ok = client.read_input_registers(0x820, 1, 1)
assert not reply_ok.isError()
# ERROR hanging transaction --> reply_timeout = client.read_input_registers(0x820, 1, 1)
# ERROR hanging transaction --> assert reply_timeout.isError()
client.close()

# test duplicate receive with garbage
client = ModbusUdpClient("127.0.0.1")
client.socket = mockSocket(copy_send=False)
client.socket.mock_prepare_receive(test_msg)
client.socket.mock_prepare_receive(test_msg + b"\xf6\x3e")
reply_ok = client.read_input_registers(0x820, 1, 1)
assert not reply_ok.isError()
# ERROR hanging transaction --> reply_timeout = client.read_input_registers(0x820, 1, 1)
# ERROR hanging transaction --> assert reply_timeout.isError()
# ERROR hanging transaction --> reply_timeout = client.read_input_registers(0x820, 1, 1)
# ERROR hanging transaction --> assert reply_timeout.isError()
reply_none = client.read_input_registers(0x40, 10, 1)
assert reply_none.isError()
client.close()

def test_udp_client_repr(self):
Expand Down

0 comments on commit a838255

Please sign in to comment.