-
Notifications
You must be signed in to change notification settings - Fork 0
/
Oasis.py
361 lines (306 loc) · 12.9 KB
/
Oasis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
#Python 3 class to control a Oasis 150 by Solid State Cooling Systems
#Date: 17 June 2021
#Author: Lars E. Borm
#E-mail: [email protected] or [email protected]
#Python version: 3.7.0
#Based on: Oasis 150 Product Manual Rev M27
#If you want to use Python 2, the "to_bytes" function to create the messages
#does not exist. replace it with "struckt.pack"
#similar for decoding: change "int.from_bytes" to "struct.pack"
import serial
import time
import warnings
from serial.tools import list_ports
# FIND SERIAL PORT
def find_address(identifier = None):
"""
Find the address of a serial device. It can either find the address using
an identifier given by the user or by manually unplugging and plugging in
the device.
Input:
`identifier`(str): Any attribute of the connection. Usually USB to Serial
converters use an FTDI chip. These chips store a number of attributes
like: name, serial number or manufacturer. This can be used to
identify a serial connection as long as it is unique. See the pyserial
list_ports.grep() function for more details.
Returns:
The function prints the address and serial number of the FTDI chip.
`port`(obj): Returns a pyserial port object. port.device stores the
address.
"""
found = False
if identifier != None:
port = [i for i in list(list_ports.grep(identifier))]
if len(port) == 1:
print('Device address: {}'.format(port[0].device))
found = True
elif len(port) == 0:
print('''No devices found using identifier: {}
\nContinue with manually finding USB address...\n'''.format(identifier))
else:
for p in connections:
print('{:15}| {:15} |{:15} |{:15} |{:15}'.format('Device', 'Name', 'Serial number', 'Manufacturer', 'Description') )
print('{:15}| {:15} |{:15} |{:15} |{:15}\n'.format(str(p.device), str(p.name), str(p.serial_number), str(p.manufacturer), str(p.description)))
raise Exception("""The input returned multiple devices, see above.""")
if found == False:
print('Performing manual USB address search.')
while True:
input(' Unplug the USB. Press Enter if unplugged...')
before = list_ports.comports()
input(' Plug in the USB. Press Enter if USB has been plugged in...')
after = list_ports.comports()
port = [i for i in after if i not in before]
if port != []:
break
print(' No port found. Try again.\n')
print('Device address: {}'.format(port[0].device))
try:
print('Device serial_number: {}'.format(port[0].serial_number))
except Exception:
print('Could not find serial number of device.')
return port[0]
class Oasis():
'''
Class to controll a Solid State Oasis 150
Probably also works for the newer models 160/070/180
However, temperature ranges should probably be adjusted.
Use the high level functions to control the Oasis:
-set_temp( celcius ) Set a required temperature
-read_set_point() Read the current set point
-read_temp() Read the current temperature
-check_error() Check for errors
Assumes the Oasis is set in Celcius mode.
'''
def __init__(self, address, name = ''):
'''
Input:
`address`: address of Oasis, '/dev/ttyUSBX' on linux.
'COMX' on windows.
`name`(str): Name to identify the Oasis for user (not necessary)
If connection is made "self.connected" will be set to True
'''
self.address = address
self.name = name
self.ser = serial.Serial(address, timeout = 2, baudrate = 9600)
self.connected = False
#9600 is the default baudrate for the Oasis
#Check for errors and check if a connection is made
try:
self.check_error()
self.connected = True
except Exception as e:
print('Could not connect to Oasis. Is it on and connected? Is the address correct?')
# Low level funcitons
def celsius_to_fahrenheit(self, C):
"""
Converts degree Celsius to degree Fahrenheit.
Input:
`C` (float or int): degree Celsius.
Returns:
Degree fahrenheit as float with one decimal.
"""
F = (C * (9/5)) + 32
return round(F, 1)
def fahrenheit_to_celsius(self, F):
"""
Converts degree Fahrenheit to degree Celcius.
Input:
`F` (float or int): degree Fahrenheit.
Returns:
Degree celcius as float with one decimal.
"""
C = (F - 32) * (5/9)
return round(C, 1)
# Composing, writing and reading commands
def TC_command(self, command, temperature = None):
"""
Function to format the command for the Oasis as hexadecimal byte
string.
Input:
`command`(int): Decimal representation of the command
`temperature`(float/int, optional): temperature in farenheid
"""
#command
command_byte = command.to_bytes(1, byteorder='little')
message = command_byte
#if the command is "set temperature", the temperature is added
if temperature != None:
temperature = int(temperature * 10)
temperature_byte = temperature.to_bytes(2, byteorder='little')
message += temperature_byte
return message
def write_message(self, message):
"""
Write message to the Oasis.
Input:
`message`: Message to sent to Oasis
"""
self.ser.write(message)
#The Oasis can handle up to 2 commands per second
time.sleep(0.55)
def read_response(self):
"""
Read response of the Oasis.
Output: response of the pump.
"""
n = self.ser.inWaiting()
response = self.ser.read(n)
return response
def resp_int(self, response):
"""
Interpret the respons of the Oasis
Input:
`response`: Oasis response read by "read_response" function
Depending on the respons the output is either:
option1: temperature value
option2: Status of thermal cube (True/False), Error code
"""
#Empty response
if len(response) == 0:
if raise_error == True:
raise Exception('Got an empty response from {}, is it connected and on?'.format(self.name))
else:
warnings.warn('Got an empty response from {}, is it connected and on? The program will likely crash now.'.format(self.name))
return False, 'Got an empty response from {}, is it connected and on?'.format(self.name)
#Afirmative command
if response[0] == 225:
return True
#Temperature response
if response[0] == 201 or response[0] == 193:
received_temp = int.from_bytes(response[1:], byteorder='little')
received_temp = received_temp / 10 #get decimal in right place
return received_temp
#Error code response
if response[0] == 200:
if int.from_bytes(response[1:], byteorder='little') == 0:
return [True, 'No errors on Oasis "{}"'.format(self.name)]
else:
error_dict = {1: "Tank level low",
2: "Spare",
4: "Temperature above alarm range",
10: "RTD Fault (temperature sensor error)",
20: "Pump current fault",
80: "Temperature below alarm range"}
error_code = int.from_bytes(response[1:], byteorder='little')
if error_code in error_dict:
return False, error_dict[error_code]
else:
return False, ('Unknown error code on Oasis "{}"'
.format(self.name))
else:
raise ValueError('Unknown response: "{}" from Oasis "{}"'
.format(response, self.name))
# High level functions
def set_temp(self, temp_C):
"""
Set the temperature of the Oasis. Checks for errors first.
Input:
`temp_C`: temperature to set in celcius.
"""
if not 10 <= temp_C <= 45:
raise ValueError('''temperature in C for {0} should be between 10C
and 45C, not {1}C'''
.format(self.name, temp_C))
#Check for errors
self.check_error(verbose = False)
#Flush output
self.read_response()
#Command
msg = self.TC_command(225, temperature = temp_C)
n_try = 1
while n_try <= 5:
#Send messages
self.write_message(msg)
#Check if set point is set correctly
set_point = self.read_set_point()
if (temp_C - 1) <= set_point <= (temp_C + 1): #Made a range in case there is an error in rounding up or down
return 'Oasis "{}" set to {}C'.format(self.name, temp_C)
else:
print('''Could not set Oasis {} to right temperature,\n
try: {}/5'''.format(self.name, n_try))
n_try += 1
print('''Failed to set temperature of Oasis "{}" to {}C,\n
Will check for errors'''.format(self.name, temp_C))
self.check_error(verbose = True)
raise Exception('Failed to set temperature of Oasis "{}" to {}C'
.format(self.name, temp_C))
def read_set_point(self):
"""
Read the set point of the Thermal cube
Output:
returns set point in Celcius
"""
#Flush output
self.read_response()
#Command
msg = self.TC_command(193)
#Send message
self.write_message(msg)
#Read response
response = self.read_response()
#interpret response
set_point = self.resp_int(response)
return set_point
def read_temp(self):
"""
CURRENT temperature of RTD sensor
Output:
current temperature in Celcius read by the RTD sensor
"""
#Flush output
self.read_response()
#Command
msg = self.TC_command(201)
#Send message
self.write_message(msg)
#Read message
response = self.read_response()
#interpret response
current_temp = self.resp_int(response)
return current_temp
def read_error(self):
"""
Cecks for errors and returns error code.
"""
#Flush output
self.read_response()
#Command
msg = self.TC_command(200)
#Send message
self.write_message(msg)
#Read message
response = self.read_response()
return response
def check_error(self, verbose = True, raise_error=True):
"""
Check for errors. Raises exeption if there is an error.
Retruns a warning if error is resolved after 100 seconds.
If verbose = True, will print that there is no error if there is no error.
"""
#Check for errors and interpret the response
error = self.resp_int(self.read_error())
if error[0] == True:
if verbose == True:
print(error[1])
return error
#See if error resolves itselve
else:
#Will check 5 times to see if the error persists.
print('Error from Oasis "{}": {}'.format(self.name, error[1]))
n_try = 2
while n_try <= 5:
time.sleep(10)
#Check for errors and interpret the response
error = self.resp_int(self.read_error())
if error[0] == True:
warnings.warn('''\nError resolved after {} tries.\n
!!! Check the Oasis "{}" to resolve the issue: "{}" !!!'''.format( n_try, self.name, error[1]))
break
else:
print('Error from Oasis "{}": {}, try {}/5'
.format(self.name, error[1], n_try))
n_try += 1
if raise_error == True:
raise Exception('Error from Oasis "{}": {}'.format(self.name, error[1]))
else:
return [False, '{}: {}'.format(self.name, error[1])]