diff --git a/examples/common/asyncio_server.py b/examples/common/asyncio_server.py new file mode 100755 index 000000000..e55fce6b5 --- /dev/null +++ b/examples/common/asyncio_server.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python +""" +Pymodbus Asyncio Server Example +-------------------------------------------------------------------------- + +The asyncio server is implemented in pure python without any third +party libraries (unless you need to use the serial protocols which require +asyncio-pyserial). This is helpful in constrained or old environments where using +twisted is just not feasible. What follows is an example of its use: +""" +# --------------------------------------------------------------------------- # +# import the various server implementations +# --------------------------------------------------------------------------- # +from pymodbus.server.asyncio import StartTcpServer +from pymodbus.server.asyncio import StartUdpServer +from pymodbus.server.asyncio import StartSerialServer + +from pymodbus.device import ModbusDeviceIdentification +from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSparseDataBlock +from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext + +from pymodbus.transaction import ModbusRtuFramer, ModbusBinaryFramer +# --------------------------------------------------------------------------- # +# configure the service logging +# --------------------------------------------------------------------------- # +import logging +FORMAT = ('%(asctime)-15s %(threadName)-15s' + ' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s') +logging.basicConfig(format=FORMAT) +log = logging.getLogger() +log.setLevel(logging.DEBUG) + + +async def run_server(): + # ----------------------------------------------------------------------- # + # initialize your data store + # ----------------------------------------------------------------------- # + # The datastores only respond to the addresses that they are initialized to + # Therefore, if you initialize a DataBlock to addresses of 0x00 to 0xFF, a + # request to 0x100 will respond with an invalid address exception. This is + # because many devices exhibit this kind of behavior (but not all):: + # + # block = ModbusSequentialDataBlock(0x00, [0]*0xff) + # + # Continuing, you can choose to use a sequential or a sparse DataBlock in + # your data context. The difference is that the sequential has no gaps in + # the data while the sparse can. Once again, there are devices that exhibit + # both forms of behavior:: + # + # block = ModbusSparseDataBlock({0x00: 0, 0x05: 1}) + # block = ModbusSequentialDataBlock(0x00, [0]*5) + # + # Alternately, you can use the factory methods to initialize the DataBlocks + # or simply do not pass them to have them initialized to 0x00 on the full + # address range:: + # + # store = ModbusSlaveContext(di = ModbusSequentialDataBlock.create()) + # store = ModbusSlaveContext() + # + # Finally, you are allowed to use the same DataBlock reference for every + # table or you may use a separate DataBlock for each table. + # This depends if you would like functions to be able to access and modify + # the same data or not:: + # + # block = ModbusSequentialDataBlock(0x00, [0]*0xff) + # store = ModbusSlaveContext(di=block, co=block, hr=block, ir=block) + # + # The server then makes use of a server context that allows the server to + # respond with different slave contexts for different unit ids. By default + # it will return the same context for every unit id supplied (broadcast + # mode). + # However, this can be overloaded by setting the single flag to False and + # then supplying a dictionary of unit id to context mapping:: + # + # slaves = { + # 0x01: ModbusSlaveContext(...), + # 0x02: ModbusSlaveContext(...), + # 0x03: ModbusSlaveContext(...), + # } + # context = ModbusServerContext(slaves=slaves, single=False) + # + # The slave context can also be initialized in zero_mode which means that a + # request to address(0-7) will map to the address (0-7). The default is + # False which is based on section 4.4 of the specification, so address(0-7) + # will map to (1-8):: + # + # store = ModbusSlaveContext(..., zero_mode=True) + # ----------------------------------------------------------------------- # + store = ModbusSlaveContext( + di=ModbusSequentialDataBlock(0, [17]*100), + co=ModbusSequentialDataBlock(0, [17]*100), + hr=ModbusSequentialDataBlock(0, [17]*100), + ir=ModbusSequentialDataBlock(0, [17]*100)) + + context = ModbusServerContext(slaves=store, single=True) + + # ----------------------------------------------------------------------- # + # initialize the server information + # ----------------------------------------------------------------------- # + # If you don't set this or any fields, they are defaulted to empty strings. + # ----------------------------------------------------------------------- # + identity = ModbusDeviceIdentification() + identity.VendorName = 'Pymodbus' + identity.ProductCode = 'PM' + identity.VendorUrl = 'http://github.com/riptideio/pymodbus/' + identity.ProductName = 'Pymodbus Server' + identity.ModelName = 'Pymodbus Server' + identity.MajorMinorRevision = '2.2.0' + + # ----------------------------------------------------------------------- # + # run the server you want + # ----------------------------------------------------------------------- # + # Tcp: + # immediately start serving: + server = await StartTcpServer(context, identity=identity, address=("0.0.0.0", 5020), + allow_reuse_address=True) + + # deferred start: + # server = await StartTcpServer(context, identity=identity, address=("0.0.0.0", 5020), + # allow_reuse_address=True, defer_start=True) + + # asyncio.get_event_loop().call_later(20, lambda : server.) + # await server.serve_forever() + + + + + # TCP with different framer + # StartTcpServer(context, identity=identity, + # framer=ModbusRtuFramer, address=("0.0.0.0", 5020)) + + # Udp: + # server = await StartUdpServer(context, identity=identity, address=("0.0.0.0", 5020), + # allow_reuse_address=True, defer_start=True) + # + # await server.serve_forever() + + + # !!! SERIAL SERVER NOT IMPLEMENTED !!! + # Ascii: + # StartSerialServer(context, identity=identity, + # port='/dev/ttyp0', timeout=1) + + # RTU: + # StartSerialServer(context, framer=ModbusRtuFramer, identity=identity, + # port='/dev/ttyp0', timeout=.005, baudrate=9600) + + # Binary + # StartSerialServer(context, + # identity=identity, + # framer=ModbusBinaryFramer, + # port='/dev/ttyp0', + # timeout=1) + + +if __name__ == "__main__": + asyncio.run(run_server()) + diff --git a/pymodbus/client/sync.py b/pymodbus/client/sync.py index 04d7778e3..d413279e9 100644 --- a/pymodbus/client/sync.py +++ b/pymodbus/client/sync.py @@ -1,6 +1,5 @@ import socket import select -import serial import time import sys from functools import partial @@ -425,6 +424,7 @@ def __init__(self, method='ascii', **kwargs): :param strict: Use Inter char timeout for baudrates <= 19200 (adhere to modbus standards) """ + import serial self.method = method self.socket = None BaseModbusClient.__init__(self, self.__implementation(method, self), diff --git a/pymodbus/server/asyncio.py b/pymodbus/server/asyncio.py new file mode 100755 index 000000000..0652c3372 --- /dev/null +++ b/pymodbus/server/asyncio.py @@ -0,0 +1,629 @@ +""" +Implementation of a Threaded Modbus Server +------------------------------------------ + +""" +from binascii import b2a_hex +import socket +import traceback + +import asyncio +from pymodbus.constants import Defaults +from pymodbus.utilities import hexlify_packets +from pymodbus.factory import ServerDecoder +from pymodbus.datastore import ModbusServerContext +from pymodbus.device import ModbusControlBlock +from pymodbus.device import ModbusDeviceIdentification +from pymodbus.transaction import * +from pymodbus.exceptions import NotImplementedException, NoSuchSlaveException +from pymodbus.pdu import ModbusExceptions as merror +from pymodbus.compat import socketserver, byte2int + +# --------------------------------------------------------------------------- # +# Logging +# --------------------------------------------------------------------------- # +import logging +_logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- # +# Protocol Handlers +# --------------------------------------------------------------------------- # + +class ModbusBaseRequestHandler(asyncio.BaseProtocol): + """ Implements modbus slave wire protocol + This uses the asyncio.Protocol to implement the client handler. + + When a connection is established, the asyncio.Protocol.connection_made + callback is called. This callback will setup the connection and + create and schedule an asyncio.Task and assign it to running_task. + + running_task will be canceled upon connection_lost event. + """ + def __init__(self, owner): + self.server = owner + self.running = False + self.receive_queue = asyncio.Queue() + self.handler_task = None # coroutine to be run on asyncio loop + + def connection_made(self, transport): + """ + asyncio.BaseProtocol callback for socket establish + + For streamed protocols (TCP) this will also correspond to an + entire conversation; however for datagram protocols (UDP) this + corresponds to the socket being opened + """ + try: + _logger.debug("Socket [%s:%s] opened" % transport.get_extra_info('sockname')) + self.transport = transport + self.running = True + self.framer = self.server.framer(self.server.decoder, client=None) + + # schedule the connection handler on the event loop + self.handler_task = asyncio.create_task(self.handle()) + except Exception as ex: # pragma: no cover + _logger.debug("Datastore unable to fulfill request: " + "%s; %s", ex, traceback.format_exc()) + + def connection_lost(self, exc): + """ + asyncio.BaseProtocol callback for socket tear down + + For streamed protocols any break in the network connection will + be reported here; for datagram protocols, only a teardown of the + socket itself will result in this call. + """ + try: + self.handler_task.cancel() + + if exc is None: + if hasattr(self, "client_address"): # TCP connection + _logger.debug("Disconnected from client [%s:%s]" % self.client_address) + else: + _logger.debug("Disconnected from client [%s]" % self.transport.get_extra_info("peername")) + else: # pragma: no cover + __logger.debug("Client Disconnection [%s:%s] due to %s" % (*self.client_address, exc)) + + self.running = False + + except Exception as ex: # pragma: no cover + _logger.debug("Datastore unable to fulfill request: " + "%s; %s", ex, traceback.format_exc()) + + async def handle(self): + """Asyncio coroutine which represents a single conversation between + the modbus slave and master + + Once the client connection is established, the data chunks will be + fed to this coroutine via the asyncio.Queue object which is fed by + the ModbusBaseRequestHandler class's callback Future. + + This callback future gets data from either asyncio.DatagramProtocol.datagram_received + or from asyncio.BaseProtocol.data_received. + + This function will execute without blocking in the while-loop and + yield to the asyncio event loop when the frame is exhausted. + As a result, multiple clients can be interleaved without any + interference between them. + + For ModbusConnectedRequestHandler, each connection will be given an + instance of the handle() coroutine and this instance will be put in the + active_connections dict. Calling server_close will individually cancel + each running handle() task. + + For ModbusDisconnectedRequestHandler, a single handle() coroutine will + be started and maintained. Calling server_close will cancel that task. + + """ + reset_frame = False + while self.running: + try: + units = self.server.context.slaves() + data = await self._recv_() # this is an asyncio.Queue await, it will never fail + if isinstance(data, tuple): + data, *addr = data # addr is populated when talking over UDP + else: + addr = (None,) # empty tuple + + if not isinstance(units, (list, tuple)): + units = [units] + # if broadcast is enabled make sure to + # process requests to address 0 + if self.server.broadcast_enable: # pragma: no cover + if 0 not in units: + units.append(0) + + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug('Handling data: ' + hexlify_packets(data)) + + single = self.server.context.single + self.framer.processIncomingPacket(data=data, + callback=lambda x: self.execute(x, *addr), + unit=units, + single=single) + + except asyncio.CancelledError: + # catch and ignore cancelation errors + if isinstance(self, ModbusConnectedRequestHandler): + _logger.debug("Handler for stream [%s:%s] has been canceled" % self.client_address) + else: + _logger.debug("Handler for UDP socket [%s] has been canceled" % self.protocol._sock.getsockname()[1]) + + except Exception as e: + # force TCP socket termination as processIncomingPacket should handle applicaiton layer errors + # for UDP sockets, simply reset the frame + if isinstance(self, ModbusConnectedRequestHandler): + _logger.info("Unknown exception '%s' on stream [%s:%s] forcing disconnect" % (e, *self.client_address)) + self.transport.close() + else: + _logger.error("Unknown error occurred %s" % e) + reset_frame = True # graceful recovery + finally: + if reset_frame: + self.framer.resetFrame() + reset_frame = False + + def execute(self, request, *addr): + """ The callback to call with the resulting message + + :param request: The decoded request message + """ + broadcast = False + try: + if self.server.broadcast_enable and request.unit_id == 0: + broadcast = True + # if broadcasting then execute on all slave contexts, note response will be ignored + for unit_id in self.server.context.slaves(): + response = request.execute(self.server.context[unit_id]) + else: + context = self.server.context[request.unit_id] + response = request.execute(context) + except NoSuchSlaveException as ex: + _logger.debug("requested slave does " + "not exist: %s" % request.unit_id ) + if self.server.ignore_missing_slaves: + return # the client will simply timeout waiting for a response + response = request.doException(merror.GatewayNoResponse) + except Exception as ex: + _logger.debug("Datastore unable to fulfill request: " + "%s; %s", ex, traceback.format_exc()) + response = request.doException(merror.SlaveFailure) + # no response when broadcasting + if not broadcast: + response.transaction_id = request.transaction_id + response.unit_id = request.unit_id + self.send(response, *addr) + + + def send(self, message, *addr): + if message.should_respond: + # self.server.control.Counter.BusMessage += 1 + pdu = self.framer.buildPacket(message) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug('send: [%s]- %s' % (message, b2a_hex(pdu))) + if addr == (None,): + self._send_(pdu) + else: + self._send_(pdu, *addr) + + # ----------------------------------------------------------------------- # + # Derived class implementations + # ----------------------------------------------------------------------- # + + def _send_(self, data): # pragma: no cover + """ Send a request (string) to the network + + :param message: The unencoded modbus response + """ + raise NotImplementedException("Method not implemented " + "by derived class") + async def _recv_(self): # pragma: no cover + """ Receive data from the network + + :return: + """ + raise NotImplementedException("Method not implemented " + "by derived class") + + +class ModbusConnectedRequestHandler(ModbusBaseRequestHandler,asyncio.Protocol): + """ Implements the modbus server protocol + + This uses asyncio.Protocol to implement + the client handler for a connected protocol (TCP). + """ + + def connection_made(self, transport): + """ asyncio.BaseProtocol: Called when a connection is made. """ + super().connection_made(transport) + + self.client_address = transport.get_extra_info('peername') + self.server.active_connections[self.client_address] = self + _logger.debug("TCP client connection established [%s:%s]" % self.client_address) + + def connection_lost(self, exc): + """ asyncio.BaseProtocol: Called when the connection is lost or closed.""" + super().connection_lost(exc) + _logger.debug("TCP client disconnected [%s:%s]" % self.client_address) + if self.client_address in self.server.active_connections: + self.server.active_connections.pop(self.client_address) + + + def data_received(self,data): + """ + asyncio.Protocol: (TCP) Called when some data is received. + data is a non-empty bytes object containing the incoming data. + """ + self.receive_queue.put_nowait(data) + + async def _recv_(self): + return await self.receive_queue.get() + + def _send_(self, data): + """ tcp send """ + self.transport.write(data) + + +class ModbusDisconnectedRequestHandler(ModbusBaseRequestHandler, asyncio.DatagramProtocol): + """ Implements the modbus server protocol + + This uses the socketserver.BaseRequestHandler to implement + the client handler for a disconnected protocol (UDP). The + only difference is that we have to specify who to send the + resulting packet data to. + """ + def __init__(self,owner): + super().__init__(owner) + self.server.on_connection_terminated = asyncio.get_event_loop().create_future() + + def connection_lost(self,exc): + super().connection_lost(exc) + self.server.on_connection_terminated.set_result(True) + + def datagram_received(self,data, addr): + """ + asyncio.DatagramProtocol: Called when a datagram is received. + data is a bytes object containing the incoming data. addr + is the address of the peer sending the data; the exact + format depends on the transport. + """ + self.receive_queue.put_nowait((data, addr)) + + def error_received(self,exc): # pragma: no cover + """ + asyncio.DatagramProtocol: Called when a previous send + or receive operation raises an OSError. exc is the + OSError instance. + + This method is called in rare conditions, + when the transport (e.g. UDP) detects that a datagram could + not be delivered to its recipient. In many conditions + though, undeliverable datagrams will be silently dropped. + """ + _logger.error("datagram connection error [%s]" % exc) + + async def _recv_(self): + return await self.receive_queue.get() + + def _send_(self, data, addr): + self.transport.sendto(data, addr=addr) + +class ModbusServerFactory: + """ + Builder class for a modbus server + + This also holds the server datastore so that it is persisted between connections + """ + + def __init__(self, store, framer=None, identity=None, **kwargs): + import warnings + warnings.warn("deprecated API for asyncio. ServerFactory's are a twisted construct and don't have an equivalent in asyncio", + DeprecationWarning) + + +# --------------------------------------------------------------------------- # +# Server Implementations +# --------------------------------------------------------------------------- # +class ModbusTcpServer: + """ + A modbus threaded tcp socket server + + We inherit and overload the socket server so that we + can control the client threads as well as have a single + server context instance. + """ + + def __init__(self, + context, + framer=None, + identity=None, + address=None, + handler=None, + allow_reuse_address=False, + allow_reuse_port=False, + defer_start=False, + backlog=20, + loop=None, + **kwargs): + """ Overloaded initializer for the socket server + + If the identify structure is not passed in, the ModbusControlBlock + uses its own empty structure. + + :param context: The ModbusServerContext datastore + :param framer: The framer strategy to use + :param identity: An optional identify structure + :param address: An optional (interface, port) to bind to. + :param handler: A handler for each client session; default is + ModbusConnectedRequestHandler. The handler class + receives connection create/teardown events + :param allow_reuse_address: Whether the server will allow the + reuse of an address. + :param allow_reuse_port: Whether the server will allow the + reuse of a port. + :param backlog: is the maximum number of queued connections + passed to listen(). Defaults to 20, increase if many + connections are being made and broken to your Modbus slave + :param loop: optional asyncio event loop to run in. Will default to + asyncio.get_event_loop() supplied value if None. + :param ignore_missing_slaves: True to not send errors on a request + to a missing slave + :param broadcast_enable: True to treat unit_id 0 as broadcast address, + False to treat 0 as any other unit_id + """ + self.active_connections = {} + self.loop = loop or asyncio.get_event_loop() + self.allow_reuse_address = allow_reuse_address + self.decoder = ServerDecoder() + self.framer = framer or ModbusSocketFramer + self.context = context or ModbusServerContext() + self.control = ModbusControlBlock() + self.address = address or ("", Defaults.Port) + self.handler = handler or ModbusConnectedRequestHandler + self.handler.server = self + self.ignore_missing_slaves = kwargs.get('ignore_missing_slaves', + Defaults.IgnoreMissingSlaves) + self.broadcast_enable = kwargs.get('broadcast_enable', + Defaults.broadcast_enable) + + if isinstance(identity, ModbusDeviceIdentification): + self.control.Identity.update(identity) + + self.serving = self.loop.create_future() # asyncio future that will be done once server has started + self.server = None # constructors cannot be declared async, so we have to defer the initialization of the server + self.server_factory = self.loop.create_server(lambda : self.handler(self), + *self.address, + reuse_address=allow_reuse_address, + reuse_port=allow_reuse_port, + backlog=backlog, + start_serving=not defer_start) + + async def serve_forever(self): + if self.server is None: + self.server = await self.server_factory + self.serving.set_result(True) + await self.server.serve_forever() + else: + raise RuntimeError("Can't call serve_forever on an already running server object") + + def server_close(self): + for k,v in self.active_connections.items(): + _logger.warning(f"aborting active session {k}") + v.handler_task.cancel() + self.active_connections = {} + self.server.close() + + +class ModbusUdpServer: + """ + A modbus threaded udp socket server + + We inherit and overload the socket server so that we + can control the client threads as well as have a single + server context instance. + """ + + def __init__(self, context, framer=None, identity=None, address=None, + handler=None, allow_reuse_address=False, + allow_reuse_port=False, + defer_start=False, + backlog=20, + loop=None, + **kwargs): + """ Overloaded initializer for the socket server + + If the identify structure is not passed in, the ModbusControlBlock + uses its own empty structure. + + :param context: The ModbusServerContext datastore + :param framer: The framer strategy to use + :param identity: An optional identify structure + :param address: An optional (interface, port) to bind to. + :param handler: A handler for each client session; default is + ModbusDisonnectedRequestHandler + :param ignore_missing_slaves: True to not send errors on a request + to a missing slave + :param broadcast_enable: True to treat unit_id 0 as broadcast address, + False to treat 0 as any other unit_id + """ + self.loop = loop or asyncio.get_event_loop() + self.decoder = ServerDecoder() + self.framer = framer or ModbusSocketFramer + self.context = context or ModbusServerContext() + self.control = ModbusControlBlock() + self.address = address or ("", Defaults.Port) + self.handler = handler or ModbusDisconnectedRequestHandler + self.ignore_missing_slaves = kwargs.get('ignore_missing_slaves', + Defaults.IgnoreMissingSlaves) + self.broadcast_enable = kwargs.get('broadcast_enable', + Defaults.broadcast_enable) + + if isinstance(identity, ModbusDeviceIdentification): + self.control.Identity.update(identity) + + self.protocol = None + self.endpoint = None + self.on_connection_terminated = None + self.stop_serving = self.loop.create_future() + self.serving = self.loop.create_future() # asyncio future that will be done once server has started + self.server_factory = self.loop.create_datagram_endpoint(lambda: self.handler(self), + local_addr=self.address, + reuse_address=allow_reuse_address, + reuse_port=allow_reuse_port, + allow_broadcast=True) + + async def serve_forever(self): + if self.protocol is None: + self.protocol, self.endpoint = await self.server_factory + self.serving.set_result(True) + await self.stop_serving + else: + raise RuntimeError("Can't call serve_forever on an already running server object") + + def server_close(self): + self.stop_serving.set_result(True) + if self.endpoint.handler_task is not None: + self.endpoint.handler_task.cancel() + + self.protocol.close() + + + +class ModbusSerialServer(object): + """ + A modbus threaded serial socket server + + We inherit and overload the socket server so that we + can control the client threads as well as have a single + server context instance. + """ + + handler = None + + def __init__(self, context, framer=None, identity=None, **kwargs): # pragma: no cover + """ Overloaded initializer for the socket server + + If the identify structure is not passed in, the ModbusControlBlock + uses its own empty structure. + + :param context: The ModbusServerContext datastore + :param framer: The framer strategy to use + :param identity: An optional identify structure + :param port: The serial port to attach to + :param stopbits: The number of stop bits to use + :param bytesize: The bytesize of the serial messages + :param parity: Which kind of parity to use + :param baudrate: The baud rate to use for the serial device + :param timeout: The timeout to use for the serial device + :param ignore_missing_slaves: True to not send errors on a request + to a missing slave + :param broadcast_enable: True to treat unit_id 0 as broadcast address, + False to treat 0 as any other unit_id + """ + raise NotImplementedException + +# --------------------------------------------------------------------------- # +# Creation Factories +# --------------------------------------------------------------------------- # +async def StartTcpServer(context=None, identity=None, address=None, + custom_functions=[], defer_start=True, **kwargs): + """ A factory to start and run a tcp modbus server + + :param context: The ModbusServerContext datastore + :param identity: An optional identify structure + :param address: An optional (interface, port) to bind to. + :param custom_functions: An optional list of custom function classes + supported by server instance. + :param defer_start: if set, a coroutine which can be started and stopped + will be returned. Otherwise, the server will be immediately spun + up without the ability to shut it off from within the asyncio loop + :param ignore_missing_slaves: True to not send errors on a request to a + missing slave + :return: an initialized but inactive server object coroutine + """ + framer = kwargs.pop("framer", ModbusSocketFramer) + server = ModbusTcpServer(context, framer, identity, address, **kwargs) + + for f in custom_functions: + server.decoder.register(f) # pragma: no cover + + if not defer_start: + await server.serve_forever() + + return server + + + + +async def StartUdpServer(context=None, identity=None, address=None, + custom_functions=[], defer_start=True, **kwargs): + """ A factory to start and run a udp modbus server + + :param context: The ModbusServerContext datastore + :param identity: An optional identify structure + :param address: An optional (interface, port) to bind to. + :param custom_functions: An optional list of custom function classes + supported by server instance. + :param framer: The framer to operate with (default ModbusSocketFramer) + :param ignore_missing_slaves: True to not send errors on a request + to a missing slave + """ + framer = kwargs.pop('framer', ModbusSocketFramer) + server = ModbusUdpServer(context, framer, identity, address, **kwargs) + + for f in custom_functions: + server.decoder.register(f) # pragma: no cover + + if not defer_start: + await server.serve_forever() # pragma: no cover + + return server + + + +def StartSerialServer(context=None, identity=None, custom_functions=[], + **kwargs):# pragma: no cover + """ A factory to start and run a serial modbus server + + :param context: The ModbusServerContext datastore + :param identity: An optional identify structure + :param custom_functions: An optional list of custom function classes + supported by server instance. + :param framer: The framer to operate with (default ModbusAsciiFramer) + :param port: The serial port to attach to + :param stopbits: The number of stop bits to use + :param bytesize: The bytesize of the serial messages + :param parity: Which kind of parity to use + :param baudrate: The baud rate to use for the serial device + :param timeout: The timeout to use for the serial device + :param ignore_missing_slaves: True to not send errors on a request to a + missing slave + """ + raise NotImplementedException + import serial + framer = kwargs.pop('framer', ModbusAsciiFramer) + server = ModbusSerialServer(context, framer, identity, **kwargs) + for f in custom_functions: + server.decoder.register(f) + server.serve_forever() + +def StopServer(): + """ + Helper method to stop Async Server + """ + import warnings + warnings.warn("deprecated API for asyncio. Call server_close() on server object returned by StartXxxServer", + DeprecationWarning) + + + +# --------------------------------------------------------------------------- # +# Exported symbols +# --------------------------------------------------------------------------- # + + +__all__ = [ + "StartTcpServer", "StartUdpServer", "StartSerialServer" +] + diff --git a/requirements-tests.txt b/requirements-tests.txt index 515eba29e..2ca42aa2d 100644 --- a/requirements-tests.txt +++ b/requirements-tests.txt @@ -16,3 +16,4 @@ verboselogs >= 1.5 tornado==4.5.3 Twisted>=17.1.0 zope.interface>=4.4.0 +asynctest>=0.10.0 diff --git a/test/test_server_asyncio.py b/test/test_server_asyncio.py new file mode 100755 index 000000000..26927a2ab --- /dev/null +++ b/test/test_server_asyncio.py @@ -0,0 +1,565 @@ +#!/usr/bin/env python +from pymodbus.compat import IS_PYTHON3 +import pytest +import asynctest +import asyncio +import logging +_logger = logging.getLogger() +if IS_PYTHON3: # Python 3 + from asynctest.mock import patch, Mock, MagicMock + +from pymodbus.device import ModbusDeviceIdentification +from pymodbus.factory import ServerDecoder +from pymodbus.server.asynchronous import ModbusTcpProtocol, ModbusUdpProtocol +from pymodbus.server.asyncio import StartTcpServer, StartUdpServer, StartSerialServer, StopServer, ModbusServerFactory +from pymodbus.server.asyncio import ModbusConnectedRequestHandler, ModbusBaseRequestHandler +from pymodbus.datastore import ModbusSequentialDataBlock +from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext +from pymodbus.compat import byte2int +from pymodbus.transaction import ModbusSocketFramer +from pymodbus.exceptions import NoSuchSlaveException, ModbusIOException + +import sys +#---------------------------------------------------------------------------# +# Fixture +#---------------------------------------------------------------------------# +import platform +from distutils.version import LooseVersion + +IS_DARWIN = platform.system().lower() == "darwin" +OSX_SIERRA = LooseVersion("10.12") +if IS_DARWIN: + IS_HIGH_SIERRA_OR_ABOVE = LooseVersion(platform.mac_ver()[0]) + SERIAL_PORT = '/dev/ptyp0' if not IS_HIGH_SIERRA_OR_ABOVE else '/dev/ttyp0' +else: + IS_HIGH_SIERRA_OR_ABOVE = False + SERIAL_PORT = "/dev/ptmx" + +@pytest.mark.skipif(not IS_PYTHON3, reason="requires python3.4 or above") +class AsyncioServerTest(asynctest.TestCase): + ''' + This is the unittest for the pymodbus.server.asyncio module + + The scope of this unit test is the life-cycle management of the network + connections and server objects. + + This unittest suite does not attempt to test any of the underlying protocol details + ''' + + #-----------------------------------------------------------------------# + # Setup/TearDown + #-----------------------------------------------------------------------# + def setUp(self): + ''' + Initialize the test environment by setting up a dummy store and context + ''' + self.store = ModbusSlaveContext( di=ModbusSequentialDataBlock(0, [17]*100), + co=ModbusSequentialDataBlock(0, [17]*100), + hr=ModbusSequentialDataBlock(0, [17]*100), + ir=ModbusSequentialDataBlock(0, [17]*100)) + self.context = ModbusServerContext(slaves=self.store, single=True) + + def tearDown(self): + ''' Cleans up the test environment ''' + pass + + #-----------------------------------------------------------------------# + # Test ModbusConnectedRequestHandler + #-----------------------------------------------------------------------# + @asyncio.coroutine + def testStartTcpServer(self): + ''' Test that the modbus tcp asyncio server starts correctly ''' + identity = ModbusDeviceIdentification(info={0x00: 'VendorName'}) + self.loop = asynctest.Mock(self.loop) + server = yield from StartTcpServer(context=self.context,loop=self.loop,identity=identity) + self.assertEqual(server.control.Identity.VendorName, 'VendorName') + self.loop.create_server.assert_called_once() + + @asyncio.coroutine + def testTcpServerServeNoDefer(self): + ''' Test StartTcpServer without deferred start (immediate execution of server) ''' + with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve: + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop, defer_start=False) + serve.assert_awaited() + + @asyncio.coroutine + def testTcpServerServeForever(self): + ''' Test StartTcpServer serve_forever() method ''' + with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve: + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop) + yield from server.serve_forever() + serve.assert_awaited() + + @asyncio.coroutine + def testTcpServerServeForeverTwice(self): + ''' Call on serve_forever() twice should result in a runtime error ''' + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + with self.assertRaises(RuntimeError): + yield from server.serve_forever() + server.server_close() + + @asyncio.coroutine + def testTcpServerReceiveData(self): + ''' Test data sent on socket is received by internals - doesn't not process data ''' + data = b'\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19' + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket', new_callable=Mock) as process: + # process = server.framer.processIncomingPacket = Mock() + connected = self.loop.create_future() + random_port = server.server.sockets[0].getsockname()[1] # get the random server port + + class BasicClient(asyncio.BaseProtocol): + def connection_made(self, transport): + self.transport = transport + self.transport.write(data) + connected.set_result(True) + + def eof_received(self): + pass + + transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port) + yield from asyncio.sleep(0.1) # this may be better done by making an internal hook in the actual implementation + # if this unit test fails on a machine, see if increasing the sleep time makes a difference, if it does + # blame author for a fix + + process.assert_called_once() + self.assertTrue( process.call_args[1]["data"] == data ) + server.server_close() + + @asyncio.coroutine + def testTcpServerRoundtrip(self): + ''' Test sending and receiving data on tcp socket ''' + data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register + expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + + random_port = server.server.sockets[0].getsockname()[1] # get the random server port + + connected, done = self.loop.create_future(),self.loop.create_future() + received_value = None + + class BasicClient(asyncio.BaseProtocol): + def connection_made(self, transport): + self.transport = transport + self.transport.write(data) + connected.set_result(True) + + def data_received(self, data): + nonlocal received_value, done + received_value = data + done.set_result(True) + + def eof_received(self): + pass + + transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port) + yield from asyncio.wait_for(done, timeout=0.1) + + self.assertEqual(received_value, expected_response) + + transport.close() + yield from asyncio.sleep(0) + server.server_close() + + @asyncio.coroutine + def testTcpServerConnectionLost(self): + ''' Test tcp stream interruption ''' + data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01" + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + + random_port = server.server.sockets[0].getsockname()[1] # get the random server port + + step1 = self.loop.create_future() + done = self.loop.create_future() + received_value = None + + class BasicClient(asyncio.BaseProtocol): + def connection_made(self, transport): + self.transport = transport + step1.set_result(True) + + transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port) + yield from step1 + + self.assertTrue( len(server.active_connections) == 1 ) + + protocol.transport.close() # close isn't synchronous and there's no notification that it's done + # so we have to wait a bit + yield from asyncio.sleep(0.1) + self.assertTrue( len(server.active_connections) == 0 ) + server.server_close() + + @asyncio.coroutine + def testTcpServerCloseActiveConnection(self): + ''' Test server_close() while there are active TCP connections ''' + data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01" + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + + random_port = server.server.sockets[0].getsockname()[1] # get the random server port + + step1 = self.loop.create_future() + done = self.loop.create_future() + received_value = None + + class BasicClient(asyncio.BaseProtocol): + def connection_made(self, transport): + self.transport = transport + step1.set_result(True) + + transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port) + yield from step1 + + server.server_close() + + # close isn't synchronous and there's no notification that it's done + # so we have to wait a bit + yield from asyncio.sleep(0.0) + self.assertTrue( len(server.active_connections) == 0 ) + + @asyncio.coroutine + def testTcpServerException(self): + ''' Sending garbage data on a TCP socket should drop the connection ''' + garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket', + new_callable=lambda : Mock(side_effect=Exception)) as process: + connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future() + received_data = None + random_port = server.server.sockets[0].getsockname()[1] # get the random server port + + class BasicClient(asyncio.BaseProtocol): + def connection_made(self, transport): + _logger.debug("Client connected") + self.transport = transport + transport.write(garbage) + connect.set_result(True) + + def data_received(self, data): + _logger.debug("Client received data") + receive.set_result(True) + received_data = data + + def eof_received(self): + _logger.debug("Client stream eof") + eof.set_result(True) + + transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port) + yield from asyncio.wait_for(connect, timeout=0.1) + yield from asyncio.wait_for(eof, timeout=0.1) + # neither of these should timeout if the test is successful + server.server_close() + + @asyncio.coroutine + def testTcpServerNoSlave(self): + ''' Test unknown slave unit exception ''' + context = ModbusServerContext(slaves={0x01: self.store, 0x02: self.store }, single=False) + data = b"\x01\x00\x00\x00\x00\x06\x05\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register) + server = yield from StartTcpServer(context=context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future() + received_data = None + random_port = server.server.sockets[0].getsockname()[1] # get the random server port + + class BasicClient(asyncio.BaseProtocol): + def connection_made(self, transport): + _logger.debug("Client connected") + self.transport = transport + transport.write(data) + connect.set_result(True) + + def data_received(self, data): + _logger.debug("Client received data") + receive.set_result(True) + received_data = data + + def eof_received(self): + _logger.debug("Client stream eof") + eof.set_result(True) + + transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port) + yield from asyncio.wait_for(connect, timeout=0.1) + self.assertFalse(eof.done()) + server.server_close() + + @asyncio.coroutine + def testTcpServerModbusError(self): + ''' Test sending garbage data on a TCP socket should drop the connection ''' + data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register) + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + with patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute", + side_effect=NoSuchSlaveException): + connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future() + received_data = None + random_port = server.server.sockets[0].getsockname()[1] # get the random server port + + class BasicClient(asyncio.BaseProtocol): + def connection_made(self, transport): + _logger.debug("Client connected") + self.transport = transport + transport.write(data) + connect.set_result(True) + + def data_received(self, data): + _logger.debug("Client received data") + receive.set_result(True) + received_data = data + + def eof_received(self): + _logger.debug("Client stream eof") + eof.set_result(True) + + transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port) + yield from asyncio.wait_for(connect, timeout=0.1) + yield from asyncio.wait_for(receive, timeout=0.1) + self.assertFalse(eof.done()) + transport.close() + server.server_close() + + @asyncio.coroutine + def testTcpServerInternalException(self): + ''' Test sending garbage data on a TCP socket should drop the connection ''' + data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register) + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + with patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute", + side_effect=Exception): + connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future() + received_data = None + random_port = server.server.sockets[0].getsockname()[1] # get the random server port + + class BasicClient(asyncio.BaseProtocol): + def connection_made(self, transport): + _logger.debug("Client connected") + self.transport = transport + transport.write(data) + connect.set_result(True) + + def data_received(self, data): + _logger.debug("Client received data") + receive.set_result(True) + received_data = data + + def eof_received(self): + _logger.debug("Client stream eof") + eof.set_result(True) + + transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port) + yield from asyncio.wait_for(connect, timeout=0.1) + yield from asyncio.wait_for(receive, timeout=0.1) + self.assertFalse(eof.done()) + + transport.close() + server.server_close() + + + + #-----------------------------------------------------------------------# + # Test ModbusUdpProtocol + #-----------------------------------------------------------------------# + + @asyncio.coroutine + def testStartUdpServer(self): + ''' Test that the modbus udp asyncio server starts correctly ''' + identity = ModbusDeviceIdentification(info={0x00: 'VendorName'}) + self.loop = asynctest.Mock(self.loop) + server = yield from StartUdpServer(context=self.context,loop=self.loop,identity=identity) + self.assertEqual(server.control.Identity.VendorName, 'VendorName') + self.loop.create_datagram_endpoint.assert_called_once() + + # async def testUdpServerServeNoDefer(self): + # ''' Test StartUdpServer without deferred start - NOT IMPLEMENTED - this test is hard to do without additional + # internal plumbing added to the implementation ''' + # asyncio.base_events.Server.serve_forever = asynctest.CoroutineMock() + # server = yield from StartUdpServer(address=("127.0.0.1", 0), loop=self.loop, defer_start=False) + # server.server.serve_forever.assert_awaited() + + @asyncio.coroutine + def testUdpServerServeForeverStart(self): + ''' Test StartUdpServer serve_forever() method ''' + with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve: + server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop) + yield from server.serve_forever() + serve.assert_awaited() + + @asyncio.coroutine + def testUdpServerServeForeverClose(self): + ''' Test StartUdpServer serve_forever() method ''' + server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + + self.assertTrue(asyncio.isfuture(server.on_connection_terminated)) + self.assertFalse(server.on_connection_terminated.done()) + + server.server_close() + self.assertTrue(server.protocol.is_closing()) + + @asyncio.coroutine + def testUdpServerServeForeverTwice(self): + ''' Call on serve_forever() twice should result in a runtime error ''' + identity = ModbusDeviceIdentification(info={0x00: 'VendorName'}) + server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0), + loop=self.loop,identity=identity) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + with self.assertRaises(RuntimeError): + yield from server.serve_forever() + server.server_close() + + @asyncio.coroutine + def testUdpServerReceiveData(self): + ''' Test that the sending data on datagram socket gets data pushed to framer ''' + server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',new_callable=Mock) as process: + + server.endpoint.datagram_received(data=b"12345", addr=("127.0.0.1", 12345)) + yield from asyncio.sleep(0.1) + process.seal() + + process.assert_called_once() + self.assertTrue( process.call_args[1]["data"] == b"12345" ) + + server.server_close() + + @asyncio.coroutine + def testUdpServerSendData(self): + ''' Test that the modbus udp asyncio server correctly sends data outbound ''' + identity = ModbusDeviceIdentification(info={0x00: 'VendorName'}) + data = b'x\01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19' + server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0)) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + random_port = server.protocol._sock.getsockname()[1] + received = server.endpoint.datagram_received = Mock(wraps=server.endpoint.datagram_received) + done = self.loop.create_future() + received_value = None + + class BasicClient(asyncio.DatagramProtocol): + def connection_made(self, transport): + self.transport = transport + self.transport.sendto(data) + + def datagram_received(self, data, addr): + nonlocal received_value, done + print("received") + received_value = data + done.set_result(True) + self.transport.close() + + transport, protocol = yield from self.loop.create_datagram_endpoint( BasicClient, + remote_addr=('127.0.0.1', random_port)) + + yield from asyncio.sleep(0.1) + + received.assert_called_once() + self.assertEqual(received.call_args[0][0], data) + + server.server_close() + + self.assertTrue(server.protocol.is_closing()) + yield from asyncio.sleep(0.1) + + @asyncio.coroutine + def testUdpServerRoundtrip(self): + ''' Test sending and receiving data on udp socket''' + data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register + expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context + server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + + random_port = server.protocol._sock.getsockname()[1] + + connected, done = self.loop.create_future(),self.loop.create_future() + received_value = None + + class BasicClient(asyncio.DatagramProtocol): + def connection_made(self, transport): + self.transport = transport + self.transport.sendto(data) + + def datagram_received(self, data, addr): + nonlocal received_value, done + print("received") + received_value = data + done.set_result(True) + + transport, protocol = yield from self.loop.create_datagram_endpoint( BasicClient, + remote_addr=('127.0.0.1', random_port)) + yield from asyncio.wait_for(done, timeout=0.1) + + self.assertEqual(received_value, expected_response) + + transport.close() + yield from asyncio.sleep(0) + server.server_close() + + @asyncio.coroutine + def testUdpServerException(self): + ''' Test sending garbage data on a TCP socket should drop the connection ''' + garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) + server_task = asyncio.create_task(server.serve_forever()) + yield from server.serving + with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket', + new_callable=lambda: Mock(side_effect=Exception)) as process: + connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future() + received_data = None + random_port = server.protocol._sock.getsockname()[1] # get the random server port + + class BasicClient(asyncio.DatagramProtocol): + def connection_made(self, transport): + _logger.debug("Client connected") + self.transport = transport + transport.sendto(garbage) + connect.set_result(True) + + def datagram_received(self, data, addr): + nonlocal receive + _logger.debug("Client received data") + receive.set_result(True) + received_data = data + + transport, protocol = yield from self.loop.create_datagram_endpoint(BasicClient, + remote_addr=('127.0.0.1', random_port)) + yield from asyncio.wait_for(connect, timeout=0.1) + self.assertFalse(receive.done()) + self.assertFalse(server.protocol._sock._closed) + server.server_close() + + # -----------------------------------------------------------------------# + # Test ModbusServerFactory + # -----------------------------------------------------------------------# + def testModbusServerFactory(self): + ''' Test the base class for all the clients ''' + with self.assertWarns(DeprecationWarning): + factory = ModbusServerFactory(store=None) + + def testStopServer(self): + with self.assertWarns(DeprecationWarning): + StopServer() + + +# --------------------------------------------------------------------------- # +# Main +# --------------------------------------------------------------------------- # +if __name__ == "__main__": + asynctest.main() \ No newline at end of file