Skip to content

Commit

Permalink
update package_test_tool (add 4 test scenarios) (#2107)
Browse files Browse the repository at this point in the history
  • Loading branch information
janiversen committed Apr 7, 2024
1 parent b1516a3 commit 11dc429
Showing 1 changed file with 63 additions and 27 deletions.
90 changes: 63 additions & 27 deletions examples/package_test_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
Send raw data packets to the server (remark data is frame+request)
*** handle_client_data(transport, data) ***
*** simulate_server(transport, request) ***
Called when data is received from the client/server (remark data is frame+request)
Called when data is received from the client (remark data is frame+request)
The function generates frame+response and sends it.
And one function which can be modified to test the server functionality:
*** simulate_client(transport, response) ***
Called when data is received from the server (remark data is frame+request)
"""
from __future__ import annotations

Expand All @@ -46,7 +49,7 @@

import pymodbus.client as modbusClient
import pymodbus.server as modbusServer
from pymodbus import Framer, pymodbus_apply_logging_config
from pymodbus import Framer, ModbusException, pymodbus_apply_logging_config
from pymodbus.datastore import (
ModbusSequentialDataBlock,
ModbusServerContext,
Expand All @@ -69,6 +72,7 @@ def __init__(
"""Initialize a stub instance."""
self.stub_handle_data = handler
super().__init__(params, is_server)
self.is_tcp = params.comm_type == CommType.TCP

async def start_run(self):
"""Call need functions to start server/client."""
Expand All @@ -78,7 +82,7 @@ async def start_run(self):

def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
"""Handle received data."""
self.stub_handle_data(self, data)
self.stub_handle_data(self, self.is_tcp, data)
return len(data)

def callback_connected(self) -> None:
Expand All @@ -95,27 +99,33 @@ def callback_new_connection(self) -> ModbusProtocol:
return new_stub


test_port = 5004 # pylint: disable=invalid-name

class ClientTester: # pylint: disable=too-few-public-methods
"""Main program."""

def __init__(self, comm: CommType):
"""Initialize runtime tester."""
global test_port # pylint: disable=global-statement
self.comm = comm
host = NULLMODEM_HOST

if comm == CommType.TCP:
self.client = modbusClient.AsyncModbusTcpClient(
NULLMODEM_HOST,
port=5004,
host,
port=test_port,
)
elif comm == CommType.SERIAL:
host = f"{NULLMODEM_HOST}:{test_port}"
self.client = modbusClient.AsyncModbusSerialClient(
f"{NULLMODEM_HOST}:5004",
host,
)
else:
raise RuntimeError("ERROR: CommType not implemented")
server_params = self.client.comm_params.copy()
server_params.source_address = (f"{NULLMODEM_HOST}:5004", 5004)
self.stub = TransportStub(server_params, True, handle_client_data)
server_params.source_address = (host, test_port)
self.stub = TransportStub(server_params, True, simulate_server)
test_port += 1


async def run(self):
Expand All @@ -135,6 +145,7 @@ class ServerTester: # pylint: disable=too-few-public-methods

def __init__(self, comm: CommType):
"""Initialize runtime tester."""
global test_port # pylint: disable=global-statement
self.comm = comm
self.store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [17] * 100),
Expand All @@ -151,22 +162,23 @@ def __init__(self, comm: CommType):
self.context,
framer=Framer.SOCKET,
identity=self.identity,
address=(NULLMODEM_HOST, 5004),
address=(NULLMODEM_HOST, test_port),
)
elif comm == CommType.SERIAL:
self.server = modbusServer.ModbusSerialServer(
self.context,
framer=Framer.SOCKET,
identity=self.identity,
port=f"{NULLMODEM_HOST}:5004",
port=f"{NULLMODEM_HOST}:{test_port}",
)
else:
raise RuntimeError("ERROR: CommType not implemented")
client_params = self.server.comm_params.copy()
client_params.host = client_params.source_address[0]
client_params.port = client_params.source_address[1]
client_params.timeout_connect = 1.0
self.stub = TransportStub(client_params, False, handle_server_data)
self.stub = TransportStub(client_params, False, simulate_client)
test_port += 1


async def run(self):
Expand All @@ -175,7 +187,7 @@ async def run(self):
Log.debug("--> Start testing.")
await self.server.listen()
await self.stub.start_run()
await server_calls(self.stub)
await server_calls(self.stub, (self.comm == CommType.TCP))
Log.debug("--> Shutting down.")
await self.server.shutdown()

Expand All @@ -194,21 +206,43 @@ async def main(comm: CommType, use_server: bool):
async def client_calls(client):
"""Test client API."""
Log.debug("--> Client calls starting.")
_resp = await client.read_holding_registers(address=124, count=4, slave=0)

try:
resp = await client.read_holding_registers(address=124, count=4, slave=0)
except ModbusException as exc:
txt = f"ERROR: exception in pymodbus {exc}"
Log.error(txt)
return
if resp.isError():
txt = "ERROR: pymodbus returned an error!"
Log.error(txt)
await asyncio.sleep(1)
client.close()
print("---> CLIENT all done")

async def server_calls(transport: ModbusProtocol):
"""Test client API."""
async def server_calls(transport: ModbusProtocol, is_tcp: bool):
"""Test server functionality."""
Log.debug("--> Server calls starting.")
_resp = transport.send(b'\x00\x02\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01' +
b'\x07\x00\x03\x00\x00\x06\x01\x03\x00\x00\x00\x01')

if is_tcp:
request = b'\x00\x02\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01'
else:
# 2 responses:
# response = b'\x00\x02\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01' +
# b'\x07\x00\x03\x00\x00\x06\x01\x03\x00\x00\x00\x01')
# 1 response:
request = b'\x00\x02\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01'
transport.send(request)
await asyncio.sleep(1)
print("---> all done")
transport.close()
print("---> SERVER all done")

def handle_client_data(transport: ModbusProtocol, data: bytes):
def simulate_server(transport: ModbusProtocol, is_tcp: bool, request: bytes):
"""Respond to request at transport level."""
Log.debug("--> stub called with request {}.", data, ":hex")
response = b'\x01\x03\x08\x00\x05\x00\x05\x00\x00\x00\x00\x0c\xd7'
Log.debug("--> Server simulator called with request {}.", request, ":hex")
if is_tcp:
response = b'\x00\x01\x00\x00\x00\x06\x00\x03\x00\x7c\x00\x04'
else:
response = b'\x01\x03\x08\x00\x05\x00\x05\x00\x00\x00\x00\x0c\xd7'

# Multiple send is allowed, to test fragmentation
# for data in response:
Expand All @@ -217,12 +251,14 @@ def handle_client_data(transport: ModbusProtocol, data: bytes):
transport.send(response)


def handle_server_data(_transport: ModbusProtocol, data: bytes):
def simulate_client(_transport: ModbusProtocol, _is_tcp: bool, response: bytes):
"""Respond to request at transport level."""
Log.debug("--> stub called with response {}.", data, ":hex")
Log.debug("--> Client simulator called with response {}.", response, ":hex")


if __name__ == "__main__":
# True for Server test, False for Client test
# asyncio.run(main(CommType.SERIAL, False), debug=True)
asyncio.run(main(CommType.SERIAL, False), debug=True)
asyncio.run(main(CommType.SERIAL, True), debug=True)
asyncio.run(main(CommType.TCP, False), debug=True)
asyncio.run(main(CommType.TCP, True), debug=True)

0 comments on commit 11dc429

Please sign in to comment.