Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple Python versions to fix test error from PR #400 #444

Merged
merged 6 commits into from
Oct 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pymodbus/client/sync.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import socket
import select
import serial
import time
import sys
from functools import partial
Expand Down
18 changes: 15 additions & 3 deletions pymodbus/server/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import traceback

import asyncio
from pymodbus.compat import PYTHON_VERSION
from pymodbus.constants import Defaults
from pymodbus.utilities import hexlify_packets
from pymodbus.factory import ServerDecoder
Expand Down Expand Up @@ -61,7 +62,10 @@ def connection_made(self, transport):
self.framer = self.server.framer(self.server.decoder, client=None)

# schedule the connection handler on the event loop
self.handler_task = asyncio.create_task(self.handle())
if PYTHON_VERSION >= (3, 7):
self.handler_task = asyncio.create_task(self.handle())
else:
self.handler_task = asyncio.ensure_future(self.handle())
except Exception as ex: # pragma: no cover
_logger.debug("Datastore unable to fulfill request: "
"%s; %s", ex, traceback.format_exc())
Expand Down Expand Up @@ -392,12 +396,20 @@ def __init__(self,

self.serving = self.loop.create_future() # asyncio future that will be done once server has started
self.server = None # constructors cannot be declared async, so we have to defer the initialization of the server
self.server_factory = self.loop.create_server(lambda : self.handler(self),
if PYTHON_VERSION >= (3, 7):
# start_serving is new in version 3.7
self.server_factory = self.loop.create_server(lambda : self.handler(self),
*self.address,
reuse_address=allow_reuse_address,
reuse_port=allow_reuse_port,
backlog=backlog,
start_serving=not defer_start)
else:
self.server_factory = self.loop.create_server(lambda : self.handler(self),
*self.address,
reuse_address=allow_reuse_address,
reuse_port=allow_reuse_port,
backlog=backlog)

async def serve_forever(self):
if self.server is None:
Expand All @@ -409,7 +421,7 @@ async def serve_forever(self):

def server_close(self):
for k,v in self.active_connections.items():
_logger.warning(f"aborting active session {k}")
_logger.warning("aborting active session {}".format(k))
v.handler_task.cancel()
self.active_connections = {}
self.server.close()
Expand Down
97 changes: 75 additions & 22 deletions test/test_server_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
from pymodbus.compat import IS_PYTHON3
from pymodbus.compat import IS_PYTHON3, PYTHON_VERSION
import pytest
import asynctest
import asyncio
Expand Down Expand Up @@ -73,15 +73,18 @@ def testStartTcpServer(self):
self.loop = asynctest.Mock(self.loop)
server = yield from StartTcpServer(context=self.context,loop=self.loop,identity=identity)
self.assertEqual(server.control.Identity.VendorName, 'VendorName')
self.loop.create_server.assert_called_once()
if PYTHON_VERSION >= (3, 6):
self.loop.create_server.assert_called_once()

@pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")
@asyncio.coroutine
def testTcpServerServeNoDefer(self):
''' Test StartTcpServer without deferred start (immediate execution of server) '''
with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve:
server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop, defer_start=False)
serve.assert_awaited()

@pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")
@asyncio.coroutine
def testTcpServerServeForever(self):
''' Test StartTcpServer serve_forever() method '''
Expand All @@ -94,7 +97,10 @@ def testTcpServerServeForever(self):
def testTcpServerServeForeverTwice(self):
''' Call on serve_forever() twice should result in a runtime error '''
server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
with self.assertRaises(RuntimeError):
yield from server.serve_forever()
Expand All @@ -105,7 +111,10 @@ def testTcpServerReceiveData(self):
''' Test data sent on socket is received by internals - doesn't not process data '''
data = b'\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19'
server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket', new_callable=Mock) as process:
# process = server.framer.processIncomingPacket = Mock()
Expand All @@ -126,7 +135,8 @@ def eof_received(self):
# if this unit test fails on a machine, see if increasing the sleep time makes a difference, if it does
# blame author for a fix

process.assert_called_once()
if PYTHON_VERSION >= (3, 6):
process.assert_called_once()
self.assertTrue( process.call_args[1]["data"] == data )
server.server_close()

Expand All @@ -136,7 +146,10 @@ def testTcpServerRoundtrip(self):
data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register
expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context
server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving

random_port = server.server.sockets[0].getsockname()[1] # get the random server port
Expand Down Expand Up @@ -172,7 +185,10 @@ def testTcpServerConnectionLost(self):
''' Test tcp stream interruption '''
data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01"
server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving

random_port = server.server.sockets[0].getsockname()[1] # get the random server port
Expand Down Expand Up @@ -202,7 +218,10 @@ def testTcpServerCloseActiveConnection(self):
''' Test server_close() while there are active TCP connections '''
data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01"
server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving

random_port = server.server.sockets[0].getsockname()[1] # get the random server port
Expand Down Expand Up @@ -231,7 +250,10 @@ def testTcpServerException(self):
''' Sending garbage data on a TCP socket should drop the connection '''
garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'
server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',
new_callable=lambda : Mock(side_effect=Exception)) as process:
Expand Down Expand Up @@ -267,7 +289,10 @@ def testTcpServerNoSlave(self):
context = ModbusServerContext(slaves={0x01: self.store, 0x02: self.store }, single=False)
data = b"\x01\x00\x00\x00\x00\x06\x05\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register)
server = yield from StartTcpServer(context=context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future()
received_data = None
Expand Down Expand Up @@ -299,7 +324,10 @@ def testTcpServerModbusError(self):
''' Test sending garbage data on a TCP socket should drop the connection '''
data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register)
server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
with patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute",
side_effect=NoSuchSlaveException):
Expand Down Expand Up @@ -335,7 +363,10 @@ def testTcpServerInternalException(self):
''' Test sending garbage data on a TCP socket should drop the connection '''
data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register)
server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
with patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute",
side_effect=Exception):
Expand Down Expand Up @@ -380,7 +411,8 @@ def testStartUdpServer(self):
self.loop = asynctest.Mock(self.loop)
server = yield from StartUdpServer(context=self.context,loop=self.loop,identity=identity)
self.assertEqual(server.control.Identity.VendorName, 'VendorName')
self.loop.create_datagram_endpoint.assert_called_once()
if PYTHON_VERSION >= (3, 6):
self.loop.create_datagram_endpoint.assert_called_once()

