Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update package_test_tool (add 4 test scenarios) #2107

Merged
merged 2 commits into from
Mar 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 63 additions & 27 deletions examples/package_test_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@

Send raw data packets to the server (remark data is frame+request)

*** handle_client_data(transport, data) ***
*** simulate_server(transport, request) ***

Called when data is received from the client/server (remark data is frame+request)
Called when data is received from the client (remark data is frame+request)

The function generates frame+response and sends it.

And one function which can be modified to test the server functionality:
*** simulate_client(transport, response) ***

Called when data is received from the server (remark data is frame+request)

"""
from __future__ import annotations

Expand All @@ -46,7 +49,7 @@

import pymodbus.client as modbusClient
import pymodbus.server as modbusServer
from pymodbus import Framer, pymodbus_apply_logging_config
from pymodbus import Framer, ModbusException, pymodbus_apply_logging_config
from pymodbus.datastore import (
ModbusSequentialDataBlock,
ModbusServerContext,
Expand All @@ -69,6 +72,7 @@ def __init__(
"""Initialize a stub instance."""
self.stub_handle_data = handler
super().__init__(params, is_server)
self.is_tcp = params.comm_type == CommType.TCP

async def start_run(self):
"""Call need functions to start server/client."""
Expand All @@ -78,7 +82,7 @@ async def start_run(self):

def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
"""Handle received data."""
self.stub_handle_data(self, data)
self.stub_handle_data(self, self.is_tcp, data)
return len(data)

def callback_connected(self) -> None:
Expand All @@ -95,27 +99,33 @@ def callback_new_connection(self) -> ModbusProtocol:
return new_stub


test_port = 5004 # pylint: disable=invalid-name

class ClientTester: # pylint: disable=too-few-public-methods
"""Main program."""

def __init__(self, comm: CommType):
"""Initialize runtime tester."""
global test_port # pylint: disable=global-statement
self.comm = comm
host = NULLMODEM_HOST

if comm == CommType.TCP:
self.client = modbusClient.AsyncModbusTcpClient(
NULLMODEM_HOST,
port=5004,
host,
port=test_port,
)
elif comm == CommType.SERIAL:
host = f"{NULLMODEM_HOST}:{test_port}"
self.client = modbusClient.AsyncModbusSerialClient(
f"{NULLMODEM_HOST}:5004",
host,
)
else:
raise RuntimeError("ERROR: CommType not implemented")
server_params = self.client.comm_params.copy()
server_params.source_address = (f"{NULLMODEM_HOST}:5004", 5004)
self.stub = TransportStub(server_params, True, handle_client_data)
server_params.source_address = (host, test_port)
self.stub = TransportStub(server_params, True, simulate_server)
test_port += 1


async def run(self):
Expand All @@ -135,6 +145,7 @@ class ServerTester: # pylint: disable=too-few-public-methods

def __init__(self, comm: CommType):
"""Initialize runtime tester."""
global test_port # pylint: disable=global-statement
self.comm = comm
self.store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [17] * 100),
Expand All @@ -151,22 +162,23 @@ def __init__(self, comm: CommType):
self.context,
framer=Framer.SOCKET,
identity=self.identity,
address=(NULLMODEM_HOST, 5004),
address=(NULLMODEM_HOST, test_port),
)
elif comm == CommType.SERIAL:
self.server = modbusServer.ModbusSerialServer(
self.context,
framer=Framer.SOCKET,
identity=self.identity,
port=f"{NULLMODEM_HOST}:5004",
port=f"{NULLMODEM_HOST}:{test_port}",
)
else:
raise RuntimeError("ERROR: CommType not implemented")
client_params = self.server.comm_params.copy()
client_params.host = client_params.source_address[0]
client_params.port = client_params.source_address[1]
client_params.timeout_connect = 1.0
self.stub = TransportStub(client_params, False, handle_server_data)
self.stub = TransportStub(client_params, False, simulate_client)
test_port += 1


async def run(self):
Expand All @@ -175,7 +187,7 @@ async def run(self):
Log.debug("--> Start testing.")
await self.server.listen()
await self.stub.start_run()
await server_calls(self.stub)
await server_calls(self.stub, (self.comm == CommType.TCP))
Log.debug("--> Shutting down.")
await self.server.shutdown()

Expand All @@ -194,21 +206,43 @@ async def main(comm: CommType, use_server: bool):
async def client_calls(client):
"""Test client API."""
Log.debug("--> Client calls starting.")
_resp = await client.read_holding_registers(address=124, count=4, slave=0)

try:
resp = await client.read_holding_registers(address=124, count=4, slave=0)
except ModbusException as exc:
txt = f"ERROR: exception in pymodbus {exc}"
Log.error(txt)
return
if resp.isError():
txt = "ERROR: pymodbus returned an error!"
Log.error(txt)
await asyncio.sleep(1)
client.close()
print("---> CLIENT all done")

async def server_calls(transport: ModbusProtocol):
"""Test client API."""
async def server_calls(transport: ModbusProtocol, is_tcp: bool):
"""Test server functionality."""
Log.debug("--> Server calls starting.")
_resp = transport.send(b'\x00\x02\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01' +
b'\x07\x00\x03\x00\x00\x06\x01\x03\x00\x00\x00\x01')

if is_tcp:
request = b'\x00\x02\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01'
else:
# 2 responses:
# response = b'\x00\x02\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01' +
# b'\x07\x00\x03\x00\x00\x06\x01\x03\x00\x00\x00\x01')
# 1 response:
request = b'\x00\x02\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01'
transport.send(request)
await asyncio.sleep(1)
print("---> all done")
transport.close()
print("---> SERVER all done")

def handle_client_data(transport: ModbusProtocol, data: bytes):
def simulate_server(transport: ModbusProtocol, is_tcp: bool, request: bytes):
"""Respond to request at transport level."""
Log.debug("--> stub called with request {}.", data, ":hex")
response = b'\x01\x03\x08\x00\x05\x00\x05\x00\x00\x00\x00\x0c\xd7'
Log.debug("--> Server simulator called with request {}.", request, ":hex")
if is_tcp:
response = b'\x00\x01\x00\x00\x00\x06\x00\x03\x00\x7c\x00\x04'
else:
response = b'\x01\x03\x08\x00\x05\x00\x05\x00\x00\x00\x00\x0c\xd7'

# Multiple send is allowed, to test fragmentation
# for data in response:
Expand All @@ -217,12 +251,14 @@ def handle_client_data(transport: ModbusProtocol, data: bytes):
transport.send(response)


def handle_server_data(_transport: ModbusProtocol, data: bytes):
def simulate_client(_transport: ModbusProtocol, _is_tcp: bool, response: bytes):
"""Respond to request at transport level."""
Log.debug("--> stub called with response {}.", data, ":hex")
Log.debug("--> Client simulator called with response {}.", response, ":hex")


if __name__ == "__main__":
# True for Server test, False for Client test
# asyncio.run(main(CommType.SERIAL, False), debug=True)
asyncio.run(main(CommType.SERIAL, False), debug=True)
asyncio.run(main(CommType.SERIAL, True), debug=True)
asyncio.run(main(CommType.TCP, False), debug=True)
asyncio.run(main(CommType.TCP, True), debug=True)