diff --git a/pymodbus/client/tcp.py b/pymodbus/client/tcp.py index d38b8e871..99c94bef6 100644 --- a/pymodbus/client/tcp.py +++ b/pymodbus/client/tcp.py @@ -241,14 +241,16 @@ def recv(self, size: int | None) -> bytes: time_ = time.time() # If size isn't specified continue to read until timeout expires. - if size: - recv_size = size - data_length - + if not size: + break # Timeout is reduced also if some data has been received in order # to avoid infinite loops when there isn't an expected response # size and the slave sends noisy data continuously. if time_ > end: break + + recv_size = size - data_length + self.last_frame_end = round(time.time(), 6) return b"".join(data) diff --git a/pymodbus/client/udp.py b/pymodbus/client/udp.py index c00064da2..a9b6d2a50 100644 --- a/pymodbus/client/udp.py +++ b/pymodbus/client/udp.py @@ -206,7 +206,7 @@ def recv(self, size: int | None) -> bytes: if not self.socket: raise ConnectionException(str(self)) if size is None: - size = 0 + size = 4096 data = self.socket.recvfrom(size)[0] self.last_frame_end = round(time.time(), 6) return data diff --git a/pymodbus/framer/tls.py b/pymodbus/framer/tls.py index 471e1fdef..a1f2ac991 100644 --- a/pymodbus/framer/tls.py +++ b/pymodbus/framer/tls.py @@ -8,10 +8,16 @@ class FramerTLS(FramerBase): """Modbus TLS frame type. Layout:: - [ Function Code] [ Data ] - 1b Nb + + [ MBAP Header ] [ Function Code] [ Data ] + [ tid ][ pid ][ length ][ uid ] + 2b 2b 2b 1b 1b Nb + + length = uid + function code + data """ + MIN_SIZE = 8 + def decode(self, data: bytes) -> tuple[int, int, int, bytes]: """Decode MDAP+PDU.""" tid = int.from_bytes(data[0:2], 'big') diff --git a/pymodbus/transaction.py b/pymodbus/transaction.py index dbcc0dda0..6c2baa8a5 100644 --- a/pymodbus/transaction.py +++ b/pymodbus/transaction.py @@ -7,26 +7,17 @@ "SyncModbusTransactionManager", ] -import struct -import time -from contextlib import suppress from threading import RLock from typing import TYPE_CHECKING from pymodbus.exceptions import ( - ConnectionException, - InvalidMessageReceivedException, ModbusIOException, ) from pymodbus.framer import ( - FramerAscii, - FramerRTU, FramerSocket, - FramerTLS, ) from pymodbus.logging import Log -from pymodbus.pdu import ExceptionResponse, ModbusPDU -from pymodbus.transport import CommType +from pymodbus.pdu import ModbusPDU from pymodbus.utilities import ModbusTransactionState, hexlify_packets @@ -113,22 +104,7 @@ def reset(self): class SyncModbusTransactionManager(ModbusTransactionManager): - """Implement a transaction for a manager. - - The transaction protocol can be represented by the following pseudo code:: - - count = 0 - do - result = send(message) - if (timeout or result == bad) - count++ - else break - while (count < 3) - - This module helps to abstract this away from the framer and protocol. - - Results are keyed based on the supplied transaction id. - """ + """Implement a transaction for a manager.""" def __init__(self, client: ModbusBaseSyncClient, retries): """Initialize an instance of the ModbusTransactionManager.""" @@ -137,278 +113,101 @@ def __init__(self, client: ModbusBaseSyncClient, retries): self.retries = retries self._transaction_lock = RLock() self.databuffer = b'' - if client: - self._set_adu_size() - - def _set_adu_size(self): - """Set adu size.""" - # base ADU size of modbus frame in bytes - if isinstance(self.client.framer, FramerSocket): - self.base_adu_size = 7 # tid(2), pid(2), length(2), uid(1) - elif isinstance(self.client.framer, FramerRTU): - self.base_adu_size = 3 # address(1), CRC(2) - elif isinstance(self.client.framer, FramerAscii): - self.base_adu_size = 7 # start(1)+ Address(2), LRC(2) + end(2) - elif isinstance(self.client.framer, FramerTLS): - self.base_adu_size = 0 # no header and footer - else: - self.base_adu_size = -1 - def _calculate_response_length(self, expected_pdu_size): - """Calculate response length.""" - if self.base_adu_size == -1: - return None - return self.base_adu_size + expected_pdu_size - - def _calculate_exception_length(self): - """Return the length of the Modbus Exception Response according to the type of Framer.""" - if isinstance(self.client.framer, (FramerSocket, FramerTLS)): - return self.base_adu_size + 2 # Fcode(1), ExceptionCode(1) - if isinstance(self.client.framer, FramerAscii): - return self.base_adu_size + 4 # Fcode(2), ExceptionCode(2) - if isinstance(self.client.framer, FramerRTU): - return self.base_adu_size + 2 # Fcode(1), ExceptionCode(1) - return None - - def _validate_response(self, response): - """Validate Incoming response against request.""" - if not response: + def send_request(self, request: ModbusPDU) -> bool: + """Build and send request.""" + self.client.connect() + packet = self.client.framer.buildFrame(request) + Log.debug("SEND: {}", packet, ":hex") + if (size := self.client.send(packet)) != len(packet): + Log.error(f"Only sent {size} of {len(packet)} bytes") + return False + if self.client.comm_params.handle_local_echo and self.client.recv(size) != packet: + Log.error("Wrong local echo") return False return True + def receive_response(self) -> ModbusPDU | None: + """Receive until PDU is correct or timeout.""" + while True: + if not (data := self.client.recv(None)): + return None + self.databuffer += data + used_len, pdu = self.client.framer.processIncomingFrame(self.databuffer) + self.databuffer = self.databuffer[used_len:] + if pdu: + return pdu + def execute(self, no_response_expected: bool, request: ModbusPDU): # noqa: C901 """Start the producer to send the next request to consumer.write(Frame(request)).""" with self._transaction_lock: - try: - Log.debug( - "Current transaction state - {}", - ModbusTransactionState.to_string(self.client.state), - ) - if isinstance(self.client.framer, FramerSocket): - request.transaction_id = self.getNextTID() - else: - request.transaction_id = 0 - Log.debug("Running transaction {}", request.transaction_id) - if _buffer := hexlify_packets( - self.databuffer - ): - Log.debug("Clearing current Frame: - {}", _buffer) - self.databuffer = b'' - expected_response_length = None - if not isinstance(self.client.framer, FramerSocket): - response_pdu_size = request.get_response_pdu_size() - if isinstance(self.client.framer, FramerAscii): - response_pdu_size *= 2 - if response_pdu_size: - expected_response_length = ( - self._calculate_response_length(response_pdu_size) - ) - full = False - if self.client.comm_params.comm_type == CommType.UDP: - full = True - if not expected_response_length: - expected_response_length = 1024 - response, last_exception = self._transact( - no_response_expected, - request, - expected_response_length, - full=full, - ) - if no_response_expected: - return None - self.databuffer += response - used_len, pdu = self.client.framer.processIncomingFrame(self.databuffer) - self.databuffer = self.databuffer[used_len:] - if pdu: - self.addTransaction(pdu) - if not (result := self.getTransaction(request.transaction_id)): - if len(self.transactions): - result = self.getTransaction(0) - else: - last_exception = last_exception or ( - "No Response received from the remote slave" - "/Unable to decode response" - ) - result = ModbusIOException( - last_exception, request.function_code - ) - self.client.close() - if hasattr(self.client, "state"): - Log.debug( - "Changing transaction state from " - '"PROCESSING REPLY" to ' - '"TRANSACTION_COMPLETE"' - ) - self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE - return result - except ModbusIOException as exc: - # Handle decode errors method - Log.error("Modbus IO exception {}", exc) - self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE - self.client.close() - return exc - - def _retry_transaction(self, no_response_expected, retries, reason, packet, response_length, full=False): - """Retry transaction.""" - Log.debug("Retry on {} response - {}", reason, retries) - Log.debug('Changing transaction state from "WAITING_FOR_REPLY" to "RETRYING"') - self.client.state = ModbusTransactionState.RETRYING - self.client.connect() - if hasattr(self.client, "_in_waiting"): - if ( - in_waiting := self.client._in_waiting() # pylint: disable=protected-access - ): - if response_length == in_waiting: - result = self._recv(response_length, full) - return result, None - return self._transact(no_response_expected, packet, response_length, full=full) - - def _transact(self, no_response_expected: bool, request: ModbusPDU, response_length, full=False): - """Do a Write and Read transaction.""" - last_exception = None - try: - self.client.connect() - packet = self.client.framer.buildFrame(request) - Log.debug("SEND: {}", packet, ":hex") - size = self._send(packet) - if ( - isinstance(size, bytes) - and self.client.state == ModbusTransactionState.RETRYING + Log.debug( + "Current transaction state - {}", + ModbusTransactionState.to_string(self.client.state), + ) + if isinstance(self.client.framer, FramerSocket): + request.transaction_id = self.getNextTID() + else: + request.transaction_id = 0 + Log.debug("Running transaction {}", request.transaction_id) + if _buffer := hexlify_packets( + self.databuffer ): - Log.debug( - "Changing transaction state from " - '"RETRYING" to "PROCESSING REPLY"' - ) - self.client.state = ModbusTransactionState.PROCESSING_REPLY - return size, None - if self.client.comm_params.handle_local_echo is True: - if self._recv(size, full) != packet: - return b"", "Wrong local echo" - if no_response_expected: - if size: + Log.debug("Clearing current Frame: - {}", _buffer) + self.databuffer = b'' + + retry = self.retries + 1 + exp_txt = "" + while retry > 0: + if not self.send_request(request): + Log.debug('Changing transaction state from "SENDING" to "RETRYING"') + exp_txt = 'SEND failed' + Log.error(exp_txt + ', retrying') + self.client.state = ModbusTransactionState.RETRYING + retry -= 1 + continue + if no_response_expected: Log.debug( 'Changing transaction state from "SENDING" ' - 'to "TRANSACTION_COMPLETE"' + 'to "TRANSACTION_COMPLETE" (no response expected)' ) self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE - return b"", None - if size: - Log.debug( - 'Changing transaction state from "SENDING" ' - 'to "WAITING FOR REPLY"' - ) + return None + Log.debug('Changing transaction state from "SENDING" to "WAITING_FOR_REPLY"') self.client.state = ModbusTransactionState.WAITING_FOR_REPLY - result = self._recv(response_length, full) - # result2 = self._recv(response_length, full) - Log.debug("RECV: {}", result, ":hex") - except (OSError, ModbusIOException, InvalidMessageReceivedException, ConnectionException) as msg: - self.client.close() - Log.debug("Transaction failed. ({}) ", msg) - last_exception = msg - result = b"" - return result, last_exception - - def _send(self, packet: bytes, _retrying=False): - """Send.""" - return self.client.send(packet) - - def _recv(self, expected_response_length, full) -> bytes: # noqa: C901 - """Receive.""" - total = None - if not full: - exception_length = self._calculate_exception_length() - if isinstance(self.client.framer, FramerSocket): - min_size = 8 - elif isinstance(self.client.framer, FramerRTU): - min_size = 4 - elif isinstance(self.client.framer, FramerAscii): - min_size = 5 - else: - min_size = expected_response_length - - read_min = self.client.recv(min_size) - if min_size and len(read_min) != min_size: - msg_start = "Incomplete message" if read_min else "No response" - raise InvalidMessageReceivedException( - f"{msg_start} received, expected at least {min_size} bytes " - f"({len(read_min)} received)" - ) - if read_min: - if isinstance(self.client.framer, FramerSocket): - func_code = int(read_min[-1]) - elif isinstance(self.client.framer, FramerRTU): - func_code = int(read_min[1]) - elif isinstance(self.client.framer, FramerAscii): - func_code = int(read_min[3:5], 16) + if not (pdu := self.receive_response()): + Log.debug('Changing transaction state from "WAITING_FOR_REPLY" to "RETRYING"') + exp_txt = 'RECEIVE failed' + Log.error(exp_txt + ', retrying') + self.client.state = ModbusTransactionState.RETRYING + retry -= 1 + continue + + print(pdu) + break + + if not retry: + return ModbusIOException(exp_txt, request.function_code) + + if pdu: + self.addTransaction(pdu) + if not (result := self.getTransaction(request.transaction_id)): + if len(self.transactions): + result = self.getTransaction(0) else: - func_code = -1 - - if func_code < 0x80: # Not an error - if isinstance(self.client.framer, FramerSocket): - length = struct.unpack(">H", read_min[4:6])[0] - 1 - expected_response_length = 7 + length - elif expected_response_length is None and isinstance( - self.client.framer, FramerRTU - ): - with suppress( - IndexError # response length indeterminate with available bytes - ): - expected_response_length = ( - self._get_expected_response_length( - read_min - ) - ) - if expected_response_length is not None: - expected_response_length -= min_size - total = expected_response_length + min_size - else: - expected_response_length = exception_length - min_size - total = expected_response_length + min_size - else: - total = expected_response_length - retries = 0 - missing_len = expected_response_length - result = read_min - while missing_len and retries < self.retries: - if retries: - time.sleep(0.1) - data = self.client.recv(expected_response_length) - result += data - missing_len -= len(data) - retries += 1 - else: - read_min = b"" - total = expected_response_length - result = self.client.recv(expected_response_length) - result = read_min + result - actual = len(result) - if total is not None and actual != total: - msg_start = "Incomplete message" if actual else "No response" - Log.debug( - "{} received, Expected {} bytes Received {} bytes !!!!", - msg_start, - total, - actual, - ) - elif not actual: - # If actual == 0 and total is not None then the above - # should be triggered, so total must be None here - Log.debug("No response received to unbounded read !!!!") - if self.client.state != ModbusTransactionState.PROCESSING_REPLY: - Log.debug( - "Changing transaction state from " - '"WAITING FOR REPLY" to "PROCESSING REPLY"' - ) - self.client.state = ModbusTransactionState.PROCESSING_REPLY - return result - - def _get_expected_response_length(self, data) -> int: - """Get the expected response length. - - :param data: Message data read so far - :raises IndexError: If not enough data to read byte count - :return: Total frame size - """ - if not (pdu_class := self.client.framer.decoder.lookupPduClass(data)): - pdu_class = ExceptionResponse - return pdu_class.calculateRtuFrameSize(data) + last_exception = ( + "No Response received from the remote slave" + "/Unable to decode response" + ) + result = ModbusIOException( + last_exception, request.function_code + ) + self.client.close() + if hasattr(self.client, "state"): + Log.debug( + "Changing transaction state from " + '"PROCESSING REPLY" to ' + '"TRANSACTION_COMPLETE"' + ) + self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE + return result diff --git a/test/sub_current/test_transaction.py b/test/sub_current/test_transaction.py index 5c4afb0f4..4084a918b 100755 --- a/test/sub_current/test_transaction.py +++ b/test/sub_current/test_transaction.py @@ -1,6 +1,7 @@ """Test transaction.""" from unittest import mock +from pymodbus.client import ModbusTcpClient from pymodbus.exceptions import ( ModbusIOException, ) @@ -42,58 +43,19 @@ def setup_method(self): self._tls = FramerTLS(self.decoder) self._rtu = FramerRTU(self.decoder) self._ascii = FramerAscii(self.decoder) - self._manager = SyncModbusTransactionManager(None, 3) + client = mock.MagicMock() + client.framer = self._rtu + self._manager = SyncModbusTransactionManager(client, 3) # ----------------------------------------------------------------------- # # Modbus transaction manager # ----------------------------------------------------------------------- # - def test_calculate_expected_response_length(self): - """Test calculate expected response length.""" - self._manager.client = mock.MagicMock() - self._manager.client.framer = mock.MagicMock() - self._manager._set_adu_size() # pylint: disable=protected-access - assert not self._manager._calculate_response_length( # pylint: disable=protected-access - 0 - ) - self._manager.base_adu_size = 10 - assert ( - self._manager._calculate_response_length(5) # pylint: disable=protected-access - == 15 - ) - - def test_calculate_exception_length(self): - """Test calculate exception length.""" - for framer, exception_length in ( - ("ascii", 11), - ("rtu", 5), - ("tcp", 9), - ("tls", 2), - ("dummy", None), - ): - self._manager.client = mock.MagicMock() - if framer == "ascii": - self._manager.client.framer = self._ascii - elif framer == "rtu": - self._manager.client.framer = self._rtu - elif framer == "tcp": - self._manager.client.framer = self._tcp - elif framer == "tls": - self._manager.client.framer = self._tls - else: - self._manager.client.framer = mock.MagicMock() - - self._manager._set_adu_size() # pylint: disable=protected-access - assert ( - self._manager._calculate_exception_length() # pylint: disable=protected-access - == exception_length - ) - - @mock.patch.object(SyncModbusTransactionManager, "_recv") @mock.patch.object(ModbusTransactionManager, "getTransaction") - def test_execute(self, mock_get_transaction, mock_recv): + def test_execute(self, mock_get_transaction): """Test execute.""" - client = mock.MagicMock() + client = ModbusTcpClient("localhost") + client.recv = mock.Mock() client.framer = self._ascii client.framer._buffer = b"deadbeef" # pylint: disable=protected-access client.framer.processIncomingFrame = mock.MagicMock() @@ -107,54 +69,36 @@ def test_execute(self, mock_get_transaction, mock_recv): request.slave_id = 1 request.function_code = 222 trans = SyncModbusTransactionManager(client, 3) - mock_recv.reset_mock( - return_value=b"abcdef" - ) assert trans.retries == 3 + client.recv.side_effect=iter([b"abcdef", None]) mock_get_transaction.return_value = b"response" + trans.retries = 0 response = trans.execute(False, request) - assert response == b"response" + assert isinstance(response, ModbusIOException) # No response - mock_recv.reset_mock( - return_value=b"abcdef" - ) + client.recv.side_effect=iter([b"abcdef", None]) trans.transactions = {} mock_get_transaction.return_value = None response = trans.execute(False, request) assert isinstance(response, ModbusIOException) # No response with retries - mock_recv.reset_mock( - side_effect=iter([b"", b"abcdef"]) - ) + client.recv.side_effect=iter([b"", b"abcdef"]) response = trans.execute(False, request) assert isinstance(response, ModbusIOException) # wrong handle_local_echo - mock_recv.reset_mock( - side_effect=iter([b"abcdef", b"deadbe", b"123456"]) - ) + client.recv.side_effect=iter([b"abcdef", b"deadbe", b"123456"]) client.comm_params.handle_local_echo = True - assert trans.execute(False, request).message == "[Input/Output] Wrong local echo" + assert trans.execute(False, request).message == "[Input/Output] SEND failed" client.comm_params.handle_local_echo = False # retry on invalid response - mock_recv.reset_mock( - side_effect=iter([b"", b"abcdef", b"deadbe", b"123456"]) - ) + client.recv.side_effect=iter([b"", b"abcdef", b"deadbe", b"123456"]) response = trans.execute(False, request) assert isinstance(response, ModbusIOException) - # Unable to decode response - mock_recv.reset_mock( - side_effect=ModbusIOException() - ) - client.framer.processIncomingFrame.side_effect = mock.MagicMock( - side_effect=ModbusIOException() - ) - assert isinstance(trans.execute(False, request), ModbusIOException) - def test_transaction_manager_tid(self): """Test the transaction manager TID.""" for tid in range(1, self._manager.getNextTID() + 10):