diff --git a/pymodbus/client/sync.py b/pymodbus/client/sync.py index 51e8164ee..0ad2b222d 100644 --- a/pymodbus/client/sync.py +++ b/pymodbus/client/sync.py @@ -1,5 +1,6 @@ import socket import select +import serial import time import sys from functools import partial diff --git a/pymodbus/server/asyncio.py b/pymodbus/server/asyncio.py index 0652c3372..5566452bd 100755 --- a/pymodbus/server/asyncio.py +++ b/pymodbus/server/asyncio.py @@ -8,6 +8,7 @@ import traceback import asyncio +from pymodbus.compat import PYTHON_VERSION from pymodbus.constants import Defaults from pymodbus.utilities import hexlify_packets from pymodbus.factory import ServerDecoder @@ -61,7 +62,10 @@ def connection_made(self, transport): self.framer = self.server.framer(self.server.decoder, client=None) # schedule the connection handler on the event loop - self.handler_task = asyncio.create_task(self.handle()) + if PYTHON_VERSION >= (3, 7): + self.handler_task = asyncio.create_task(self.handle()) + else: + self.handler_task = asyncio.ensure_future(self.handle()) except Exception as ex: # pragma: no cover _logger.debug("Datastore unable to fulfill request: " "%s; %s", ex, traceback.format_exc()) @@ -392,12 +396,20 @@ def __init__(self, self.serving = self.loop.create_future() # asyncio future that will be done once server has started self.server = None # constructors cannot be declared async, so we have to defer the initialization of the server - self.server_factory = self.loop.create_server(lambda : self.handler(self), + if PYTHON_VERSION >= (3, 7): + # start_serving is new in version 3.7 + self.server_factory = self.loop.create_server(lambda : self.handler(self), *self.address, reuse_address=allow_reuse_address, reuse_port=allow_reuse_port, backlog=backlog, start_serving=not defer_start) + else: + self.server_factory = self.loop.create_server(lambda : self.handler(self), + *self.address, + reuse_address=allow_reuse_address, + reuse_port=allow_reuse_port, + backlog=backlog) async def serve_forever(self): if self.server is None: @@ -409,7 +421,7 @@ async def serve_forever(self): def server_close(self): for k,v in self.active_connections.items(): - _logger.warning(f"aborting active session {k}") + _logger.warning("aborting active session {}".format(k)) v.handler_task.cancel() self.active_connections = {} self.server.close() diff --git a/test/test_server_asyncio.py b/test/test_server_asyncio.py index 26927a2ab..372c96479 100755 --- a/test/test_server_asyncio.py +++ b/test/test_server_asyncio.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -from pymodbus.compat import IS_PYTHON3 +from pymodbus.compat import IS_PYTHON3, PYTHON_VERSION import pytest import asynctest import asyncio @@ -73,8 +73,10 @@ def testStartTcpServer(self): self.loop = asynctest.Mock(self.loop) server = yield from StartTcpServer(context=self.context,loop=self.loop,identity=identity) self.assertEqual(server.control.Identity.VendorName, 'VendorName') - self.loop.create_server.assert_called_once() + if PYTHON_VERSION >= (3, 6): + self.loop.create_server.assert_called_once() + @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above") @asyncio.coroutine def testTcpServerServeNoDefer(self): ''' Test StartTcpServer without deferred start (immediate execution of server) ''' @@ -82,6 +84,7 @@ def testTcpServerServeNoDefer(self): server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop, defer_start=False) serve.assert_awaited() + @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above") @asyncio.coroutine def testTcpServerServeForever(self): ''' Test StartTcpServer serve_forever() method ''' @@ -94,7 +97,10 @@ def testTcpServerServeForever(self): def testTcpServerServeForeverTwice(self): ''' Call on serve_forever() twice should result in a runtime error ''' server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving with self.assertRaises(RuntimeError): yield from server.serve_forever() @@ -105,7 +111,10 @@ def testTcpServerReceiveData(self): ''' Test data sent on socket is received by internals - doesn't not process data ''' data = b'\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19' server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket', new_callable=Mock) as process: # process = server.framer.processIncomingPacket = Mock() @@ -126,7 +135,8 @@ def eof_received(self): # if this unit test fails on a machine, see if increasing the sleep time makes a difference, if it does # blame author for a fix - process.assert_called_once() + if PYTHON_VERSION >= (3, 6): + process.assert_called_once() self.assertTrue( process.call_args[1]["data"] == data ) server.server_close() @@ -136,7 +146,10 @@ def testTcpServerRoundtrip(self): data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving random_port = server.server.sockets[0].getsockname()[1] # get the random server port @@ -172,7 +185,10 @@ def testTcpServerConnectionLost(self): ''' Test tcp stream interruption ''' data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01" server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving random_port = server.server.sockets[0].getsockname()[1] # get the random server port @@ -202,7 +218,10 @@ def testTcpServerCloseActiveConnection(self): ''' Test server_close() while there are active TCP connections ''' data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01" server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving random_port = server.server.sockets[0].getsockname()[1] # get the random server port @@ -231,7 +250,10 @@ def testTcpServerException(self): ''' Sending garbage data on a TCP socket should drop the connection ''' garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket', new_callable=lambda : Mock(side_effect=Exception)) as process: @@ -267,7 +289,10 @@ def testTcpServerNoSlave(self): context = ModbusServerContext(slaves={0x01: self.store, 0x02: self.store }, single=False) data = b"\x01\x00\x00\x00\x00\x06\x05\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register) server = yield from StartTcpServer(context=context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future() received_data = None @@ -299,7 +324,10 @@ def testTcpServerModbusError(self): ''' Test sending garbage data on a TCP socket should drop the connection ''' data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register) server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving with patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute", side_effect=NoSuchSlaveException): @@ -335,7 +363,10 @@ def testTcpServerInternalException(self): ''' Test sending garbage data on a TCP socket should drop the connection ''' data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register) server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving with patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute", side_effect=Exception): @@ -380,7 +411,8 @@ def testStartUdpServer(self): self.loop = asynctest.Mock(self.loop) server = yield from StartUdpServer(context=self.context,loop=self.loop,identity=identity) self.assertEqual(server.control.Identity.VendorName, 'VendorName') - self.loop.create_datagram_endpoint.assert_called_once() + if PYTHON_VERSION >= (3, 6): + self.loop.create_datagram_endpoint.assert_called_once() # async def testUdpServerServeNoDefer(self): # ''' Test StartUdpServer without deferred start - NOT IMPLEMENTED - this test is hard to do without additional @@ -389,6 +421,7 @@ def testStartUdpServer(self): # server = yield from StartUdpServer(address=("127.0.0.1", 0), loop=self.loop, defer_start=False) # server.server.serve_forever.assert_awaited() + @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above") @asyncio.coroutine def testUdpServerServeForeverStart(self): ''' Test StartUdpServer serve_forever() method ''' @@ -401,7 +434,10 @@ def testUdpServerServeForeverStart(self): def testUdpServerServeForeverClose(self): ''' Test StartUdpServer serve_forever() method ''' server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving self.assertTrue(asyncio.isfuture(server.on_connection_terminated)) @@ -416,7 +452,10 @@ def testUdpServerServeForeverTwice(self): identity = ModbusDeviceIdentification(info={0x00: 'VendorName'}) server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop,identity=identity) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving with self.assertRaises(RuntimeError): yield from server.serve_forever() @@ -426,7 +465,10 @@ def testUdpServerServeForeverTwice(self): def testUdpServerReceiveData(self): ''' Test that the sending data on datagram socket gets data pushed to framer ''' server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',new_callable=Mock) as process: @@ -434,7 +476,8 @@ def testUdpServerReceiveData(self): yield from asyncio.sleep(0.1) process.seal() - process.assert_called_once() + if PYTHON_VERSION >= (3, 6): + process.assert_called_once() self.assertTrue( process.call_args[1]["data"] == b"12345" ) server.server_close() @@ -445,7 +488,10 @@ def testUdpServerSendData(self): identity = ModbusDeviceIdentification(info={0x00: 'VendorName'}) data = b'x\01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19' server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0)) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving random_port = server.protocol._sock.getsockname()[1] received = server.endpoint.datagram_received = Mock(wraps=server.endpoint.datagram_received) @@ -469,7 +515,8 @@ def datagram_received(self, data, addr): yield from asyncio.sleep(0.1) - received.assert_called_once() + if PYTHON_VERSION >= (3, 6): + received.assert_called_once() self.assertEqual(received.call_args[0][0], data) server.server_close() @@ -483,7 +530,10 @@ def testUdpServerRoundtrip(self): data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving random_port = server.protocol._sock.getsockname()[1] @@ -517,7 +567,10 @@ def testUdpServerException(self): ''' Test sending garbage data on a TCP socket should drop the connection ''' garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop) - server_task = asyncio.create_task(server.serve_forever()) + if PYTHON_VERSION >= (3, 7): + server_task = asyncio.create_task(server.serve_forever()) + else: + server_task = asyncio.ensure_future(server.serve_forever()) yield from server.serving with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket', new_callable=lambda: Mock(side_effect=Exception)) as process: @@ -562,4 +615,4 @@ def testStopServer(self): # Main # --------------------------------------------------------------------------- # if __name__ == "__main__": - asynctest.main() \ No newline at end of file + asynctest.main()