-
Notifications
You must be signed in to change notification settings - Fork 951
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
dd6276a
commit 8062d3b
Showing
5 changed files
with
286 additions
and
27 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
"""Null modem transport. | ||
This is a special transport, mostly thought of for testing. | ||
NullModem interconnect 2 transport objects and transfers calls: | ||
- server.listen() | ||
- dummy | ||
- client.connect() | ||
- call client.connection_made() | ||
- call server.connection_made() | ||
- client/server.close() | ||
- call client.connection_lost() | ||
- call server.connection_lost() | ||
- server/client.send | ||
- call client/server.data_received() | ||
""" | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
|
||
from pymodbus.logging import Log | ||
from pymodbus.transport.transport import Transport | ||
|
||
|
||
class DummyTransport(asyncio.BaseTransport): | ||
"""Use in connection_made calls.""" | ||
|
||
def close(self): | ||
"""Define dummy.""" | ||
|
||
def get_protocol(self): | ||
"""Define dummy.""" | ||
|
||
def is_closing(self): | ||
"""Define dummy.""" | ||
|
||
def set_protocol(self, _protocol): | ||
"""Define dummy.""" | ||
|
||
def abort(self): | ||
"""Define dummy.""" | ||
|
||
|
||
class NullModem(Transport): | ||
"""Transport layer. | ||
Contains methods to act as a null modem between 2 objects. | ||
(Allowing tests to be shortcut without actual network calls) | ||
""" | ||
|
||
nullmodem_client: NullModem = None | ||
nullmodem_server: NullModem = None | ||
|
||
def __init__(self, *arg): | ||
"""Overwrite init.""" | ||
self.is_server: bool = False | ||
self.other_end: NullModem = None | ||
super().__init__(*arg) | ||
|
||
async def transport_connect(self) -> bool: | ||
"""Handle generic connect and call on to specific transport connect.""" | ||
Log.debug("NullModem: Simulate connect on {}", self.comm_params.comm_name) | ||
if not self.loop: | ||
self.loop = asyncio.get_running_loop() | ||
if self.nullmodem_server: | ||
self.__class__.nullmodem_client = self | ||
self.other_end = self.nullmodem_server | ||
self.nullmodem_server.other_end = self | ||
self.cb_connection_made() | ||
self.other_end.cb_connection_made() | ||
return True | ||
return False | ||
|
||
async def transport_listen(self): | ||
"""Handle generic listen and call on to specific transport listen.""" | ||
Log.debug("NullModem: Simulate listen on {}", self.comm_params.comm_name) | ||
if not self.loop: | ||
self.loop = asyncio.get_running_loop() | ||
self.is_server = True | ||
self.__class__.nullmodem_server = self | ||
return DummyTransport() | ||
|
||
# -------------------------------- # | ||
# Helper methods for child classes # | ||
# -------------------------------- # | ||
async def send(self, data: bytes) -> bool: | ||
"""Send request. | ||
:param data: non-empty bytes object with data to send. | ||
""" | ||
Log.debug("NullModem: simulate send {}", data, ":hex") | ||
self.other_end.data_received(data) | ||
return True | ||
|
||
def close(self, reconnect: bool = False) -> None: | ||
"""Close connection. | ||
:param reconnect: (default false), try to reconnect | ||
""" | ||
self.recv_buffer = b"" | ||
if not reconnect: | ||
if self.nullmodem_client: | ||
self.nullmodem_client.cb_connection_lost(None) | ||
if self.nullmodem_server: | ||
self.nullmodem_server.cb_connection_lost(None) | ||
self.__class__.nullmodem_client = None | ||
self.__class__.nullmodem_server = None | ||
|
||
# ----------------- # | ||
# The magic methods # | ||
# ----------------- # | ||
def __str__(self) -> str: | ||
"""Build a string representation of the connection.""" | ||
return f"{self.__class__.__name__}({self.comm_params.comm_name})" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
"""Test transport.""" | ||
|
||
from pymodbus.transport.nullmodem import DummyTransport | ||
|
||
|
||
class TestNullModemTransport: | ||
"""Test null modem module.""" | ||
|
||
async def test_str_magic(self, nullmodem, params): | ||
"""Test magic.""" | ||
str(nullmodem) | ||
assert str(nullmodem) == f"NullModem({params.comm_name})" | ||
|
||
def test_DummyTransport(self): | ||
"""Test DummyTransport class.""" | ||
socket = DummyTransport() | ||
socket.close() | ||
socket.get_protocol() | ||
socket.is_closing() | ||
socket.set_protocol(None) | ||
socket.abort() | ||
|
||
def test_class_variables(self, nullmodem, nullmodem_server): | ||
"""Test connection_made().""" | ||
assert not nullmodem.nullmodem_client | ||
assert not nullmodem.nullmodem_server | ||
assert not nullmodem_server.nullmodem_client | ||
assert not nullmodem_server.nullmodem_server | ||
nullmodem.__class__.nullmodem_client = self | ||
nullmodem.is_server = False | ||
nullmodem_server.__class__.nullmodem_server = self | ||
nullmodem_server.is_server = True | ||
|
||
assert nullmodem.nullmodem_client == nullmodem_server.nullmodem_client | ||
assert nullmodem.nullmodem_server == nullmodem_server.nullmodem_server | ||
|
||
async def test_transport_connect(self, nullmodem): | ||
"""Test connection_made().""" | ||
nullmodem.loop = None | ||
assert not await nullmodem.transport_connect() | ||
assert not nullmodem.nullmodem_server | ||
assert not nullmodem.nullmodem_client | ||
assert nullmodem.loop | ||
nullmodem.cb_connection_made.assert_not_called() | ||
nullmodem.cb_connection_lost.assert_not_called() | ||
nullmodem.cb_handle_data.assert_not_called() | ||
|
||
async def test_transport_listen(self, nullmodem_server): | ||
"""Test connection_made().""" | ||
nullmodem_server.loop = None | ||
assert await nullmodem_server.transport_listen() | ||
assert nullmodem_server.is_server | ||
assert nullmodem_server.nullmodem_server | ||
assert not nullmodem_server.nullmodem_client | ||
assert nullmodem_server.loop | ||
nullmodem_server.cb_connection_made.assert_not_called() | ||
nullmodem_server.cb_connection_lost.assert_not_called() | ||
nullmodem_server.cb_handle_data.assert_not_called() | ||
|
||
async def test_connected(self, nullmodem, nullmodem_server): | ||
"""Test connection is correct.""" | ||
assert await nullmodem_server.transport_listen() | ||
assert await nullmodem.transport_connect() | ||
assert nullmodem.nullmodem_client | ||
assert nullmodem.nullmodem_server | ||
assert nullmodem.loop | ||
assert not nullmodem.is_server | ||
assert nullmodem_server.is_server | ||
nullmodem.cb_connection_made.assert_called_once() | ||
nullmodem.cb_connection_lost.assert_not_called() | ||
nullmodem.cb_handle_data.assert_not_called() | ||
nullmodem_server.cb_connection_made.assert_called_once() | ||
nullmodem_server.cb_connection_lost.assert_not_called() | ||
nullmodem_server.cb_handle_data.assert_not_called() | ||
|
||
async def test_client_close(self, nullmodem, nullmodem_server): | ||
"""Test close().""" | ||
assert await nullmodem_server.transport_listen() | ||
assert await nullmodem.transport_connect() | ||
nullmodem.close() | ||
assert not nullmodem.nullmodem_client | ||
assert not nullmodem.nullmodem_server | ||
nullmodem.cb_connection_made.assert_called_once() | ||
nullmodem.cb_connection_lost.assert_called_once() | ||
nullmodem.cb_handle_data.assert_not_called() | ||
nullmodem_server.cb_connection_made.assert_called_once() | ||
nullmodem_server.cb_connection_lost.assert_called_once() | ||
nullmodem_server.cb_handle_data.assert_not_called() | ||
|
||
async def test_server_close(self, nullmodem, nullmodem_server): | ||
"""Test close().""" | ||
assert await nullmodem_server.transport_listen() | ||
assert await nullmodem.transport_connect() | ||
nullmodem_server.close() | ||
assert not nullmodem.nullmodem_client | ||
assert not nullmodem.nullmodem_server | ||
nullmodem.cb_connection_made.assert_called_once() | ||
nullmodem.cb_connection_lost.assert_called_once() | ||
nullmodem.cb_handle_data.assert_not_called() | ||
nullmodem_server.cb_connection_made.assert_called_once() | ||
nullmodem_server.cb_connection_lost.assert_called_once() | ||
nullmodem_server.cb_handle_data.assert_not_called() | ||
|
||
async def test_data(self, nullmodem, nullmodem_server): | ||
"""Test data exchange.""" | ||
data = b"abcd" | ||
assert await nullmodem_server.transport_listen() | ||
assert await nullmodem.transport_connect() | ||
assert await nullmodem.send(data) | ||
assert nullmodem_server.recv_buffer == data | ||
assert not nullmodem.recv_buffer | ||
nullmodem.cb_handle_data.assert_not_called() | ||
nullmodem_server.cb_handle_data.assert_called_once() | ||
assert await nullmodem_server.send(data) | ||
assert nullmodem_server.recv_buffer == data | ||
assert nullmodem.recv_buffer == data | ||
nullmodem.cb_handle_data.assert_called_once() | ||
nullmodem_server.cb_handle_data.assert_called_once() |