-
Notifications
You must be signed in to change notification settings - Fork 0
/
tipnovus_class_api.py
194 lines (170 loc) · 6.65 KB
/
tipnovus_class_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import sys
import serial
from logging_decor import *
from time import sleep
import re
#{{{ FORMAT COMMANDS
class FC:
def __init__(self, tp_unit, part_of_cmd_1, part_of_cmd_2=''):
self.tp_unit = tp_unit
self.part_of_cmd_1 = part_of_cmd_1
self.part_of_cmd_2 = part_of_cmd_2
@property
def run_cmds(self):
return f"0{self.tp_unit},TI,{self.part_of_cmd_1},{self.part_of_cmd_2},#"
@property
def util_cmds(self):
#if self.part_of_cmd_2 == '':
# return f"0{self.tp_unit},{self.part_of_cmd_1},#"
#else:
return f"0{self.tp_unit},{self.part_of_cmd_1},#"
def setparam(self, setvar):
self.setvar = setvar
return f"0{self.tp_unit},TI,{self.part_of_cmd_1},{self.part_of_cmd_2},{self.setvar}#"
@staticmethod
def to_secs(mins):
return mins*60
#}}}
#{{{ SEND COMMAND DICTIONARY
TP = '1'
WA = 'WA'
DR = 'DR'
send_cmd_dict = {
#key = command name, val1 = ascii command, val2 = time delay
'connect' : [FC(TP, '@').util_cmds, FC.to_secs(0.02)],
'ack' : [FC(TP, 'ACK,1').util_cmds, FC.to_secs(0.02)],
'nak' : [FC(TP, 'NAK,@').util_cmds, FC.to_secs(0.02)],
'opendoor_washer' : [FC(TP, WA, 'OD').run_cmds, FC.to_secs(0.04)],
'closedoor_washer' : [FC(TP, WA, 'CD').run_cmds, FC.to_secs(0.04)],
'opendoor_dryer' : [FC(TP, DR, 'OD').run_cmds, FC.to_secs(0.04)],
'closedoor_dryer' : [FC(TP, DR, 'CD').run_cmds, FC.to_secs(0.05)],
'start_dryer' : [FC(TP, DR, 'SD').run_cmds, FC.to_secs(0.07)],
'custom2_proc' : [FC(TP, WA, 'S2').run_cmds, FC.to_secs(0.07)],
'custom1_proc' : [FC(TP, WA, 'S1').run_cmds, FC.to_secs(0.07)],
'self_clean' : [FC(TP, WA, 'CL').run_cmds, FC.to_secs(0.05)],
'check_sensor' : [FC(TP, WA, 'SC').run_cmds, FC.to_secs(0.82)],
'waste_drain' : [FC(TP, WA, 'WD').run_cmds, FC.to_secs(0.04)],
'dply_wash' : [FC(TP, WA, 'WS').run_cmds, FC.to_secs(0.04)],
'dply_dryer' : [FC(TP, DR, 'DS').run_cmds, FC.to_secs(0.03)],
'abort_dryer' : [FC(TP, DR, 'AD').run_cmds, FC.to_secs(0.03)],
'abort_wash' : [FC(TP, WA, 'AW').run_cmds, FC.to_secs(0.02)],
'get_dtemp' : [FC(TP, DR, 'CT').run_cmds, FC.to_secs(0.02)],
'primeA' : [FC(TP, WA, 'PA').run_cmds, FC.to_secs(0.04)],
'primeDI' : [FC(TP, WA, 'PD').run_cmds, FC.to_secs(0.04)],
'set_dtime' : [FC(TP, DR, 'TM').setparam(10), FC.to_secs(0.02)],
'set_dtemp' : [FC(TP, DR, 'MT').setparam(50), FC.to_secs(0.02)],
'discon_resp' : [FC(TP, 'ACK,@').util_cmds, FC.to_secs(0.02)],
'ack2' : [FC(TP, 'ACK').util_cmds, FC.to_secs(0.02)],
'ack3' : [FC(TP, 'ACK,00').util_cmds, FC.to_secs(0.02)],
'un_op' : [FC(TP, 'ACK,0').util_cmds, FC.to_secs(0.02)]
}
#}}}
class tipnovus:
def __init__(self, str_command):
def check_setcmd(str_cmd):
if re.search('set_d(temp|time);\d{1,3}$', str_cmd):
return True
else:
return False
self.check = check_setcmd(str_command)
if str_command not in send_cmd_dict.keys():
if not self.check:
raise ValueError(f'that command does not exist! - {str_command}')
else:
self.str_command = str_command[:9]
self.setval = str_command[10:]
self.buffer_wait_time = send_cmd_dict[self.str_command][1]
else:
self.str_command = str_command
self.buffer_wait_time = send_cmd_dict[str_command][1]
@property
def word_command(self):
if self.check:
return f'{self.str_command};{self.setval}'
else:
return self.str_command
@word_command.setter
def word_commmand(self, command):
self.str_command = command
@property
def code_command(self):
if self.check:
return f'{send_cmd_dict[self.str_command][0][:12]}{self.setval}#'
else:
return send_cmd_dict[self.str_command][0]
@property
def encode_str_cmd(self):
if self.check:
return f'{send_cmd_dict[self.str_command][0][:12]}{self.setval}#'.encode()
else:
return send_cmd_dict[self.str_command][0].encode()
class tpserial:
def __init__(self, port):
self._ser = None
self._baudrate = 115200
self._port = port
self._timeout = 10
@property
def connect(self):
try:
self._ser = serial.Serial(port = self._port, baudrate = self._baudrate, timeout = self._timeout)
self._ser.reset_output_buffer
self._ser.reset_input_buffer
handle_logs(f"Connected to TipNovus! ({self._port})")
return self
except serial.SerialException as e:
handle_logs(('error',f"Error occured during serial connection - {e}"))
sys.exit(1)
@property
def disconnect(self):
if self._ser.isOpen():
self._ser.close()
handle_logs(f'Disconnecting from serial port {self._port}!')
@property
def is_connected(self):
try:
return self._ser.isOpen()
except:
return False
def write_cmd(self, byte_command):
self.byte_command = byte_command
try:
self._ser.write(byte_command)
tipnovus_logger.debug(f'Sent {byte_command} to serial device')
except Exception as e:
handle_logs(('error',f'Serial connection errored whilst sending {byte_command}: {e}'))
@property
def read_resp(self):
_response = self._ser.read(1)
while True:
n_ = self._ser.in_waiting
if not n_:
break
else:
sleep(0.36)
_response += self._ser.read(n_)
tipnovus_logger.debug(f'Read {_response.decode()} from serial device')
return _response.decode()
def __enter__(self):
try:
if self._ser == None:
self._ser = serial.Serial(port = self._port, baudrate = self._baudrate, timeout = self._timeout)
handle_logs(f'Connected to serial port {self._port}!')
else:
if self._ser.isOpen():
self._ser.close()
handle_logs('Disconnecting...')
else:
self._ser.open()
handle_logs(f'Connected to serial port {self._port}!')
return self._ser
except serial.SerialException as e:
handle_logs(f'Error occured: {e}')
sys.exit(1)
def __exit__(self, exc_type, exc_val, traceback):
self._ser.close()
# tip_clean_tasks = tipnovus('closedoor_dryer')
# tip_clean_tasks.buffer_wait_time
# tip_clean_tasks.encode_str_cmd
# tip_clean_tasks.word_command
# tip_clean_tasks.code_command