# async def testUdpServerServeNoDefer(self):
# ''' Test StartUdpServer without deferred start - NOT IMPLEMENTED - this test is hard to do without additional
Expand All @@ -389,6 +421,7 @@ def testStartUdpServer(self):
# server = yield from StartUdpServer(address=("127.0.0.1", 0), loop=self.loop, defer_start=False)
# server.server.serve_forever.assert_awaited()

@pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")
@asyncio.coroutine
def testUdpServerServeForeverStart(self):
''' Test StartUdpServer serve_forever() method '''
Expand All @@ -401,7 +434,10 @@ def testUdpServerServeForeverStart(self):
def testUdpServerServeForeverClose(self):
''' Test StartUdpServer serve_forever() method '''
server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving

self.assertTrue(asyncio.isfuture(server.on_connection_terminated))
Expand All @@ -416,7 +452,10 @@ def testUdpServerServeForeverTwice(self):
identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})
server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),
loop=self.loop,identity=identity)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
with self.assertRaises(RuntimeError):
yield from server.serve_forever()
Expand All @@ -426,15 +465,19 @@ def testUdpServerServeForeverTwice(self):
def testUdpServerReceiveData(self):
''' Test that the sending data on datagram socket gets data pushed to framer '''
server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',new_callable=Mock) as process:

server.endpoint.datagram_received(data=b"12345", addr=("127.0.0.1", 12345))
yield from asyncio.sleep(0.1)
process.seal()

process.assert_called_once()
if PYTHON_VERSION >= (3, 6):
process.assert_called_once()
self.assertTrue( process.call_args[1]["data"] == b"12345" )

server.server_close()
Expand All @@ -445,7 +488,10 @@ def testUdpServerSendData(self):
identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})
data = b'x\01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19'
server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0))
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
random_port = server.protocol._sock.getsockname()[1]
received = server.endpoint.datagram_received = Mock(wraps=server.endpoint.datagram_received)
Expand All @@ -469,7 +515,8 @@ def datagram_received(self, data, addr):

yield from asyncio.sleep(0.1)

received.assert_called_once()
if PYTHON_VERSION >= (3, 6):
received.assert_called_once()
self.assertEqual(received.call_args[0][0], data)

server.server_close()
Expand All @@ -483,7 +530,10 @@ def testUdpServerRoundtrip(self):
data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register
expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context
server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving

random_port = server.protocol._sock.getsockname()[1]
Expand Down Expand Up @@ -517,7 +567,10 @@ def testUdpServerException(self):
''' Test sending garbage data on a TCP socket should drop the connection '''
garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'
server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)
server_task = asyncio.create_task(server.serve_forever())
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
yield from server.serving
with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',
new_callable=lambda: Mock(side_effect=Exception)) as process:
Expand Down Expand Up @@ -562,4 +615,4 @@ def testStopServer(self):
# Main
# --------------------------------------------------------------------------- #
if __name__ == "__main__":
asynctest.main()
asynctest.main()