forked from sfera-labs/exo-sense-py-modbus
-
Notifications
You must be signed in to change notification settings - Fork 48
/
rtu_client_example.py
138 lines (118 loc) · 4.23 KB
/
rtu_client_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Main script
Do your stuff here, this file is similar to the loop() function on Arduino
Create a Modbus RTU client (slave) which can be requested for data or set with
specific values by a host device.
The RTU communication pins can be choosen freely (check MicroPython device/
port specific limitations).
The register definitions of the client as well as its connection settings like
bus address and UART communication speed can be defined by the user.
"""
# import modbus client classes
from umodbus.serial import ModbusRTU
IS_DOCKER_MICROPYTHON = False
try:
import machine
machine.reset_cause()
except ImportError:
raise Exception('Unable to import machine, are all fakes available?')
except AttributeError:
# machine fake class has no "reset_cause" function
IS_DOCKER_MICROPYTHON = True
import json
# ===============================================
# RTU Slave setup
# act as client, provide Modbus data via RTU to a host device
# ModbusRTU can get serial requests from a host device to provide/set data
# check MicroPython UART documentation
# https://docs.micropython.org/en/latest/library/machine.UART.html
# for Device/Port specific setup
# RP2 needs "rtu_pins = (Pin(4), Pin(5))" whereas ESP32 can use any pin
# the following example is for an ESP32
rtu_pins = (25, 26) # (TX, RX)
slave_addr = 10 # address on bus as client
baudrate = 9600
client = ModbusRTU(
addr=slave_addr, # address on bus
pins=rtu_pins, # given as tuple (TX, RX)
baudrate=baudrate, # optional, default 9600
# data_bits=8, # optional, default 8
# stop_bits=1, # optional, default 1
# parity=None, # optional, default None
# ctrl_pin=12, # optional, control DE/RE
# uart_id=1 # optional, see port specific documentation
)
if IS_DOCKER_MICROPYTHON:
# works only with fake machine UART
assert client._itf._uart._is_server is True
def reset_data_registers_cb(reg_type, address, val):
# usage of global isn't great, but okay for an example
global client
global register_definitions
print('Resetting register data to default values ...')
client.setup_registers(registers=register_definitions)
print('Default values restored')
# common slave register setup, to be used with the Master example above
register_definitions = {
"COILS": {
"RESET_REGISTER_DATA_COIL": {
"register": 42,
"len": 1,
"val": 0
},
"EXAMPLE_COIL": {
"register": 123,
"len": 1,
"val": 1
}
},
"HREGS": {
"EXAMPLE_HREG": {
"register": 93,
"len": 1,
"val": 19
}
},
"ISTS": {
"EXAMPLE_ISTS": {
"register": 67,
"len": 1,
"val": 0
}
},
"IREGS": {
"EXAMPLE_IREG": {
"register": 10,
"len": 1,
"val": 60001
}
}
}
# alternatively the register definitions can also be loaded from a JSON file
# this is always done if Docker is used for testing purpose in order to keep
# the client registers in sync with the test registers
if IS_DOCKER_MICROPYTHON:
with open('registers/example.json', 'r') as file:
register_definitions = json.load(file)
# reset all registers back to their default value with a callback
register_definitions['COILS']['RESET_REGISTER_DATA_COIL']['on_set_cb'] = \
reset_data_registers_cb
print('Setting up registers ...')
# use the defined values of each register type provided by register_definitions
client.setup_registers(registers=register_definitions)
# alternatively use dummy default values (True for bool regs, 999 otherwise)
# client.setup_registers(registers=register_definitions, use_default_vals=True)
print('Register setup done')
print('Serving as RTU client on address {} at {} baud'.
format(slave_addr, baudrate))
while True:
try:
result = client.process()
except KeyboardInterrupt:
print('KeyboardInterrupt, stopping RTU client...')
break
except Exception as e:
print('Exception during execution: {}'.format(e))
print("Finished providing/accepting data as client")