-
Notifications
You must be signed in to change notification settings - Fork 0
/
parameter_manager.py
106 lines (94 loc) · 4.3 KB
/
parameter_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from serial_manager import SerialManager
class ParameterManager:
    """Read and write named parameters on an ESP32 through a serial manager.

    Commands are newline-terminated text: ``"<name> <value>\\n"`` sets a
    parameter and ``"<name>\\n"`` requests one.  Replies and unsolicited
    updates are expected as a single ``"<name> <value>"`` line.

    Attributes:
        serial_manager: Collaborator that finds/opens the port and writes
            to it.  This class uses its ``find_esp32_port``,
            ``open_connection``, ``write_to_serial``, ``esp32_connected``
            and ``serial_connection`` members.
        serial_connection: Connection opened by ``open_serial_connection``,
            or ``None`` when none is open.
        serial_connected: ``True`` between a successful
            ``open_serial_connection`` and ``close_serial_connection``.
    """

    def __init__(self, serial_manager):
        """Store the serial manager; no connection is opened yet."""
        self.serial_manager = serial_manager
        self.serial_connection = None
        self.serial_connected = False

    def open_serial_connection(self):
        """Find the ESP32 port and open a connection to it via the manager.

        An ``IOError`` raised while searching for or opening the port is
        printed rather than propagated; ``serial_connected`` then keeps its
        previous value.
        """
        try:
            esp32_port = self.serial_manager.find_esp32_port()
            self.serial_connection = self.serial_manager.open_connection(esp32_port)
        except IOError as e:
            print(e)
        else:
            print(f"Opened {esp32_port}")
            self.serial_connected = True

    def get_serial_connection_name(self):
        """Return the open connection's ``name``, or a placeholder if none."""
        if self.serial_connection:
            return self.serial_connection.name
        return "/!\\ NOT CONNECTED"

    def close_serial_connection(self):
        """Close the connection opened by ``open_serial_connection``, if any."""
        if self.serial_connected and self.serial_connection:
            self.serial_connection.close()
            print("Serial connection closed.")
            self.serial_connected = False
            # Drop the stale handle so get_serial_connection_name() stops
            # reporting a port that has already been closed.
            self.serial_connection = None
        else:
            print("No serial connection to close.")

    def set_parameter(self, name, value):
        """Send ``"<name> <value>\\n"`` to the device through the manager."""
        command = f"{name} {value}\n"
        self.serial_manager.write_to_serial(command.encode())

    def get_parameter(self, name):
        """Request parameter ``name`` and return the device's reply.

        The request is always written through the manager; the reply is
        read from the manager's ``serial_connection`` only when the manager
        reports ``esp32_connected``.

        Returns:
            A 2-tuple:

            * ``(reply_name, reply_value)`` for a ``"<name> <value>"`` reply;
            * ``(name, "0")`` when the ESP32 is not connected;
            * ``("Unexpected", None)`` when reading or decoding fails;
            * ``("Unexpected", reply)`` when the reply is not exactly two
              space-separated fields;
            * ``("parameter manager error", "")`` when the reply is empty.
        """
        command = f"{name}\n"
        self.serial_manager.write_to_serial(command.encode())

        if not self.serial_manager.esp32_connected:
            print("ESP32 not connected")
            return name, "0"

        try:
            response = self.serial_manager.serial_connection.readline().decode().strip()
        except (OSError, UnicodeDecodeError) as error:
            # The connection object exposes no ``serialutil`` attribute, so
            # the exception class cannot be looked up on it.  OSError is
            # caught instead: presumably the port is a pyserial ``Serial``,
            # whose SerialException derives from IOError/OSError -- confirm
            # against serial_manager.  Undecodable bytes are treated the
            # same way instead of crashing the caller.
            print(f"Error reading from the serial port: {error}")
            return "Unexpected", None

        if not response:
            print(f"Error receiving parameter. '{response}' received")
            return "parameter manager error", response

        parsed = self._parse_parameter_line(response)
        if parsed is None:
            print(f"ERROR: response does not contain 2 values: {response}")
            return "Unexpected", response

        parameter_name, parameter_value = parsed
        print(f"Received parameter '{parameter_name}' = {parameter_value}")
        return parameter_name, parameter_value

    def check_parameter_updates(self):
        """Drain pending input and return the parameter updates found in it.

        Lines are read while the connection reports ``in_waiting``.  Blank
        lines are skipped silently; non-empty lines that are not a
        ``"<name> <value>"`` pair are reported and skipped.  A read or
        decode failure is printed and ends the drain early.

        Returns:
            A list of ``(name, value)`` string tuples in arrival order;
            empty when the ESP32 is not connected or no connection exists.
        """
        parameter_updates = []
        if not self.serial_manager.esp32_connected:
            return parameter_updates

        # get_parameter() reads from the manager's connection, so fall back
        # to it when open_serial_connection() was never called on this
        # object instead of failing on ``None.in_waiting``.
        connection = self.serial_connection
        if connection is None:
            connection = getattr(self.serial_manager, "serial_connection", None)
        if connection is None:
            return parameter_updates

        try:
            while connection.in_waiting:
                received_data = connection.readline().decode().strip()
                if not received_data:
                    continue
                parsed = self._parse_parameter_line(received_data)
                if parsed is None:
                    print(f"Error parameter updates. '{received_data}' received")
                    continue
                parameter_name, parameter_value = parsed
                print(f"Received parameter update: {parameter_name} = {parameter_value}")
                parameter_updates.append((parameter_name, parameter_value))
        except (OSError, UnicodeDecodeError) as error:
            # Same failure handling as get_parameter(): report the problem
            # and hand back whatever was collected before it.
            print(f"Error reading parameter updates: {error}")
        return parameter_updates

    @staticmethod
    def _parse_parameter_line(line):
        """Return ``(name, value)`` for a ``"<name> <value>"`` line, else ``None``."""
        parts = line.split(' ')
        if len(parts) != 2:
            return None
        return parts[0], parts[1]