From 8b7d8018cbc3bb22df8b7070cbac8f450017ca17 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Mon, 9 Oct 2023 19:02:07 +0200 Subject: [PATCH 1/6] Reduce transport_serial. --- pymodbus/transport/transport_serial.py | 145 +++++++++---------------- 1 file changed, 54 insertions(+), 91 deletions(-) diff --git a/pymodbus/transport/transport_serial.py b/pymodbus/transport/transport_serial.py index f5b28d5e3..6422f30c5 100644 --- a/pymodbus/transport/transport_serial.py +++ b/pymodbus/transport/transport_serial.py @@ -18,18 +18,49 @@ def __init__(self, loop, protocol, *args, **kwargs): self.async_loop = loop self._protocol: asyncio.BaseProtocol = protocol self.sync_serial = serial.serial_for_url(*args, **kwargs) - self._closing = False self._write_buffer = [] - self.set_write_buffer_limits() self._has_reader = False self._has_writer = False self._poll_wait_time = 0.0005 - - # Asynchronous I/O requires non-blocking devices self.sync_serial.timeout = 0 self.sync_serial.write_timeout = 0 - loop.call_soon(protocol.connection_made, self) - loop.call_soon(self._ensure_reader) + + def setup(self): + """Prepare to read/write""" + self.async_loop.call_soon(self._protocol.connection_made, self) + if os.name == "nt": + self._has_reader = self.async_loop.call_later( + self._poll_wait_time, self._poll_read + ) + else: + self.async_loop.add_reader(self.sync_serial.fileno(), self._read_ready) + self._has_reader = True + + def close(self, exc=None): + """Close the transport gracefully.""" + if self._has_reader: + if os.name == "nt": + self._has_reader.cancel() + else: + self.async_loop.remove_reader(self.sync_serial.fileno()) + self._has_reader = False + self._remove_writer() + self.async_loop.call_soon(self._call_connection_lost, exc) + + def write(self, data): + """Write some data to the transport.""" + self._write_buffer.append(data) + if not self._has_writer: + if os.name == "nt": + self._has_writer = self.async_loop.call_soon(self._poll_write) + else: + self.async_loop.add_writer(self.sync_serial.fileno(), self._write_ready) + self._has_writer = True + + def flush(self): + """Clear output buffer and stops any more data being written""" + self._remove_writer() + self._write_buffer.clear() # ------------------------------------------------ # Dummy methods needed to please asyncio.Transport. @@ -75,80 +106,44 @@ def pause_reading(self): def resume_reading(self): """Resume receiver.""" - # ------------------------------------------------ - def is_closing(self): """Return True if the transport is closing or closed.""" - return self._closing + return False - def close(self): - """Close the transport gracefully.""" - if self._closing: - return - self._closing = True - self._remove_reader() - self._remove_writer() - self.async_loop.call_soon(self._call_connection_lost, None) + def abort(self): + """Close the transport immediately.""" + self.close() + + # ------------------------------------------------ def _read_ready(self): """Test if there are data waiting.""" try: data = self.sync_serial.read(1024) except serial.SerialException as exc: - self.async_loop.call_soon(self._call_connection_lost, exc) - self.close() + self.close(exc) else: if data: self._protocol.data_received(data) - def write(self, data): - """Write some data to the transport.""" - if self._closing: - return - - self._write_buffer.append(data) - self._ensure_writer() - - def abort(self): - """Close the transport immediately.""" - self.close() - - def flush(self): - """Clear output buffer and stops any more data being written""" - self._remove_writer() - self._write_buffer.clear() - def _write_ready(self): """Asynchronously write buffered data.""" data = b"".join(self._write_buffer) - assert data, "Write buffer should not be empty" - - self._write_buffer.clear() - try: - nlen = self.sync_serial.write(data) + if nlen := self.sync_serial.write(data) < len(data): + self._write_buffer = data[nlen:] + return + self._write_buffer.clear() + self._remove_writer() except (BlockingIOError, InterruptedError): - self._write_buffer.append(data) + return except serial.SerialException as exc: - self.async_loop.call_soon(self._call_connection_lost, exc) - self.abort() - else: - if nlen == len(data): - assert not self.get_write_buffer_size() - self._remove_writer() - if self._closing and not self.get_write_buffer_size(): - self.close() - return - - assert 0 <= nlen < len(data) - data = data[nlen:] - self._write_buffer.append(data) # Try again later - assert self._has_writer + self.close(exc) if os.name == "nt": def _poll_read(self): - if self._has_reader and not self._closing: + if self._has_reader: try: self._has_reader = self.async_loop.call_later( self._poll_wait_time, self._poll_read @@ -159,28 +154,13 @@ def _poll_read(self): self.async_loop.call_soon(self._call_connection_lost, exc) self.abort() - def _ensure_reader(self): - if not self._has_reader and not self._closing: - self._has_reader = self.async_loop.call_later( - self._poll_wait_time, self._poll_read - ) - - def _remove_reader(self): - if self._has_reader: - self._has_reader.cancel() - self._has_reader = False - def _poll_write(self): - if self._has_writer and not self._closing: + if self._has_writer: self._has_writer = self.async_loop.call_later( self._poll_wait_time, self._poll_write ) self._write_ready() - def _ensure_writer(self): - if not self._has_writer and not self._closing: - self._has_writer = self.async_loop.call_soon(self._poll_write) - def _remove_writer(self): if self._has_writer: self._has_writer.cancel() @@ -188,21 +168,6 @@ def _remove_writer(self): else: - def _ensure_reader(self): - if (not self._has_reader) and (not self._closing): - self.async_loop.add_reader(self.sync_serial.fileno(), self._read_ready) - self._has_reader = True - - def _remove_reader(self): - if self._has_reader: - self.async_loop.remove_reader(self.sync_serial.fileno()) - self._has_reader = False - - def _ensure_writer(self): - if (not self._has_writer) and (not self._closing): - self.async_loop.add_writer(self.sync_serial.fileno(), self._write_ready) - self._has_writer = True - def _remove_writer(self): if self._has_writer: self.async_loop.remove_writer(self.sync_serial.fileno()) @@ -210,9 +175,6 @@ def _remove_writer(self): def _call_connection_lost(self, exc): """Close the connection.""" - assert self._closing - assert not self._has_writer - assert not self._has_reader if self.sync_serial: with contextlib.suppress(Exception): self.sync_serial.flush() @@ -231,4 +193,5 @@ async def create_serial_connection(loop, protocol_factory, *args, **kwargs): """Create a connection to a new serial port instance.""" protocol = protocol_factory() transport = SerialTransport(loop, protocol, *args, **kwargs) + loop.call_soon(transport.setup) return transport, protocol From 409b800bcc79f64efbcd3988aa87e68b70c49ba0 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Mon, 9 Oct 2023 22:18:24 +0200 Subject: [PATCH 2/6] new I. --- pymodbus/transport/transport_serial.py | 33 ++++++++++---------------- test/sub_transport/test_basic.py | 1 + 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/pymodbus/transport/transport_serial.py b/pymodbus/transport/transport_serial.py index 6422f30c5..91021f755 100644 --- a/pymodbus/transport/transport_serial.py +++ b/pymodbus/transport/transport_serial.py @@ -38,6 +38,11 @@ def setup(self): def close(self, exc=None): """Close the transport gracefully.""" + if not self.sync_serial: + return + with contextlib.suppress(Exception): + self.sync_serial.flush() + if self._has_reader: if os.name == "nt": self._has_reader.cancel() @@ -45,7 +50,11 @@ def close(self, exc=None): self.async_loop.remove_reader(self.sync_serial.fileno()) self._has_reader = False self._remove_writer() - self.async_loop.call_soon(self._call_connection_lost, exc) + self.sync_serial.close() + self.sync_serial = None + with contextlib.suppress(Exception): + self._protocol.connection_lost(exc) + self._write_buffer.clear() def write(self, data): """Write some data to the transport.""" @@ -121,7 +130,7 @@ def _read_ready(self): try: data = self.sync_serial.read(1024) except serial.SerialException as exc: - self.close(exc) + self.close(exc=exc) else: if data: self._protocol.data_received(data) @@ -138,7 +147,7 @@ def _write_ready(self): except (BlockingIOError, InterruptedError): return except serial.SerialException as exc: - self.close(exc) + self.close(exc=exc) if os.name == "nt": @@ -151,8 +160,7 @@ def _poll_read(self): if self.sync_serial.in_waiting: self._read_ready() except serial.SerialException as exc: - self.async_loop.call_soon(self._call_connection_lost, exc) - self.abort() + self.close(exc=exc) def _poll_write(self): if self._has_writer: @@ -173,21 +181,6 @@ def _remove_writer(self): self.async_loop.remove_writer(self.sync_serial.fileno()) self._has_writer = False - def _call_connection_lost(self, exc): - """Close the connection.""" - if self.sync_serial: - with contextlib.suppress(Exception): - self.sync_serial.flush() - - self.sync_serial.close() - self.sync_serial = None - if self._protocol: - with contextlib.suppress(Exception): - self._protocol.connection_lost(exc) - - self._write_buffer.clear() - self._write_buffer.clear() - async def create_serial_connection(loop, protocol_factory, *args, **kwargs): """Create a connection to a new serial port instance.""" diff --git a/test/sub_transport/test_basic.py b/test/sub_transport/test_basic.py index 788818ed0..de30abacc 100644 --- a/test/sub_transport/test_basic.py +++ b/test/sub_transport/test_basic.py @@ -323,6 +323,7 @@ async def test_external_methods(self): comm.write(b"abcd") comm.flush() comm.close() + comm = SerialTransport(mock.MagicMock(), mock.Mock(), "dummy") comm.abort() assert await create_serial_connection( asyncio.get_running_loop(), mock.Mock, url="dummy" From 2e464b7137e4ad2001047b92131e730468ac1d6d Mon Sep 17 00:00:00 2001 From: jan iversen Date: Mon, 9 Oct 2023 22:22:44 +0200 Subject: [PATCH 3/6] II. --- pymodbus/transport/transport_serial.py | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/pymodbus/transport/transport_serial.py b/pymodbus/transport/transport_serial.py index 91021f755..82a70ee8b 100644 --- a/pymodbus/transport/transport_serial.py +++ b/pymodbus/transport/transport_serial.py @@ -49,12 +49,11 @@ def close(self, exc=None): else: self.async_loop.remove_reader(self.sync_serial.fileno()) self._has_reader = False - self._remove_writer() + self.flush() self.sync_serial.close() self.sync_serial = None with contextlib.suppress(Exception): self._protocol.connection_lost(exc) - self._write_buffer.clear() def write(self, data): """Write some data to the transport.""" @@ -68,7 +67,12 @@ def write(self, data): def flush(self): """Clear output buffer and stops any more data being written""" - self._remove_writer() + if self._has_writer: + if os.name == "nt": + self._has_writer.cancel() + else: + self.async_loop.remove_writer(self.sync_serial.fileno()) + self._has_writer = False self._write_buffer.clear() # ------------------------------------------------ @@ -142,8 +146,7 @@ def _write_ready(self): if nlen := self.sync_serial.write(data) < len(data): self._write_buffer = data[nlen:] return - self._write_buffer.clear() - self._remove_writer() + self.flush() except (BlockingIOError, InterruptedError): return except serial.SerialException as exc: @@ -169,18 +172,6 @@ def _poll_write(self): ) self._write_ready() - def _remove_writer(self): - if self._has_writer: - self._has_writer.cancel() - self._has_writer = False - - else: - - def _remove_writer(self): - if self._has_writer: - self.async_loop.remove_writer(self.sync_serial.fileno()) - self._has_writer = False - async def create_serial_connection(loop, protocol_factory, *args, **kwargs): """Create a connection to a new serial port instance.""" From 46333ef6822acb67c0c8a0c404709bb83b69458f Mon Sep 17 00:00:00 2001 From: jan iversen Date: Mon, 9 Oct 2023 22:24:09 +0200 Subject: [PATCH 4/6] III. --- pymodbus/transport/transport_serial.py | 34 ++++++++++++-------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/pymodbus/transport/transport_serial.py b/pymodbus/transport/transport_serial.py index 82a70ee8b..d00b527ac 100644 --- a/pymodbus/transport/transport_serial.py +++ b/pymodbus/transport/transport_serial.py @@ -152,25 +152,23 @@ def _write_ready(self): except serial.SerialException as exc: self.close(exc=exc) - if os.name == "nt": - - def _poll_read(self): - if self._has_reader: - try: - self._has_reader = self.async_loop.call_later( - self._poll_wait_time, self._poll_read - ) - if self.sync_serial.in_waiting: - self._read_ready() - except serial.SerialException as exc: - self.close(exc=exc) - - def _poll_write(self): - if self._has_writer: - self._has_writer = self.async_loop.call_later( - self._poll_wait_time, self._poll_write + def _poll_read(self): + if self._has_reader: + try: + self._has_reader = self.async_loop.call_later( + self._poll_wait_time, self._poll_read ) - self._write_ready() + if self.sync_serial.in_waiting: + self._read_ready() + except serial.SerialException as exc: + self.close(exc=exc) + + def _poll_write(self): + if self._has_writer: + self._has_writer = self.async_loop.call_later( + self._poll_wait_time, self._poll_write + ) + self._write_ready() async def create_serial_connection(loop, protocol_factory, *args, **kwargs): From 2a49b79cde27ed4ac38c9d25260f664f2eec830a Mon Sep 17 00:00:00 2001 From: jan iversen Date: Mon, 9 Oct 2023 22:36:28 +0200 Subject: [PATCH 5/6] III. --- pymodbus/transport/transport_serial.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pymodbus/transport/transport_serial.py b/pymodbus/transport/transport_serial.py index d00b527ac..29a29aafa 100644 --- a/pymodbus/transport/transport_serial.py +++ b/pymodbus/transport/transport_serial.py @@ -145,12 +145,13 @@ def _write_ready(self): try: if nlen := self.sync_serial.write(data) < len(data): self._write_buffer = data[nlen:] - return + return True self.flush() except (BlockingIOError, InterruptedError): - return + return True except serial.SerialException as exc: self.close(exc=exc) + return False def _poll_read(self): if self._has_reader: @@ -164,11 +165,12 @@ def _poll_read(self): self.close(exc=exc) def _poll_write(self): - if self._has_writer: + if not self._has_writer: + return + if self._write_ready(): self._has_writer = self.async_loop.call_later( self._poll_wait_time, self._poll_write ) - self._write_ready() async def create_serial_connection(loop, protocol_factory, *args, **kwargs): From 5e1964ee86be05dbc56f50a36da8da2172618655 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Tue, 10 Oct 2023 09:37:17 +0200 Subject: [PATCH 6/6] W 1. --- pymodbus/transport/transport_serial.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pymodbus/transport/transport_serial.py b/pymodbus/transport/transport_serial.py index 29a29aafa..09f4a7663 100644 --- a/pymodbus/transport/transport_serial.py +++ b/pymodbus/transport/transport_serial.py @@ -132,12 +132,10 @@ def abort(self): def _read_ready(self): """Test if there are data waiting.""" try: - data = self.sync_serial.read(1024) + if data := self.sync_serial.read(1024): + self._protocol.data_received(data) except serial.SerialException as exc: self.close(exc=exc) - else: - if data: - self._protocol.data_received(data) def _write_ready(self): """Asynchronously write buffered data."""