diff --git a/pymodbus/client/base.py b/pymodbus/client/base.py index 95f4f8160..643f469ff 100644 --- a/pymodbus/client/base.py +++ b/pymodbus/client/base.py @@ -11,7 +11,7 @@ from pymodbus.framer import FRAMER_NAME_TO_CLASS, FramerBase, FramerType from pymodbus.logging import Log from pymodbus.pdu import DecodePDU, ModbusPDU -from pymodbus.transaction import SyncModbusTransactionManager +from pymodbus.transaction.old_transaction import SyncModbusTransactionManager from pymodbus.transport import CommParams from pymodbus.utilities import ModbusTransactionState diff --git a/pymodbus/transaction/__init__.py b/pymodbus/transaction/__init__.py index abd75682c..444ca29ae 100644 --- a/pymodbus/transaction/__init__.py +++ b/pymodbus/transaction/__init__.py @@ -1,12 +1,6 @@ """Transaction.""" __all__ = [ - "ModbusTransactionManager", - "SyncModbusTransactionManager", "TransactionManager", ] -from pymodbus.transaction.old_transaction import ( - ModbusTransactionManager, - SyncModbusTransactionManager, -) from pymodbus.transaction.transaction import TransactionManager diff --git a/pymodbus/transaction/transaction.py b/pymodbus/transaction/transaction.py index b8bc5dade..2a514942d 100644 --- a/pymodbus/transaction/transaction.py +++ b/pymodbus/transaction/transaction.py @@ -3,6 +3,7 @@ import asyncio from collections.abc import Callable +from threading import RLock from pymodbus.exceptions import ConnectionException, ModbusIOException from pymodbus.framer import FramerBase @@ -39,9 +40,10 @@ def __init__( framer: FramerBase, retries: int, is_server: bool, + sync_client = None, ) -> None: """Initialize an instance of the ModbusTransactionManager.""" - super().__init__(params, is_server) + super().__init__(params, is_server, is_sync=bool(sync_client)) self.framer = framer self.retries = retries self.next_tid: int = 0 @@ -51,14 +53,69 @@ def __init__( self.trace_send_pdu: Callable[[ModbusPDU | None], ModbusPDU] | None = None self.accept_no_response_limit = retries + 3 self.count_no_responses = 0 - self._lock = asyncio.Lock() + if sync_client: + self.sync_client = sync_client + self._sync_lock = RLock() + else: + self._lock = asyncio.Lock() self.response_future: asyncio.Future = asyncio.Future() - async def execute(self, no_response_expected: bool, request) -> ModbusPDU | None: - """Execute requests asynchronously.""" + def sync_get_response(self) -> ModbusPDU | None: + """Receive until PDU is correct or timeout.""" + databuffer = b'' + while True: + if not (data := self.sync_client.recv(None)): + raise asyncio.exceptions.TimeoutError() + databuffer += data + used_len, pdu = self.framer.processIncomingFrame(databuffer) + databuffer = databuffer[used_len:] + if pdu: + return pdu + + def sync_execute(self, no_response_expected: bool, request: ModbusPDU) -> ModbusPDU | None: + """Execute requests asynchronously. + + REMARK: this method is identical to execute, apart from the lock and sync_receive. + any changes in either method MUST be mirrored !!! + """ + if not self.transport: + Log.warning("Not connected, trying to connect!") + if not self.sync_client.connect(): + raise ConnectionException("Client cannot connect (automatic retry continuing) !!") + with self._sync_lock: + request.transaction_id = self.getNextTID() + if self.trace_send_pdu: + request = self.trace_send_pdu(request) # pylint: disable=not-callable + packet = self.framer.buildFrame(request) + count_retries = 0 + while count_retries <= self.retries: + if self.trace_send_packet: + packet = self.trace_send_packet(packet) # pylint: disable=not-callable + self.sync_client.send(packet) + if no_response_expected: + return None + try: + return self.sync_get_response() + except asyncio.exceptions.TimeoutError: + count_retries += 1 + if self.count_no_responses >= self.accept_no_response_limit: + self.connection_lost(asyncio.TimeoutError("Server not responding")) + raise ModbusIOException( + f"ERROR: No response received of the last {self.accept_no_response_limit} request, CLOSING CONNECTION." + ) + self.count_no_responses += 1 + Log.error(f"No response received after {self.retries} retries, continue with next request") + return None + + async def execute(self, no_response_expected: bool, request: ModbusPDU) -> ModbusPDU | None: + """Execute requests asynchronously. + + REMARK: this method is identical to sync_execute, apart from the lock and try/except. + any changes in either method MUST be mirrored !!! + """ if not self.transport: Log.warning("Not connected, trying to connect!") - if not self.transport and not await self.connect(): + if not await self.connect(): raise ConnectionException("Client cannot connect (automatic retry continuing) !!") async with self._lock: request.transaction_id = self.getNextTID() @@ -101,14 +158,12 @@ def callback_connected(self) -> None: def callback_disconnected(self, exc: Exception | None) -> None: """Call when connection is lost.""" - if self.trace_recv_packet: - self.trace_recv_packet(None) # pylint: disable=not-callable - if self.trace_recv_pdu: - self.trace_recv_pdu(None) # pylint: disable=not-callable - if self.trace_send_packet: - self.trace_send_packet(None) # pylint: disable=not-callable - if self.trace_send_pdu: - self.trace_send_pdu(None) # pylint: disable=not-callable + for call in (self.trace_recv_packet, + self.trace_recv_pdu, + self.trace_send_packet, + self.trace_send_pdu): + if call: + call(None) # pylint: disable=not-callable def callback_data(self, data: bytes, addr: tuple | None = None) -> int: """Handle received data.""" diff --git a/pymodbus/transport/transport.py b/pymodbus/transport/transport.py index 6ccc84eee..3a4ffc26e 100644 --- a/pymodbus/transport/transport.py +++ b/pymodbus/transport/transport.py @@ -138,18 +138,24 @@ def __init__( self, params: CommParams, is_server: bool, + is_sync: bool = False ) -> None: """Initialize a transport instance. :param params: parameter dataclass :param is_server: true if object act as a server (listen/connect) + :param is_sync: true if used with sync client """ self.comm_params = params.copy() self.is_server = is_server self.is_closing = False self.transport: asyncio.BaseTransport = None # type: ignore[assignment] - self.loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() + self.loop: asyncio.AbstractEventLoop + if is_sync: + self.loop = asyncio.new_event_loop() + else: + self.loop = asyncio.get_running_loop() self.recv_buffer: bytes = b"" self.call_create: Callable[[], Coroutine[Any, Any, Any]] = None # type: ignore[assignment] self.reconnect_task: asyncio.Task | None = None diff --git a/test/transaction/test_old_transaction.py b/test/transaction/test_old_transaction.py index a7fc88d33..a2ebe4493 100755 --- a/test/transaction/test_old_transaction.py +++ b/test/transaction/test_old_transaction.py @@ -1,10 +1,6 @@ """Test transaction.""" from unittest import mock -from pymodbus.client import ModbusTcpClient -from pymodbus.exceptions import ( - ModbusIOException, -) from pymodbus.framer import ( FramerAscii, FramerRTU, @@ -12,10 +8,7 @@ FramerTLS, ) from pymodbus.pdu import DecodePDU, ModbusPDU -from pymodbus.transaction import ( - ModbusTransactionManager, - SyncModbusTransactionManager, -) +from pymodbus.transaction.old_transaction import SyncModbusTransactionManager TEST_MESSAGE = b"\x7b\x01\x03\x00\x00\x00\x05\x85\xC9\x7d" @@ -51,54 +44,6 @@ def setup_method(self): # Modbus transaction manager # ----------------------------------------------------------------------- # - @mock.patch.object(ModbusTransactionManager, "getTransaction") - def test_execute(self, mock_get_transaction): - """Test execute.""" - client = ModbusTcpClient("localhost") - client.recv = mock.Mock() - client.framer = self._ascii - client.framer._buffer = b"deadbeef" # pylint: disable=protected-access - client.framer.processIncomingFrame = mock.MagicMock() - client.framer.processIncomingFrame.return_value = 0, None - client.framer.buildFrame = mock.MagicMock() - client.framer.buildFrame.return_value = b"deadbeef" - client.send = mock.MagicMock() - client.send.return_value = len(b"deadbeef") - request = mock.MagicMock() - request.get_response_pdu_size.return_value = 10 - request.slave_id = 1 - request.function_code = 222 - trans = SyncModbusTransactionManager(client, 3) - assert trans.retries == 3 - - client.recv.side_effect=iter([b"abcdef", None]) - mock_get_transaction.return_value = b"response" - trans.retries = 0 - response = trans.execute(False, request) - assert isinstance(response, ModbusIOException) - # No response - client.recv.side_effect=iter([b"abcdef", None]) - trans.transactions = {} - mock_get_transaction.return_value = None - response = trans.execute(False, request) - assert isinstance(response, ModbusIOException) - - # No response with retries - client.recv.side_effect=iter([b"", b"abcdef"]) - response = trans.execute(False, request) - assert isinstance(response, ModbusIOException) - - # wrong handle_local_echo - client.recv.side_effect=iter([b"abcdef", b"deadbe", b"123456"]) - client.comm_params.handle_local_echo = True - assert trans.execute(False, request).message == "[Input/Output] SEND failed" - client.comm_params.handle_local_echo = False - - # retry on invalid response - client.recv.side_effect=iter([b"", b"abcdef", b"deadbe", b"123456"]) - response = trans.execute(False, request) - assert isinstance(response, ModbusIOException) - def test_transaction_manager_tid(self): """Test the transaction manager TID.""" for tid in range(1, self._manager.getNextTID() + 10): diff --git a/test/transaction/test_transaction.py b/test/transaction/test_transaction.py index 4db68e357..c2ce96059 100755 --- a/test/transaction/test_transaction.py +++ b/test/transaction/test_transaction.py @@ -4,8 +4,9 @@ import pytest +from pymodbus.client import ModbusBaseSyncClient from pymodbus.exceptions import ConnectionException, ModbusIOException -from pymodbus.framer import FramerRTU, FramerSocket +from pymodbus.framer import FramerRTU, FramerSocket, FramerType from pymodbus.pdu import DecodePDU from pymodbus.pdu.bit_message import ReadCoilsRequest, ReadCoilsResponse from pymodbus.transaction import TransactionManager @@ -74,7 +75,7 @@ async def test_transaction_data(self, use_clc, test): transact.trace_recv_pdu.assert_called_once_with(pdu) assert transact.response_future.result() == pdu - @pytest.mark.parametrize("scenario", range(7)) + @pytest.mark.parametrize("scenario", range(6)) async def test_transaction_execute(self, use_clc, scenario): """Test tracers in disconnect.""" transact = TransactionManager(use_clc, FramerRTU(DecodePDU(False)), 5, False) @@ -87,27 +88,27 @@ async def test_transaction_execute(self, use_clc, scenario): transact.transport = None with pytest.raises(ConnectionException): await transact.execute(False, request) - elif scenario == 2: # transport not ok and connect, no trace + elif scenario == 1: # transport not ok and connect, no trace transact.transport = None transact.connect = mock.AsyncMock(return_value=1) await transact.execute(True, request) - elif scenario == 3: # transport ok, trace and send + elif scenario == 2: # transport ok, trace and send transact.trace_send_pdu = mock.Mock(return_value=request) transact.trace_send_packet = mock.Mock(return_value=b'123') await transact.execute(True, request) transact.trace_send_pdu.assert_called_once_with(request) transact.trace_send_packet.assert_called_once_with(b'\x00\x01\x00u\x00\x05\xec\x02') - elif scenario == 4: # wait receive,timeout, no_responses + elif scenario == 3: # wait receive,timeout, no_responses transact.comm_params.timeout_connect = 0.1 transact.count_no_responses = 10 transact.connection_lost = mock.Mock() with pytest.raises(ModbusIOException): await transact.execute(False, request) - elif scenario == 5: # wait receive,timeout, no_responses pass + elif scenario == 4: # wait receive,timeout, no_responses pass transact.comm_params.timeout_connect = 0.1 transact.connection_lost = mock.Mock() assert not await transact.execute(False, request) - elif scenario == 6: # response + else: # if scenario == 5: # response transact.comm_params.timeout_connect = 0.2 transact.response_future.set_result(response) resp = asyncio.create_task(transact.execute(False, request)) @@ -133,17 +134,118 @@ async def test_client_protocol_execute_outside(self, use_clc, no_resp): transact = TransactionManager(use_clc, FramerSocket(DecodePDU(False)), 5, False) transact.send = mock.Mock() request = ReadCoilsRequest(address=117, count=5) - response = ReadCoilsResponse(bits=[True, False, True, True, False]) transact.retries = 0 transact.connection_made(mock.AsyncMock()) resp = asyncio.create_task(transact.execute(no_resp, request)) await asyncio.sleep(0.2) data = b"\x00\x00\x12\x34\x00\x06\xff\x01\x01\x02\x00\x04" transact.data_received(data) - response = await resp + result = await resp + if no_resp: + assert not result + else: + assert not result.isError() + assert isinstance(result, ReadCoilsResponse) + + +@pytest.mark.parametrize("use_port", [5098]) +class TestSyncTransaction: + """Test the pymodbus.transaction module.""" + + def test_sync_transaction_instance(self, use_clc): + """Test instantiate class.""" + client = ModbusBaseSyncClient(FramerType.SOCKET, 5, use_clc) + TransactionManager(use_clc, FramerRTU(DecodePDU(False)), 5, False, sync_client=client) + TransactionManager(use_clc, FramerRTU(DecodePDU(True)), 5, True, sync_client=client) + + + @pytest.mark.parametrize("scenario", range(6)) + async def test_sync_transaction_execute(self, use_clc, scenario): + """Test tracers in disconnect.""" + client = ModbusBaseSyncClient(FramerType.SOCKET, 5, use_clc) + transact = TransactionManager(use_clc, FramerRTU(DecodePDU(False)), 5, False, sync_client=client) + transact.send = mock.Mock() + transact.sync_client.connect = mock.Mock(return_value=True) + request = ReadCoilsRequest(address=117, count=5) + response = ReadCoilsResponse(bits=[True, False, True, True, False, False, False, False]) + transact.retries = 0 + if scenario == 0: # transport not ok and no connect + transact.transport = None + transact.sync_client.connect = mock.Mock(return_value=False) + with pytest.raises(ConnectionException): + transact.sync_execute(False, request) + elif scenario == 1: # transport not ok and connect, no trace + transact.transport = None + transact.sync_client.connect = mock.Mock(return_value=True) + transact.sync_execute(True, request) + elif scenario == 2: # transport ok, trace and send + transact.trace_send_pdu = mock.Mock(return_value=request) + transact.trace_send_packet = mock.Mock(return_value=b'123') + transact.sync_execute(True, request) + transact.trace_send_pdu.assert_called_once_with(request) + transact.trace_send_packet.assert_called_once_with(b'\x00\x01\x00u\x00\x05\xec\x02') + elif scenario == 3: # wait receive,timeout, no_responses + transact.comm_params.timeout_connect = 0.1 + transact.count_no_responses = 10 + with pytest.raises(ModbusIOException): + transact.sync_execute(False, request) + elif scenario == 4: # wait receive,timeout, no_responses pass + transact.comm_params.timeout_connect = 0.1 + assert not transact.sync_execute(False, request) + else: # if scenario == 5 # response + transact.transport = 1 + resp_bytes = transact.framer.buildFrame(response) + transact.sync_client.recv = mock.Mock(return_value=resp_bytes) + transact.sync_client.send = mock.Mock() + transact.comm_params.timeout_connect = 0.2 + resp = transact.sync_execute(False, request) + assert response.bits == resp.bits + + def test_sync_transaction_receiver(self, use_clc): + """Test tracers in disconnect.""" + client = ModbusBaseSyncClient(FramerType.SOCKET, 5, use_clc) + transact = TransactionManager(use_clc, FramerRTU(DecodePDU(False)), 5, False, sync_client=client) + transact.sync_client.send = mock.Mock() + request = ReadCoilsRequest(address=117, count=5) + response = ReadCoilsResponse(bits=[True, False, True, True, False, False, False, False]) + transact.retries = 0 + transact.transport = 1 + resp_bytes = transact.framer.buildFrame(response) + transact.sync_client.recv = mock.Mock(return_value=resp_bytes) + transact.sync_client.send = mock.Mock() + transact.comm_params.timeout_connect = 0.2 + resp = transact.sync_execute(False, request) + assert response.bits == resp.bits + + @pytest.mark.parametrize("no_resp", [False, True]) + def test_sync_client_protocol_execute_outside(self, use_clc, no_resp): + """Test the transaction execute method.""" + client = ModbusBaseSyncClient(FramerType.SOCKET, 5, use_clc) + transact = TransactionManager(use_clc, FramerRTU(DecodePDU(False)), 5, False, sync_client=client) + request = ReadCoilsRequest(address=117, count=5) + response = ReadCoilsResponse(bits=[True, False, True, True, False, False, False, False]) + transact.retries = 0 + transact.transport = 1 + resp_bytes = transact.framer.buildFrame(response) + transact.sync_client.recv = mock.Mock(return_value=resp_bytes) + transact.sync_client.send = mock.Mock() + result = transact.sync_execute(no_resp, request) if no_resp: - assert not response + assert not result else: - assert not response.isError() + assert not result.isError() assert isinstance(response, ReadCoilsResponse) + def test_sync_client_protocol_execute_no_pdu(self, use_clc): + """Test the transaction execute method.""" + client = ModbusBaseSyncClient(FramerType.SOCKET, 5, use_clc) + transact = TransactionManager(use_clc, FramerRTU(DecodePDU(False)), 5, False, sync_client=client) + request = ReadCoilsRequest(address=117, count=5) + response = ReadCoilsResponse(bits=[True, False, True, True, False, False, False, False]) + transact.retries = 0 + transact.transport = 1 + resp_bytes = transact.framer.buildFrame(response)[:-1] + transact.sync_client.recv = mock.Mock(side_effect=[resp_bytes, b'']) + transact.sync_client.send = mock.Mock() + result = transact.sync_execute(False, request) + assert not result