diff --git a/pymodbus/transaction/transaction.py b/pymodbus/transaction/transaction.py index b8bc5dade..dbec35f29 100644 --- a/pymodbus/transaction/transaction.py +++ b/pymodbus/transaction/transaction.py @@ -3,6 +3,7 @@ import asyncio from collections.abc import Callable +from threading import RLock from pymodbus.exceptions import ConnectionException, ModbusIOException from pymodbus.framer import FramerBase @@ -39,9 +40,12 @@ def __init__( framer: FramerBase, retries: int, is_server: bool, + sync_send: Callable[[bytes], None] | None = None, + sync_recv: Callable[[int], None] | None = None, ) -> None: """Initialize an instance of the ModbusTransactionManager.""" - super().__init__(params, is_server) + is_sync = bool(sync_send) | bool(sync_recv) + super().__init__(params, is_server, is_sync=bool(sync_send)) self.framer = framer self.retries = retries self.next_tid: int = 0 @@ -51,11 +55,62 @@ def __init__( self.trace_send_pdu: Callable[[ModbusPDU | None], ModbusPDU] | None = None self.accept_no_response_limit = retries + 3 self.count_no_responses = 0 - self._lock = asyncio.Lock() + if is_sync: + if sync_send: + self.sync_send = sync_send + if sync_recv: + self.sync_recv = sync_recv + self._sync_lock = RLock() + else: + self._lock = asyncio.Lock() self.response_future: asyncio.Future = asyncio.Future() - async def execute(self, no_response_expected: bool, request) -> ModbusPDU | None: - """Execute requests asynchronously.""" + def sync_get_response(self) -> ModbusPDU | None: + """Receive data sync.""" + return None + + def sync_execute(self, no_response_expected: bool, request: ModbusPDU) -> ModbusPDU | None: + """Execute requests asynchronously. + + REMARK: this method is identical to execute, apart from the lock and sync_receive. + any changes in either method MUST be mirrored !!! + """ + if not self.transport: + Log.warning("Not connected, trying to connect!") + if not self.transport and not self.connect(): + raise ConnectionException("Client cannot connect (automatic retry continuing) !!") + with self._sync_lock: + request.transaction_id = self.getNextTID() + if self.trace_send_pdu: + request = self.trace_send_pdu(request) # pylint: disable=not-callable + packet = self.framer.buildFrame(request) + count_retries = 0 + while count_retries <= self.retries: + if self.trace_send_packet: + packet = self.trace_send_packet(packet) # pylint: disable=not-callable + self.sync_send(packet) + if no_response_expected: + return None + try: + return self.sync_get_response() + except asyncio.exceptions.TimeoutError: + count_retries += 1 + if self.count_no_responses >= self.accept_no_response_limit: + self.connection_lost(asyncio.TimeoutError("Server not responding")) + raise ModbusIOException( + f"ERROR: No response received of the last {self.accept_no_response_limit} request, CLOSING CONNECTION." + ) + self.count_no_responses += 1 + Log.error(f"No response received after {self.retries} retries, continue with next request") + self.response_future = asyncio.Future() + return None + + async def execute(self, no_response_expected: bool, request: ModbusPDU) -> ModbusPDU | None: + """Execute requests asynchronously. + + REMARK: this method is identical to sync_execute, apart from the lock and try/except. + any changes in either method MUST be mirrored !!! + """ if not self.transport: Log.warning("Not connected, trying to connect!") if not self.transport and not await self.connect(): diff --git a/pymodbus/transport/transport.py b/pymodbus/transport/transport.py index 6ccc84eee..3a4ffc26e 100644 --- a/pymodbus/transport/transport.py +++ b/pymodbus/transport/transport.py @@ -138,18 +138,24 @@ def __init__( self, params: CommParams, is_server: bool, + is_sync: bool = False ) -> None: """Initialize a transport instance. :param params: parameter dataclass :param is_server: true if object act as a server (listen/connect) + :param is_sync: true if used with sync client """ self.comm_params = params.copy() self.is_server = is_server self.is_closing = False self.transport: asyncio.BaseTransport = None # type: ignore[assignment] - self.loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() + self.loop: asyncio.AbstractEventLoop + if is_sync: + self.loop = asyncio.new_event_loop() + else: + self.loop = asyncio.get_running_loop() self.recv_buffer: bytes = b"" self.call_create: Callable[[], Coroutine[Any, Any, Any]] = None # type: ignore[assignment] self.reconnect_task: asyncio.Task | None = None diff --git a/test/transaction/test_transaction.py b/test/transaction/test_transaction.py index 4db68e357..3edb33d72 100755 --- a/test/transaction/test_transaction.py +++ b/test/transaction/test_transaction.py @@ -4,8 +4,9 @@ import pytest +from pymodbus.client import ModbusBaseSyncClient from pymodbus.exceptions import ConnectionException, ModbusIOException -from pymodbus.framer import FramerRTU, FramerSocket +from pymodbus.framer import FramerRTU, FramerSocket, FramerType from pymodbus.pdu import DecodePDU from pymodbus.pdu.bit_message import ReadCoilsRequest, ReadCoilsResponse from pymodbus.transaction import TransactionManager @@ -147,3 +148,144 @@ async def test_client_protocol_execute_outside(self, use_clc, no_resp): assert not response.isError() assert isinstance(response, ReadCoilsResponse) + + +@pytest.mark.parametrize("use_port", [5098]) +class TestSyncTransaction: + """Test the pymodbus.transaction module.""" + + def test_sync_transaction_instance(self, use_clc): + """Test instantiate class.""" + client = ModbusBaseSyncClient(FramerType.SOCKET, 5, use_clc) + TransactionManager(use_clc, FramerRTU(DecodePDU(False)), 5, False, sync_recv=client.recv, sync_send=client.send) + TransactionManager(use_clc, FramerRTU(DecodePDU(True)), 5, True, sync_recv=client.recv, sync_send=client.send) + + def test_sync_transaction_manager_tid(self, use_clc): + """Test next TID.""" + client = ModbusBaseSyncClient(FramerType.SOCKET, 5, use_clc) + transact = TransactionManager(use_clc, client.framer, 5, False, sync_recv=client.recv, sync_send=client.send) + assert transact.getNextTID() == 1 + for tid in range(2, 12): + assert tid == transact.getNextTID() + assert transact.getNextTID() == 12 + transact.next_tid = 64999 + assert transact.getNextTID() == 65000 + assert transact.getNextTID() == 1 + + def test_sync_transaction_calls(self, use_clc): + """Test dummy calls from transport.""" + client = ModbusBaseSyncClient(FramerType.SOCKET, 5, use_clc) + transact = TransactionManager(use_clc, client.framer, 5, False, sync_recv=client.recv, sync_send=client.send) + transact.callback_new_connection() + transact.callback_connected() + + def test_sync_transaction_disconnect(self, use_clc): + """Test tracers in disconnect.""" + client = ModbusBaseSyncClient(FramerType.SOCKET, 5, use_clc) + transact = TransactionManager(use_clc, client.framer, 5, False, sync_recv=client.recv, sync_send=client.send) + transact.callback_disconnected(None) + transact.trace_recv_packet = mock.Mock() + transact.trace_recv_pdu = mock.Mock() + transact.trace_send_packet = mock.Mock() + transact.trace_send_pdu = mock.Mock() + transact.callback_disconnected(None) + transact.trace_recv_packet.assert_called_once_with(None) + transact.trace_recv_pdu.assert_called_once_with(None) + transact.trace_send_packet.assert_called_once_with(None) + transact.trace_send_pdu.assert_called_once_with(None) + + @pytest.mark.parametrize("test", [True, False]) + async def test_sync_transaction_data(self, use_clc, test): + """Test tracers in disconnect.""" + transact = TransactionManager(use_clc, FramerRTU(DecodePDU(False)), 5, False) + transact.framer.processIncomingFrame = mock.Mock(return_value=(0, None)) + packet = b'123' + transact.callback_data(packet) + assert not transact.response_future.done() + transact.trace_recv_packet = mock.Mock() + pdu = "dummy pdu" + + if test: + transact.framer.processIncomingFrame.return_value = (1, pdu) + transact.callback_data(packet) + transact.trace_recv_packet.assert_called_once_with(packet) + else: + transact.trace_recv_pdu = mock.Mock(return_value=pdu) + transact.framer.processIncomingFrame.return_value = (1, pdu) + transact.callback_data(packet) + transact.trace_recv_packet.assert_called_with(packet) + transact.trace_recv_pdu.assert_called_once_with(pdu) + assert transact.response_future.result() == pdu + + @pytest.mark.parametrize("scenario", range(7)) + async def test_sync_transaction_execute(self, use_clc, scenario): + """Test tracers in disconnect.""" + transact = TransactionManager(use_clc, FramerRTU(DecodePDU(False)), 5, False) + transact.send = mock.Mock() + request = ReadCoilsRequest(address=117, count=5) + response = ReadCoilsResponse(bits=[True, False, True, True, False]) + transact.retries = 0 + transact.connection_made(mock.AsyncMock()) + if scenario == 0: # transport not ok and no connect + transact.transport = None + with pytest.raises(ConnectionException): + await transact.execute(False, request) + elif scenario == 2: # transport not ok and connect, no trace + transact.transport = None + transact.connect = mock.AsyncMock(return_value=1) + await transact.execute(True, request) + elif scenario == 3: # transport ok, trace and send + transact.trace_send_pdu = mock.Mock(return_value=request) + transact.trace_send_packet = mock.Mock(return_value=b'123') + await transact.execute(True, request) + transact.trace_send_pdu.assert_called_once_with(request) + transact.trace_send_packet.assert_called_once_with(b'\x00\x01\x00u\x00\x05\xec\x02') + elif scenario == 4: # wait receive,timeout, no_responses + transact.comm_params.timeout_connect = 0.1 + transact.count_no_responses = 10 + transact.connection_lost = mock.Mock() + with pytest.raises(ModbusIOException): + await transact.execute(False, request) + elif scenario == 5: # wait receive,timeout, no_responses pass + transact.comm_params.timeout_connect = 0.1 + transact.connection_lost = mock.Mock() + assert not await transact.execute(False, request) + elif scenario == 6: # response + transact.comm_params.timeout_connect = 0.2 + transact.response_future.set_result(response) + resp = asyncio.create_task(transact.execute(False, request)) + await asyncio.sleep(0.2) + assert response == await resp + + async def test_sync_transaction_receiver(self, use_clc): + """Test tracers in disconnect.""" + transact = TransactionManager(use_clc, FramerSocket(DecodePDU(False)), 5, False) + transact.send = mock.Mock() + response = ReadCoilsResponse(bits=[True, False, True, True, False]) + transact.retries = 0 + transact.connection_made(mock.AsyncMock()) + + data = b"\x00\x00\x12\x34\x00\x06\xff\x01\x01\x02\x00\x04" + transact.data_received(data) + response = await transact.response_future + assert isinstance(response, ReadCoilsResponse) + + @pytest.mark.parametrize("no_resp", [False, True]) + async def test_sync_client_protocol_execute_outside(self, use_clc, no_resp): + """Test the transaction execute method.""" + transact = TransactionManager(use_clc, FramerSocket(DecodePDU(False)), 5, False) + transact.send = mock.Mock() + request = ReadCoilsRequest(address=117, count=5) + response = ReadCoilsResponse(bits=[True, False, True, True, False]) + transact.retries = 0 + transact.connection_made(mock.AsyncMock()) + resp = asyncio.create_task(transact.execute(no_resp, request)) + await asyncio.sleep(0.2) + data = b"\x00\x00\x12\x34\x00\x06\xff\x01\x01\x02\x00\x04" + transact.data_received(data) + response = await resp + if no_resp: + assert not response + else: + assert not response.isError() + assert isinstance(response, ReadCoilsResponse)