-
Notifications
You must be signed in to change notification settings - Fork 956
/
Copy pathasynchronous-asyncio-serial-client.py
executable file
·116 lines (95 loc) · 5.16 KB
/
asynchronous-asyncio-serial-client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import asyncio
from serial_asyncio import create_serial_connection
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
from pymodbus.client.async_asyncio import ModbusClientProtocol
from pymodbus.transaction import ModbusAsciiFramer, ModbusRtuFramer
from pymodbus.factory import ClientDecoder
#---------------------------------------------------------------------------#
# configure the client logging
#---------------------------------------------------------------------------#
import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
async def start_async_test(client):
#---------------------------------------------------------------------------#
# specify slave to query
#---------------------------------------------------------------------------#
# The slave to query is specified in an optional parameter for each
# individual request. This can be done by specifying the `unit` parameter
# which defaults to `0x00`
#---------------------------------------------------------------------------#
log.debug("Reading Coils")
rr = client.read_coils(1, 1, unit=0x01)
#---------------------------------------------------------------------------#
# example requests
#---------------------------------------------------------------------------#
# simply call the methods that you would like to use. An example session
# is displayed below along with some assert checks. Note that some modbus
# implementations differentiate holding/input discrete/coils and as such
# you will not be able to write to these, therefore the starting values
# are not known to these tests. Furthermore, some use the same memory
# blocks for the two sets, so a change to one is a change to the other.
# Keep both of these cases in mind when testing as the following will
# _only_ pass with the supplied async modbus server (script supplied).
#---------------------------------------------------------------------------#
log.debug("Write to a Coil and read back")
rq = await client.write_coil(0, True, unit=1)
rr = await client.read_coils(0, 1, unit=1)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rr.bits[0] == True) # test the expected value
log.debug("Write to multiple coils and read back- test 1")
rq = await client.write_coils(1, [True]*8, unit=1)
assert(rq.function_code < 0x80) # test that we are not an error
rr = await client.read_coils(1, 21, unit=1)
assert(rr.function_code < 0x80) # test that we are not an error
resp = [True]*21
# If the returned output quantity is not a multiple of eight,
# the remaining bits in the final data byte will be padded with zeros
# (toward the high order end of the byte).
resp.extend([False]*3)
assert(rr.bits == resp) # test the expected value
log.debug("Write to multiple coils and read back - test 2")
rq = await client.write_coils(1, [False]*8, unit=1)
rr = await client.read_coils(1, 8, unit=1)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rr.bits == [False]*8) # test the expected value
log.debug("Read discrete inputs")
rr = await client.read_discrete_inputs(0, 8, unit=1)
assert(rq.function_code < 0x80) # test that we are not an error
log.debug("Write to a holding register and read back")
rq = await client.write_register(1, 10, unit=1)
rr = await client.read_holding_registers(1, 1, unit=1)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rr.registers[0] == 10) # test the expected value
log.debug("Write to multiple holding registers and read back")
rq = await client.write_registers(1, [10]*8, unit=1)
rr = await client.read_holding_registers(1, 8, unit=1)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rr.registers == [10]*8) # test the expected value
log.debug("Read input registers")
rr = await client.read_input_registers(1, 8, unit=1)
assert(rq.function_code < 0x80) # test that we are not an error
arguments = {
'read_address': 1,
'read_count': 8,
'write_address': 1,
'write_registers': [20]*8,
}
log.debug("Read write registeres simulataneously")
rq = await client.readwrite_registers(unit=1, **arguments)
rr = await client.read_holding_registers(1, 8, unit=1)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.registers == [20]*8) # test the expected value
assert(rr.registers == [20]*8) # test the expected value
# create_serial_connection doesn't allow to pass arguments
# to protocol so that this is kind of workaround
def make_protocol():
return ModbusClientProtocol(framer=ModbusAsciiFramer(ClientDecoder()))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
coro = create_serial_connection(loop, make_protocol, '/dev/ttyUSB0',
baudrate=9600)
transport, protocol = loop.run_until_complete(asyncio.gather(coro))[0]
loop.run_until_complete(start_async_test(protocol))
loop.close()