-
Notifications
You must be signed in to change notification settings - Fork 0
/
collector.py
69 lines (54 loc) · 1.66 KB
/
collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
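'''
Read values from the inverter (through utils.motor.SinamicV20 over a Modbus
serial link) and keep updating one row of a SQLite table with them.

Usage:
    sudo chmod a+rw /dev/ttyUSB0
    python3 collector.py
'''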
import sqlite3
from utils.motor import SinamicV20
from pymodbus.client import ModbusSerialClient
# Runs at import time, right after the imports above have succeeded.
print('Import successfully!')
# Initialize
# --- Serial / Modbus settings handed to ModbusSerialClient in the main block ---
PORT = '/dev/ttyUSB0'   # serial device (the same one the usage note chmods)
METHOD = 'rtu'          # passed as the client's `method` argument
STOPBITS = 1
BYTESIZE = 8
PARITY = 'N'
BAUDRATE = 9600
# Modbus slave address of the inverter (matches the slave_id given to SinamicV20).
SLAVE_ID = 2
# Not referenced anywhere in the visible part of this script.
N_SAMPLES = 100
# --- SQLite settings ---
ID = 0                               # value matched against the ID column in the UPDATE's WHERE clause
TABLE_NAME = 'sinamicv20'            # table targeted by the generated UPDATE statement
DATABASE_PATH = 'data/inverter.db'   # path passed to sqlite3.connect
def generate_update_query_by_id(table_name, data_dict, id):
    """Build an SQL ``UPDATE`` statement that sets columns on a single row.

    Values are rendered as SQL literals:

    * ``None`` becomes ``NULL``.
    * ``bool`` becomes ``1`` / ``0`` (SQLite has no separate boolean type).
    * ``str`` is wrapped in single quotes, with embedded single quotes
      doubled so the literal cannot terminate early.
    * Anything else is inserted using its normal string formatting.

    Args:
        table_name: Name of the table to update. Inserted verbatim, so it
            must come from trusted code, not from external input.
        data_dict: Mapping of column name -> new value. Column names are
            inserted verbatim as well.
        id: Value compared against the ``ID`` column in the WHERE clause.
            (The name shadows the builtin but is kept for compatibility
            with keyword callers.)

    Returns:
        The complete statement as a string, terminated by ``;``. An empty
        ``data_dict`` yields a statement with an empty SET list, which the
        database will reject when executed.
    """

    def _literal(value):
        # Order matters: bool is a subclass of int, and None must not fall
        # through to the generic formatting branch.
        if value is None:
            return "NULL"
        if isinstance(value, bool):
            return str(int(value))
        if isinstance(value, str):
            escaped = value.replace("'", "''")
            return f"'{escaped}'"
        return f"{value}"

    assignments = ", ".join(
        f"{column} = {_literal(value)}" for column, value in data_dict.items()
    )
    return f"UPDATE {table_name} SET {assignments} WHERE ID = {id};"
if __name__ == "__main__":
    # Serial Modbus client configured from the module-level settings.
    # NOTE(review): `unit=1` differs from SLAVE_ID (2); it is kept as in the
    # original call -- confirm whether the client actually uses it.
    client = ModbusSerialClient(
        method=METHOD,
        port=PORT,
        stopbits=STOPBITS,
        bytesize=BYTESIZE,
        parity=PARITY,
        baudrate=BAUDRATE,
        unit=1,
    )
    # Use the SLAVE_ID constant instead of repeating the literal 2.
    inverter = SinamicV20(client=client, slave_id=SLAVE_ID)

    conn = sqlite3.connect(DATABASE_PATH)
    try:
        c = conn.cursor()
        # Poll forever: each pass reads the inverter and overwrites row `ID`.
        while True:
            # get inverter values
            dict_of_values = inverter.read_raw_all_address_convert_dict()
            update_query = generate_update_query_by_id(TABLE_NAME, dict_of_values, ID)
            try:
                c.execute(update_query)
                conn.commit()
            except Exception as e:
                # Deliberately broad and best-effort: a failed write must not
                # stop the collector. Report the error and the offending
                # statement, then keep polling.
                print(e)
                print(update_query)
    finally:
        # The loop only ends via an exception (Ctrl-C, read failure, ...);
        # make sure the database connection is released in that case too.
        conn.close()