Skip to content

Commit

Permalink
New async transaction manager. (#2453)
Browse files Browse the repository at this point in the history
  • Loading branch information
janiversen authored Nov 15, 2024
1 parent 8cc1534 commit b861b7c
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 322 deletions.
Binary file modified doc/source/_static/examples.tgz
Binary file not shown.
Binary file modified doc/source/_static/examples.zip
Binary file not shown.
69 changes: 5 additions & 64 deletions pymodbus/client/base.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
"""Base for all clients."""
from __future__ import annotations

import asyncio
import socket
from abc import abstractmethod
from collections.abc import Awaitable, Callable

from pymodbus.client.mixin import ModbusClientMixin
from pymodbus.client.modbusclientprotocol import ModbusClientProtocol
from pymodbus.exceptions import ConnectionException, ModbusIOException
from pymodbus.exceptions import ConnectionException
from pymodbus.framer import FRAMER_NAME_TO_CLASS, FramerBase, FramerType
from pymodbus.logging import Log
from pymodbus.pdu import DecodePDU, ExceptionResponse, ModbusPDU
from pymodbus.pdu import DecodePDU, ModbusPDU
from pymodbus.transaction import SyncModbusTransactionManager
from pymodbus.transport import CommParams
from pymodbus.utilities import ModbusTransactionState
Expand All @@ -37,21 +36,13 @@ def __init__(
ModbusClientMixin.__init__(self) # type: ignore[arg-type]
if comm_params:
self.comm_params = comm_params
self.retries = retries
self.ctx = ModbusClientProtocol(
framer,
(FRAMER_NAME_TO_CLASS[framer])(DecodePDU(False)),
self.comm_params,
retries,
on_connect_callback,
)

# Common variables.
self.use_udp = False
self.state = ModbusTransactionState.IDLE
self.last_frame_end: float | None = 0
self.silent_interval: float = 0
self._lock = asyncio.Lock()
self.accept_no_response_limit = retries + 3
self.count_no_responses = 0

@property
def connected(self) -> bool:
Expand Down Expand Up @@ -90,57 +81,7 @@ def execute(self, no_response_expected: bool, request: ModbusPDU):
"""
if not self.ctx.transport:
raise ConnectionException(f"Not connected[{self!s}]")
return self.async_execute(no_response_expected, request)

async def async_execute(self, no_response_expected: bool, request) -> ModbusPDU | None:
"""Execute requests asynchronously.
:meta private:
"""
request.transaction_id = self.ctx.transaction.getNextTID()
packet = self.ctx.framer.buildFrame(request)

count = 0
async with self._lock:
while count <= self.retries:
req = self.build_response(request)
self.ctx.send(packet)
if no_response_expected:
resp = None
break
try:
resp = await asyncio.wait_for(
req, timeout=self.ctx.comm_params.timeout_connect
)
break
except asyncio.exceptions.TimeoutError:
count += 1
if count > self.retries:
if self.count_no_responses >= self.accept_no_response_limit:
self.ctx.connection_lost(asyncio.TimeoutError("Server not responding"))
raise ModbusIOException(
f"ERROR: No response received of the last {self.accept_no_response_limit} request, CLOSING CONNECTION."
)
self.count_no_responses += 1
Log.error(f"No response received after {self.retries} retries, continue with next request")
return ExceptionResponse(request.function_code)

self.count_no_responses = 0
return resp

def build_response(self, request: ModbusPDU):
"""Return a deferred response for the current request.
:meta private:
"""
my_future: asyncio.Future = asyncio.Future()
request.fut = my_future
if not self.ctx.transport:
if not my_future.done():
my_future.set_exception(ConnectionException("Client is not connected"))
else:
self.ctx.transaction.addTransaction(request)
return my_future
return self.ctx.execute(no_response_expected, request)

async def __aenter__(self):
"""Implement the client with enter block.
Expand Down
52 changes: 9 additions & 43 deletions pymodbus/client/modbusclientprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,76 +3,42 @@

from collections.abc import Callable

from pymodbus.framer import (
FRAMER_NAME_TO_CLASS,
FramerBase,
FramerType,
)
from pymodbus.framer import FramerBase
from pymodbus.logging import Log
from pymodbus.pdu import DecodePDU
from pymodbus.transaction import ModbusTransactionManager
from pymodbus.transport import CommParams, ModbusProtocol
from pymodbus.transaction import TransactionManager
from pymodbus.transport import CommParams


class ModbusClientProtocol(ModbusProtocol):
class ModbusClientProtocol(TransactionManager):
"""**ModbusClientProtocol**.
:mod:`ModbusClientProtocol` is normally not referenced outside :mod:`pymodbus`.
"""

def __init__(
self,
framer: FramerType,
framer: FramerBase,
params: CommParams,
retries: int,
on_connect_callback: Callable[[bool], None] | None = None,
) -> None:
"""Initialize a client instance."""
ModbusProtocol.__init__(
self,
params,
False,
)
super().__init__(params, framer, retries, False)
self.on_connect_callback = on_connect_callback

# Common variables.
self.framer: FramerBase = (FRAMER_NAME_TO_CLASS[framer])(DecodePDU(False))
self.transaction = ModbusTransactionManager()

def _handle_response(self, reply):
"""Handle the processed response and link to correct deferred."""
if reply is not None:
tid = reply.transaction_id
if handler := self.transaction.getTransaction(tid):
reply.request = handler
if not handler.fut.done():
handler.fut.set_result(reply)
else:
Log.debug("Unrequested message: {}", reply, ":str")

def callback_new_connection(self):
"""Call when listener receive new connection request."""

def callback_connected(self) -> None:
"""Call when connection is succcesfull."""
super().callback_connected()
if self.on_connect_callback:
self.loop.call_soon(self.on_connect_callback, True)

def callback_disconnected(self, exc: Exception | None) -> None:
"""Call when connection is lost."""
Log.debug("callback_disconnected called: {}", exc)
super().callback_disconnected(exc)
if self.on_connect_callback:
self.loop.call_soon(self.on_connect_callback, False)

def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
"""Handle received data.
returns number of bytes consumed
"""
used_len, pdu = self.framer.processIncomingFrame(data)
if pdu:
self._handle_response(pdu)
return used_len

def __str__(self):
"""Build a string representation of the connection.
Expand Down
12 changes: 12 additions & 0 deletions pymodbus/transaction/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Transaction."""
__all__ = [
"ModbusTransactionManager",
"SyncModbusTransactionManager",
"TransactionManager",
]

from pymodbus.transaction.old_transaction import (
ModbusTransactionManager,
SyncModbusTransactionManager,
)
from pymodbus.transaction.transaction import TransactionManager
File renamed without changes.
131 changes: 131 additions & 0 deletions pymodbus/transaction/transaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Collection of transaction based abstractions."""
from __future__ import annotations

import asyncio
from collections.abc import Callable

from pymodbus.exceptions import ConnectionException, ModbusIOException
from pymodbus.framer import FramerBase
from pymodbus.logging import Log
from pymodbus.pdu import ModbusPDU
from pymodbus.transport import CommParams, ModbusProtocol


class TransactionManager(ModbusProtocol):
"""Transaction manager.
This is the central class of the library, providing a separation between API and communication:
- clients/servers calls the manager to execute requests/responses
- transport/framer/pdu is by the manager to communicate with the devices
Transaction manager handles:
- Execution of requests (client), with retries and locking
- Sending of responses (server), with retries
- Connection management (on top of what transport offers)
- No response (temporarily) from a device
Transaction manager offers:
- a simple execute interface for requests (client)
- a simple send interface for responses (server)
- external trace methods tracing:
- outgoing/incoming packets (byte stream)
- outgoing/incoming PDUs
"""

def __init__(
self,
params: CommParams,
framer: FramerBase,
retries: int,
is_server: bool,
) -> None:
"""Initialize an instance of the ModbusTransactionManager."""
super().__init__(params, is_server)
self.framer = framer
self.retries = retries
self.next_tid: int = 0
self.trace_recv_packet: Callable[[bytes | None], bytes] | None = None
self.trace_recv_pdu: Callable[[ModbusPDU | None], ModbusPDU] | None = None
self.trace_send_packet: Callable[[bytes | None], bytes] | None = None
self.trace_send_pdu: Callable[[ModbusPDU | None], ModbusPDU] | None = None
self.accept_no_response_limit = retries + 3
self.count_no_responses = 0
self._lock = asyncio.Lock()
self.response_future: asyncio.Future = asyncio.Future()

async def execute(self, no_response_expected: bool, request) -> ModbusPDU | None:
"""Execute requests asynchronously."""
if not self.transport:
Log.warning("Not connected, trying to connect!")
if not self.transport and not await self.connect():
raise ConnectionException("Client cannot connect (automatic retry continuing) !!")
async with self._lock:
request.transaction_id = self.getNextTID()
if self.trace_send_pdu:
request = self.trace_send_pdu(request) # pylint: disable=not-callable
packet = self.framer.buildFrame(request)
count_retries = 0
while count_retries <= self.retries:
if self.trace_send_packet:
packet = self.trace_send_packet(packet) # pylint: disable=not-callable
self.send(packet)
if no_response_expected:
return None
try:
response = await asyncio.wait_for(
self.response_future, timeout=self.comm_params.timeout_connect
)
self.count_no_responses = 0
self.response_future = asyncio.Future()
return response
except asyncio.exceptions.TimeoutError:
count_retries += 1
if self.count_no_responses >= self.accept_no_response_limit:
self.connection_lost(asyncio.TimeoutError("Server not responding"))
raise ModbusIOException(
f"ERROR: No response received of the last {self.accept_no_response_limit} request, CLOSING CONNECTION."
)
self.count_no_responses += 1
Log.error(f"No response received after {self.retries} retries, continue with next request")
self.response_future = asyncio.Future()
return None

def callback_new_connection(self):
"""Call when listener receive new connection request."""

def callback_connected(self) -> None:
"""Call when connection is succcesfull."""
self.count_no_responses = 0
self.next_tid = 0

def callback_disconnected(self, exc: Exception | None) -> None:
"""Call when connection is lost."""
if self.trace_recv_packet:
self.trace_recv_packet(None) # pylint: disable=not-callable
if self.trace_recv_pdu:
self.trace_recv_pdu(None) # pylint: disable=not-callable
if self.trace_send_packet:
self.trace_send_packet(None) # pylint: disable=not-callable
if self.trace_send_pdu:
self.trace_send_pdu(None) # pylint: disable=not-callable

def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
"""Handle received data."""
_ = (addr)
if self.trace_recv_packet:
data = self.trace_recv_packet(data) # pylint: disable=not-callable
used_len, pdu = self.framer.processIncomingFrame(data)
if pdu:
if self.trace_recv_pdu:
pdu = self.trace_recv_pdu(pdu) # pylint: disable=not-callable
self.response_future.set_result(pdu)
return used_len

def getNextTID(self) -> int:
"""Retrieve the next transaction identifier."""
if self.next_tid >= 65000:
self.next_tid = 1
else:
self.next_tid += 1
return self.next_tid
2 changes: 1 addition & 1 deletion test/framer/test_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


class TestExtas:
"""Unittest for the pymodbus.transaction module."""
"""Test for the framer module."""

client = None
decoder = None
Expand Down
Loading

0 comments on commit b861b7c

Please sign in to comment.