Skip to content

Commit

Permalink
misc
Browse files Browse the repository at this point in the history
  • Loading branch information
altendky committed Jan 18, 2021
1 parent 1a5ee53 commit 83bf250
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 61 deletions.
13 changes: 4 additions & 9 deletions pymodbus/client/asynchronous/trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ class BaseModbusAsyncClientProtocol(AsyncModbusClientMixin):
#: Factory that created this instance.
factory = None
transport = None
data = b''

async def execute(self, request=None):
request.transaction_id = self.transaction.getNextTID()
packet = self.framer.buildPacket(request)
_logger.debug("send: " + hexlify_packets(packet))
# TODO: should we retry on trio.BusyResourceError?
await self.transport.send_all(packet)
response = await self._build_response(request.transaction_id)
with trio.fail_after(seconds=1):
response = await self._build_response(request.transaction_id)
return response

def connection_made(self, transport):
Expand All @@ -64,15 +64,10 @@ def _data_received(self, data):
'''
_logger.debug("recv: " + hexlify_packets(data))

# TODO: trying to help out the framer here by buffering up a bit but it
# is insufficient and still fails down below.
self.data += data
decoded = self.framer.decode_data(self.data)
if decoded == {}:
return
decoded = self.framer.decode_data(data)

unit = decoded.get("unit", 0)
self.framer.processIncomingPacket(self.data, self._handle_response, unit=unit)
self.framer.processIncomingPacket(data, self._handle_response, unit=unit)
self.data = b''

def _handle_response(self, reply, **kwargs):
Expand Down
66 changes: 37 additions & 29 deletions pymodbus/server/trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,56 @@

import trio

from pymodbus.exceptions import NotImplementedException, NoSuchSlaveException
from pymodbus.exceptions import NoSuchSlaveException
from pymodbus.factory import ServerDecoder
from pymodbus.framer.socket_framer import ModbusSocketFramer
from pymodbus.pdu import ModbusExceptions as merror

_logger = logging.getLogger(__name__)


def execute(request, addr, context, response_send):
def execute(request, addr, context, response_send, ignore_missing_slaves, broadcast_enable):
broadcast = False
try:
if False: # self.server.broadcast_enable and request.unit_id == 0:
if broadcast_enable and request.unit_id == 0:
broadcast = True
# if broadcasting then execute on all slave contexts, note response will be ignored
for unit_id in self.server.context.slaves():
response = request.execute(self.server.context[unit_id])
for unit_id in context.slaves():
response = request.execute(context[unit_id])
else:
context = context[request.unit_id]
response = request.execute(context)
# TODO: can't be covered until non-single server contexts are supported
# except NoSuchSlaveException as ex:
# _logger.debug("requested slave does not exist: %s" % request.unit_id)
# if False: # self.server.ignore_missing_slaves:
# return # the client will simply timeout waiting for a response
# response = request.doException(merror.GatewayNoResponse)
except Exception as ex:
except NoSuchSlaveException as ex:
_logger.debug("requested slave does not exist: %s" % request.unit_id)
if ignore_missing_slaves:
return # the client will simply timeout waiting for a response
response = request.doException(merror.GatewayNoResponse)
except Exception as ex: # pragma: no cover
_logger.debug(
"Datastore unable to fulfill request: %s; %s", ex, traceback.format_exc()
)
response = request.doException(merror.SlaveFailure)

# no response when broadcasting
if not broadcast:
response.transaction_id = request.transaction_id
response.unit_id = request.unit_id
response_send.send_nowait((response, addr))


async def incoming(server_stream, framer, context, response_send):
async def incoming(server_stream, framer, context, response_send, ignore_missing_slaves, broadcast_enable):
async with response_send:
units = context.slaves()
if not isinstance(units, (list, tuple)):
if not isinstance(units, (list, tuple)): # pragma: no cover
units = [units]

async for data in server_stream:
if isinstance(data, tuple):
data, *addr = data # addr is populated when talking over UDP
else:
addr = (None,) # empty tuple
# TODO: implement UDP support
# if isinstance(data, tuple):
# data, *addr = data # addr is populated when talking over UDP
# else:
# addr = (None,) # empty tuple
addr = (None,) # empty tuple

framer.processIncomingPacket(
data=data,
Expand All @@ -61,19 +63,20 @@ async def incoming(server_stream, framer, context, response_send):
addr=addr,
context=context,
response_send=response_send,
ignore_missing_slaves=ignore_missing_slaves,
broadcast_enable=broadcast_enable,
),
unit=units,
single=context.single,
)


async def tcp_server(server_stream, context, identity):
async def tcp_server(server_stream, context, identity, ignore_missing_slaves=False, broadcast_enable=False):
if broadcast_enable:
units = context.slaves()
if 0 not in context:
units.append(0)

# if server.broadcast_enable: # pragma: no cover
# if 0 not in units:
# units.append(0)
if not context.single:
raise NotImplementedException("non-single context not yet supported")
response_send, response_receive = trio.open_memory_channel(max_buffer_size=0)
framer = ModbusSocketFramer(decoder=ServerDecoder(), client=None)

Expand All @@ -85,17 +88,22 @@ async def tcp_server(server_stream, context, identity):
framer=framer,
context=context,
response_send=response_send,
ignore_missing_slaves=ignore_missing_slaves,
broadcast_enable=broadcast_enable,
)
)

async for message, addr in response_receive:
if message.should_respond:
# self.server.control.Counter.BusMessage += 1
pdu = framer.buildPacket(message)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("send: [%s]- %s" % (message, b2a_hex(pdu)))
# avoids the b2a_hex() conversion
_logger.debug('send: [%s]- %s' % (message, b2a_hex(pdu)))
if addr == (None,):
await server_stream.send_all(pdu)
else:
1 / 0
self._send_(pdu, *addr)
else: # pragma: no cover
# TODO: implement UDP support
# self._send_(pdu, *addr)
pass
else: # pragma: no cover
pass
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
'tornado == 4.5.3'
],
'trio': [
'trio ~= 0.17.0',
'async_generator ~= 1.10',
],
'repl': [
Expand Down
100 changes: 77 additions & 23 deletions test/test_server_trio.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import functools
import logging

import pytest
import trio

from pymodbus.exceptions import NotImplementedException
from pymodbus.client.asynchronous.schedulers import TRIO
from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient
from pymodbus.datastore.context import ModbusServerContext, ModbusSlaveContext
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.pdu import ExceptionResponse
from pymodbus.pdu import ExceptionResponse, ModbusExceptions
from pymodbus.register_read_message import ReadHoldingRegistersResponse
from pymodbus.server.trio import tcp_server
from pymodbus.register_write_message import WriteMultipleRegistersResponse
Expand All @@ -22,15 +22,32 @@ def __init__(self, context, host, port):


@pytest.fixture(name="trio_server_context")
async def trio_server_context_fixture():
slave_context = ModbusSlaveContext()
server_context = ModbusServerContext(slaves=slave_context)
async def trio_server_context_fixture(request):
node = request.node
single = node.get_closest_marker("single")
no_contexts = node.get_closest_marker("no_contexts")

if no_contexts:
slaves = {}
else:
slave_context = ModbusSlaveContext()

if single:
slaves = slave_context
else:
slaves = {0: slave_context}

server_context = ModbusServerContext(slaves=slaves, single=single)

return server_context


@pytest.fixture(name="trio_tcp_server")
async def trio_tcp_server_fixture(nursery, trio_server_context):
async def trio_tcp_server_fixture(request, nursery, trio_server_context):
node = request.node
broadcast_enable = node.get_closest_marker("broadcast_enable")
ignore_missing_slaves = node.get_closest_marker("ignore_missing_slaves")

host = "127.0.0.1"

identity = ModbusDeviceIdentification()
Expand All @@ -42,14 +59,18 @@ async def trio_tcp_server_fixture(nursery, trio_server_context):
tcp_server,
context=trio_server_context,
identity=identity,
ignore_missing_slaves=ignore_missing_slaves,
broadcast_enable=broadcast_enable,
),
host=host,
port=0,
),
)

yield RunningTrioServer(
context=trio_server_context, host=host, port=listener.socket.getsockname()[1]
context=trio_server_context,
host=host,
port=listener.socket.getsockname()[1],
)


Expand Down Expand Up @@ -105,31 +126,64 @@ async def test_write_holding_registers(trio_tcp_client, trio_tcp_server):
assert server_values == [0, value, 0]


@pytest.mark.trio
async def test_tcp_server_raises_for_non_single_context(trio_server_context):
# TODO: Remove once non-single support is implemented
trio_server_context.single = False
with pytest.raises(NotImplementedException):
await tcp_server(server_stream=None, context=trio_server_context, identity=None)


@pytest.mark.trio
async def test_large_count_excepts(trio_tcp_client):
response = await trio_tcp_client.read_holding_registers(
address=0,
count=300,
)
assert isinstance(response, ExceptionResponse)
assert response.exception_code == ModbusExceptions.IllegalValue


@pytest.mark.trio
async def test_red(trio_tcp_server):
modbus_client = AsyncModbusTCPClient(
scheduler=TRIO,
host=trio_tcp_server.host,
port=trio_tcp_server.port,
async def test_invalid_client_excepts_gateway_no_response(trio_tcp_client):
response = await trio_tcp_client.read_holding_registers(
address=0,
count=1,
unit=57,
)
assert isinstance(response, ExceptionResponse)
assert response.exception_code == ModbusExceptions.GatewayNoResponse


@pytest.mark.ignore_missing_slaves
@pytest.mark.trio
async def test_invalid_unit_times_out_when_ignoring_missing_slaves(trio_tcp_client):
with pytest.raises(trio.TooSlowError):
await trio_tcp_client.read_holding_registers(
address=0,
count=1,
unit=57,
)


@pytest.mark.broadcast_enable
@pytest.mark.trio
async def test_times_out_when_broadcast_enabled(trio_tcp_client):
with pytest.raises(trio.TooSlowError):
await trio_tcp_client.read_holding_registers(
address=0,
count=1,
unit=0,
)


@pytest.mark.broadcast_enable
@pytest.mark.no_contexts
@pytest.mark.trio
async def test_times_out_when_broadcast_enabled_and_no_contexts(trio_tcp_client):
with pytest.raises(trio.TooSlowError):
await trio_tcp_client.read_holding_registers(
address=0,
count=1,
unit=0,
)


@pytest.mark.trio
async def test_logs_server_response_send(trio_tcp_client, caplog):
with caplog.at_level(logging.DEBUG):
await trio_tcp_client.read_holding_registers(address=0, count=1)

async with modbus_client.manage_connection():
async with modbus_client.manage_connection():
pass
assert "send: [ReadRegisterResponse (1)]- b'0001000000050003020000'" in caplog.text

0 comments on commit 83bf250

Please sign in to comment.