Skip to content

Commit

Permalink
1. Add support to asyncio (#187)
Browse files Browse the repository at this point in the history
2. Updated examples and tests
3. Move to py.test from nosetests
4. Set coverage failure to 85% till all asyncio unit tests are covered
  • Loading branch information
dhoomakethu committed Dec 18, 2017
1 parent 5e00a05 commit 8612d54
Show file tree
Hide file tree
Showing 21 changed files with 588 additions and 97 deletions.
2 changes: 0 additions & 2 deletions .coveragerc

This file was deleted.

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ test/__pycache__/
/test/lib/
/test/pip-selfcheck.json
/test/.Python
/.cache/
5 changes: 3 additions & 2 deletions examples/common/async-asyncio-client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio

from pymodbus.client.async.tcp import AsyncModbusTCPClient as ModbusClient
from pymodbus.client.async.udp import AsyncModbusUDPClient as ModbusClient
from pymodbus.client.async import schedulers


Expand All @@ -23,7 +24,7 @@ async def start_async_test(client):
# which defaults to `0x00`
#---------------------------------------------------------------------------#
log.debug("Reading Coils")
rr = client.read_coils(1, 1, unit=0x01)
rr = await client.read_coils(1, 1, unit=0x01)

#---------------------------------------------------------------------------#
# example requests
Expand Down Expand Up @@ -100,6 +101,6 @@ async def start_async_test(client):


if __name__ == '__main__':
loop, client = ModbusClient(schedulers.ASYNC_IO, port=5440)
loop, client = ModbusClient(schedulers.ASYNC_IO, port=5020)
loop.run_until_complete(start_async_test(client.protocol))
loop.close()
5 changes: 3 additions & 2 deletions examples/common/async-asyncio-serial-client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async def start_async_test(client):
await asyncio.sleep(1)

if __name__ == '__main__':
loop, client = ModbusClient(schedulers.ASYNC_IO, port='/dev/ptyp0', baudrate=9600, timeout=2, method="rtu")
loop.run_until_complete(start_async_test(client))
# socat -d -d PTY,link=/tmp/ptyp0,raw,echo=0,ispeed=9600 PTY,link=/tmp/ttyp0,raw,echo=0,ospeed=9600
loop, client = ModbusClient(schedulers.ASYNC_IO, port='/tmp/ptyp0', baudrate=9600, timeout=2, method="rtu")
loop.run_until_complete(start_async_test(client.protocol))
loop.close()
2 changes: 1 addition & 1 deletion examples/common/async-twisted-client-serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def error_handler(self, failure):
log.error(failure)


client, proto = AsyncModbusSerialClient(schedulers.REACTOR, method="rtu", port=SERIAL_PORT, timeout=2, proto_cls=ExampleProtocol)
proto, client = AsyncModbusSerialClient(schedulers.REACTOR, method="rtu", port=SERIAL_PORT, timeout=2, proto_cls=ExampleProtocol)
proto.start()
# proto.stop()

Expand Down
10 changes: 5 additions & 5 deletions examples/common/synchronous-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
#---------------------------------------------------------------------------#
# import the various server implementations
#---------------------------------------------------------------------------#
from pymodbus.server.sync import StartTcpServer
# from pymodbus.server.sync import StartUdpServer
from pymodbus.server.sync import StartSerialServer
# from pymodbus.server.sync import StartTcpServer
from pymodbus.server.sync import StartUdpServer
# from pymodbus.server.sync import StartSerialServer

from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock
Expand Down Expand Up @@ -108,10 +108,10 @@
# StartTcpServer(context, identity=identity, address=("localhost", 5020))

# Udp:
# StartUdpServer(context, identity=identity, address=("localhost", 5020))
StartUdpServer(context, identity=identity, address=("localhost", 5020))

# Ascii:
#StartSerialServer(context, identity=identity, port='/dev/pts/3', timeout=1)

# RTU:
StartSerialServer(context, framer=ModbusRtuFramer, identity=identity, port='/dev/ttyp0', timeout=.005, baudrate=9600)
# StartSerialServer(context, framer=ModbusRtuFramer, identity=identity, port='/dev/ttyp0', timeout=.005, baudrate=9600)
193 changes: 189 additions & 4 deletions pymodbus/client/async/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,119 @@ def _buildResponse(self, tid):
self.transaction.addTransaction(f, tid)
return f

def close(self):
self.transport.close()
self._connected = False


class ModbusUdpClientProtocol(asyncio.DatagramProtocol, AsyncModbusClientMixin):
"""
Asyncio specific implementation of asynchronous modbus udp client protocol.
"""

#: Factory that created this instance.
factory = None

def __init__(self, host=None, port=0, **kwargs):
self.host = host
self.port = port
super(self.__class__, self).__init__(**kwargs)

def connection_made(self, transport):
self.transport = transport
self._connectionMade()

if self.factory:
self.factory.protocol_made_connection(self)

def connection_lost(self, reason):
self.transport = None
self._connectionLost(reason)

if self.factory:
self.factory.protocol_lost_connection(self)

def execute(self, request):
''' Starts the producer to send the next request to
consumer.write(Frame(request))
'''
request.transaction_id = self.transaction.getNextTID()
packet = self.framer.buildPacket(request)
self.transport.sendto(packet, (self.host, self.port))
return self._buildResponse(request.transaction_id)

def datagram_received(self, data, addr):
self._dataReceived(data)

def create_future(self):
return asyncio.Future()

def resolve_future(self, f, result):
f.set_result(result)

def raise_future(self, f, exc):
f.set_exception(exc)

def _connectionMade(self):
''' Called upon a successful client connection.
'''
_logger.debug("Client connected to modbus server")
self._connected = True

def _connectionLost(self, reason):
''' Called upon a client disconnect
:param reason: The reason for the disconnect
'''
_logger.debug("Client disconnected from modbus server: %s" % reason)
self._connected = False
for tid in list(self.transaction):
self.raise_future(self.transaction.getTransaction(tid), ConnectionException('Connection lost during request'))

def _handleResponse(self, reply, **kwargs):
''' Handle the processed response and link to correct deferred
:param reply: The reply to process
'''
if reply is not None:
tid = reply.transaction_id
handler = self.transaction.getTransaction(tid)
if handler:
self.resolve_future(handler, reply)
else:
_logger.debug("Unrequested message: " + str(reply))

def _buildResponse(self, tid):
''' Helper method to return a deferred response
for the current request.
:param tid: The transaction identifier for this response
:returns: A defer linked to the latest request
'''
f = self.create_future()
if not self._connected:
self.raise_future(f, ConnectionException('Client is not connected'))
else:
self.transaction.addTransaction(f, tid)
return f

def _dataReceived(self, data):
''' Get response, check for valid message, decode result
:param data: The data returned from the server
'''
self.framer.processIncomingPacket(data, self._handleResponse)

def close(self):
self.transport.close()
self._connected = False

@property
def connected(self):
""" Return connection status.
"""
return self._connected


class ReconnectingAsyncioModbusTcpClient(object):
"""Client to connect to modbus device repeatedly over TCP/IP."""
Expand Down Expand Up @@ -145,7 +258,6 @@ def start(self, host, port=502):
_logger.debug('Connecting to %s:%s.' % (host, port))
self.host = host
self.port = port

yield from self._connect()

def stop(self):
Expand Down Expand Up @@ -383,8 +495,6 @@ def __init__(self, host=None, port=502, protocol_class=None, loop=None):

self.connected = False



def stop(self):
# prevent reconnect:
# self.host = None
Expand Down Expand Up @@ -438,9 +548,84 @@ def protocol_lost_connection(self, protocol):
_logger.error('Factory protocol disconnect callback called while not connected.')


class AsyncioModbusSerialClient(object):
"""Client to connect to modbus device over serial."""
transport = None
framer = None

def __init__(self, port, protocol_class=None, framer=None, loop=None):
#: Protocol used to talk to modbus device.
self.protocol_class = protocol_class or ModbusClientProtocol
#: Current protocol instance.
self.protocol = None
#: Event loop to use.
self.loop = loop or asyncio.get_event_loop()
self.port = port
self.framer = framer
self._connected = False

def stop(self):

if self._connected:
if self.protocol:
if self.protocol.transport:
self.protocol.transport.close()

def _create_protocol(self):
"""Factory function to create initialized protocol instance."""
from serial_asyncio import create_serial_connection

def factory():
return self.protocol_class(framer=self.framer)

cor = create_serial_connection(self.loop, factory, self.port)
return cor

@asyncio.coroutine
def connect(self):
_logger.debug('Connecting.')
try:
yield from self.loop.create_connection(self._create_protocol, self.host, self.port)
_logger.info('Connected to %s:%s.' % (self.host, self.port))
except Exception as ex:
_logger.warning('Failed to connect: %s' % ex)
# asyncio.async(self._reconnect(), loop=self.loop)

def protocol_made_connection(self, protocol):
"""Protocol notification of successful connection."""
_logger.info('Protocol made connection.')
if not self._connected:
self._connected = True
self.protocol = protocol
else:
_logger.error('Factory protocol connect callback called while connected.')

def protocol_lost_connection(self, protocol):
"""Protocol notification of lost connection."""
if self._connected:
_logger.info('Protocol lost connection.')
if protocol is not self.protocol:
_logger.error('Factory protocol callback called from unexpected protocol instance.')

self._connected = False
self.protocol = None
# if self.host:
# asyncio.async(self._reconnect(), loop=self.loop)
else:
_logger.error('Factory protocol disconnect callback called while not connected.')


@asyncio.coroutine
def init_client(proto_cls, loop, host, port, **kwargs):
def init_tcp_client(proto_cls, loop, host, port, **kwargs):
client = ReconnectingAsyncioModbusTcpClient(protocol_class=proto_cls,
loop=loop)
yield from client.start(host, port)
return client


@asyncio.coroutine
def init_udp_client(proto_cls, loop, host, port, **kwargs):
client = ReconnectingAsyncioModbusUdpClient(protocol_class=proto_cls,
loop=loop)
yield from client.start(host, port)
return client
25 changes: 15 additions & 10 deletions pymodbus/client/async/factory/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def __init__(self, framer, *args, **kwargs):

proto = EventLoopThread("reactor", reactor.run, reactor.stop,
installSignalHandlers=0)
s = SerialModbusClient(framer, port, reactor, **kwargs)
ser_client = SerialModbusClient(framer, port, reactor, **kwargs)

return s, proto
return proto, ser_client


def io_loop_factory(port=None, framer=None, **kwargs):
Expand All @@ -68,22 +68,27 @@ def io_loop_factory(port=None, framer=None, **kwargs):

def async_io_factory(port=Defaults.Port, framer=None, **kwargs):
import asyncio
from pymodbus.client.async.asyncio import ModbusClientProtocol
from pymodbus.client.async.asyncio import ModbusClientProtocol, AsyncioModbusSerialClient
loop = kwargs.pop("loop", None) or asyncio.get_event_loop()
proto_cls = kwargs.pop("proto_cls", None) or ModbusClientProtocol

try:
from serial_asyncio import create_serial_connection
except ImportError:
LOGGER.critical("pyserial-asyncio is not installed, install with 'pip install pyserial-asyncio")
exit(0)

def proto_factory():
return proto_cls(framer=framer)

coro = create_serial_connection(loop, proto_factory, port, **kwargs)
import sys
sys.exit(1)
#
# def proto_factory():
# return proto_cls(framer=framer)

client = AsyncioModbusSerialClient(port, proto_cls, framer, loop)
# coro = create_serial_connection(loop, proto_factory, port, **kwargs)
coro = client._create_protocol()
transport, protocol = loop.run_until_complete(asyncio.gather(coro))[0]
return loop, protocol
client.transport = transport
client.protocol = protocol
return loop, client


def get_factory(scheduler):
Expand Down
6 changes: 3 additions & 3 deletions pymodbus/client/async/factory/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ def io_loop_factory(host="127.0.0.1", port=Defaults.Port, framer=None,
def async_io_factory(host="127.0.0.1", port=Defaults.Port, framer=None,
source_address=None, timeout=None, **kwargs):
import asyncio
from pymodbus.client.async.asyncio import init_client
from pymodbus.client.async.asyncio import init_tcp_client
loop = kwargs.get("loop") or asyncio.get_event_loop()
proto_cls = kwargs.get("proto_cls", None)
cor = init_client(proto_cls, loop, host, port)
cor = init_tcp_client(proto_cls, loop, host, port)
client = loop.run_until_complete(asyncio.gather(cor))[0]
return loop, client

Expand All @@ -77,7 +77,7 @@ def get_factory(scheduler):
elif scheduler == schedulers.ASYNC_IO:
return async_io_factory
else:
LOGGER.warn("Allowed Schedulers: {}, {}, {}".format(
LOGGER.warning("Allowed Schedulers: {}, {}, {}".format(
schedulers.REACTOR, schedulers.IO_LOOP, schedulers.ASYNC_IO
))
raise Exception("Invalid Scheduler '{}'".format(scheduler))
8 changes: 7 additions & 1 deletion pymodbus/client/async/factory/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ def io_loop_factory(host="127.0.0.1", port=Defaults.Port, framer=None,

def async_io_factory(host="127.0.0.1", port=Defaults.Port, framer=None,
source_address=None, timeout=None, **kwargs):
raise NotImplementedError()
import asyncio
from pymodbus.client.async.asyncio import init_udp_client
loop = kwargs.get("loop") or asyncio.get_event_loop()
proto_cls = kwargs.get("proto_cls", None)
cor = init_udp_client(proto_cls, loop, host, port)
client = loop.run_until_complete(asyncio.gather(cor))[0]
return loop, client


def get_factory(scheduler):
Expand Down
3 changes: 2 additions & 1 deletion pymodbus/client/async/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def __new__(cls, scheduler, method, port, **kwargs):
"""
if not IS_PYTHON3 and scheduler == ASYNC_IO:
logger.critical("ASYNCIO is supported only on python3")
exit(0)
import sys
sys.exit(1)
factory_class = get_factory(scheduler)
framer = cls._framer(method)
yieldable = factory_class(framer=framer, port=port, **kwargs)
Expand Down
Loading

0 comments on commit 8612d54

Please sign in to comment.