-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
322 lines (280 loc) · 12.3 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
from flask_restful import Resource, Api
from flask import render_template, Response, jsonify, redirect, request, flash, send_from_directory, abort
from eTape_sensor.pi_camera import *
from eTape_sensor.sensor_funcs import *
from eTape_sensor.phidget_rfid import *
import signal
import json
from main_funcs import *
from tp_schemas import *
from eTape_sensor.video_thread import *
from logging_decor import *
api = Api(app)
picam = Camera()
record_video_cls = None
vd_thread = None
app.config['SECRET_KEY'] = 'SECRET'
app.config['VIDEO_DIR'] = os.path.join(fpath, 'recorded_videos', 'mp4_format')
app.config['IPADDR'] = '192.168.1.212'
#{{{ ETAPE SENSOR
class waste_check_5L_carboy(Resource):
def get(self):
small_carboy_logs = create_logger('small_5L_carboy')
volt_sm = check_voltage('small', small_carboy_logs)
model_sm = poly_reg_model('model_sm.sav')
pred_volume_sm = check_prediction(model_sm.predict([[volt_sm]])[0])
small_carboy_logs.info(f'<< API_call >> pred_vol_small_carboy: {pred_volume_sm}')
return {'small_carboy_voltage': volt_sm, 'small_carboy_predicted_vol' : pred_volume_sm}
class waste_check_20L_carboy(Resource):
def get(self):
large_carboy_logs = create_logger('large_20L_carboy')
volt_lg = check_voltage('large', large_carboy_logs)
model_lg = poly_reg_model('model_lg.sav')
pred_volume_lg = check_prediction(model_lg.predict([[volt_lg]])[0])
large_carboy_logs.info(f'<< API_call >> pred_vol_large_carboy: {pred_volume_lg}')
return {'large_carboy_voltage' : volt_lg, 'large_carboy_predicted_vol' : pred_volume_lg}
#}}}
@app.route("/<status>")
def onAction(status):
'''
turn on lamp switch
'''
if status == "on":
GPIO.output (pins['pin24'], GPIO.HIGH)
print ("light is on - from flask route")
if status == "off":
GPIO.output(pins['pin24'], GPIO.LOW)
print("light is off - from flask route")
return jsonify({"pin" : pins['pin24'], "status" : status})
class record_video_stream_on(Resource):
def put(self):
global record_video_cls, vd_thread
vid_status = {}
vid_status['time'] = request.form['time']
vid_status['wrkflow_name'] = request.form['wrkflow_name']
if validate_trigger_cmd(vid_status):
record_video_cls = RecordVideoClass()
vd_thread = ThreadWithReturnValue(target=record_video_cls.record_video, args=(vid_status['time'], vid_status['wrkflow_name']))
vd_thread.start()
return {'response' : 'video recording...'}, 201
else:
abort_if_invalid({'response' : f'error involving argument validation of time and workflow name - {vid_status}'})
class record_video_stream_off(Resource):
def get(self):
global record_video_cls, vd_thread
#print(vd_thread.is_alive())
record_video_cls.terminate()
# join with a thread, which waits for it to terminate
# This blocks the calling thread until the thread whose
# join() method is called terminates
file_path = vd_thread.join()
return {'response': file_path}, 200
@app.route('/tp_ser_wbsrv/video_feed')
def video_feed():
return render_template('video_feed.html')
@app.route('/tp_ser_wbsrv')
@app.route('/')
def video_recording():
return render_template('index.html')
@app.route('/tp_ser_wbsrv/filter_by_date', methods=["POST"])
def filter_by_date():
date = request.form['date-picker']
print(date)
if not date:
msg = "Enter a date to filter by"
print(msg)
flash(msg, 'warning')
return redirect('/')
filtered_lst = filter_func(date)
#today = datetime.strptime(get_time(), '%Y-%b-%d %H:%M:%S')
today = datetime.now()
if datetime.strptime(date, dt_fmt).day > today.day:
msg = f"{date} is in the future!"
print(msg)
flash(msg, 'warning')
return redirect('/')
if not filtered_lst:
msg = f"{date} didn't contain any recordings"
print(msg)
flash(msg, 'warning')
return redirect('/')
return render_template('index.html', listdir=filtered_lst, ipaddr = app.config['IPADDR'])
@app.route('/tp_ser_wbsrv/video/<filename>')
def show_video(filename):
try:
#print(f"{app.config['VIDEO_DIR']}/{filename}")
return send_from_directory(app.config['VIDEO_DIR'], filename=filename)
except FileNotFoundError:
abort(404)
@app.route('/tp_ser_wbsrv/video_feed/start')
def start():
global picam
picam.start()
btn_res = request.args.get('btn_type')
return jsonify({'btn' : btn_res})
@app.route('/tp_ser_wbsrv/video_feed/stop')
def stop():
global picam
picam.stop()
btn_res = request.args.get('btn_type')
return jsonify({'btn' : btn_res})
@app.route('/videofeed')
def videofeed():
return Response(gen(picam), mimetype='multipart/x-mixed-replace; boundary=frame')
#
#@app.route('/output_test')
#def output_test():
# def inner():
# for x in range(100):
# time.sleep(1)
# json_data = json.dumps({'data' : x})
# yield f"data:{x}\n\n"
# return Response(inner(), mimetype = 'text/event-stream')
#@app.route('/console_output')
#def console_output():
# def inner_co():
# #main() was the tipnovus_api_v3.py fx
# return Response(inner_co(), mimetype='text/event-stream')
#@app.route('/tp_ser_wbsrv/display_std_output')
#def dply_output():
# return render_template('console_output.html' )
#}}}
class dmsokeg(Resource):
def get(self):
rftag = read_tag()
serial_nbr, tare_wt = get_tare_weight(rftag)
gross_wt = float(read_scale())
output = f'func: {self.__class__.__name__}_{self.get.__name__}', f"gross weight: {gross_wt}", f"tare weight: {tare_wt}", f"serial number: {serial_nbr}"
handle_logs(output)
return {'rfid tag': rftag, 'gross weight' : gross_wt, 'tare weight': tare_wt, 'serial number': serial_nbr}
#{{{ TPSerWebServ class
class tp_wbsrv_upd(Resource):
def put(self, cmd):
# try:
# current_ts = get_time()
# except Exception:
current_ts = datetime.now().strftime('%G-%b-%d %H:%M:%S')
input_cmd_dict = {'cmd' : cmd, \
'code_cmd' : request.form['code_cmd']}
schema_check = False
req_setval = request.form.get('setval', False)
if req_setval:
tpsetcmd_schema = tp_ser_check_setcmd_schema()
try:
tpsetcmd_schema.load({'cmd_' : cmd}) #check if set_d -type of command
validate_val(cmd, input_cmd_dict['code_cmd'], request.form['setval']) #check if the code_cmd is valid
validate_val(cmd, request.form['response'], request.form['setval']) #check if the response string is valid
input_cmd_dict['response'] = "01,ACK,#" #temporarily set to a valid response to parse in marshmallow schema
input_cmd_dict['setval'] = request.form['setval']
tpcmd_schema.load(input_cmd_dict) #load in main schema to check ranges of setval; with fake response since it will complain if it is a set cmd (won't match up with the fixed list)
input_cmd_dict['response'] = request.form['response'] #overwrite the response to the real one
schema_check = True
except ValidationError as err:
pprint(err.messages)
output = f'func: {self.__class__.__name__}_{self.put.__name__}', f"error: {err.messages}"
handle_logs(output)
else:
if 'set_' in cmd:
msg = "Required 'setval' argument missing"
raise ValidationError(msg)
handle_logs(msg)
try: #to load the cmd using the marshmallow schema defined above
input_cmd_dict['response'] = request.form['response']
tpcmd_schema.load(input_cmd_dict)
schema_check = True
except ValidationError as err:
pprint(err.messages)
pprint(f'valid data: {err.valid_data}')
output = f'func: {self.__class__.__name__}_{self.put.__name__}', f"error: {err.messages}"
handle_logs(output)
if schema_check:
if len(input_cmd_dict.keys()) > 3:
cmd = cmd + input_cmd_dict['setval'].split(';')[1]
print(input_cmd_dict)
change = update_data(current_ts, cmd, input_cmd_dict['code_cmd'], input_cmd_dict['response'])
output = f'func: {self.__class__.__name__}_{self.put.__name__}', f'ts: {current_ts}',f'sent: {cmd}', f"response: {input_cmd_dict['response']}"
handle_logs(output)
return change, 201
else:
if req_setval:
input_cmd_dict['setval'] = request.form['setval']
abort_if_invalid(input_cmd_dict)
class tp_ser_wbsrv_response(Resource):
# get the response
def get(self):
with tpdb(tp_db_filepath) as db:
res = db.queryone("SELECT response FROM CMDRESPONSE")
output = f'func: {self.__class__.__name__}_{self.get.__name__}', f"response: {res}"
handle_logs(output)
return {'response' : res}
class tp_ser_wbsrv(Resource):
# issuing commands
def put(self, cmd):
schema_check, data_dict = ref_fx_cmd_proc(cmd, send_cmd) #send cmd is a function
if schema_check:
#then acknowledge
schema_check, ack_dict = ref_fx_cmd_proc(cmd, ack_cmd)
if schema_check and data_dict['response']:
#if cmd in ['dply_wash', 'dply_dryer', 'check_sensor'] or cmd.startswith('set_dt'):
return ack_dict, 201
#return data_dict, 201
else:
return data_dict, 502
else:
abort_if_invalid(data_dict)
class tp_ser_wbsrv_check_con(Resource):
# check connection of tip novus
def get(self):
try:
bool_connected = check_conn()
except AttributeError as e:
bool_connected = False
output = f'func: {self.__class__.__name__}_{self.get.__name__}', f'is_connected? = {bool_connected}'
return {'response' : bool_connected}
class tp_ser_wbsrv_con(Resource):
# connect to tip novus
def get(self):
s, r = connect_tp()
output = f'func: {self.__class__.__name__}_{self.get.__name__}', f'sent: {s}', f"response: {r}"
handle_logs(output)
return {'sent' : s, 'response' : r}
class tp_ser_wbsrv_discon(Resource):
# disconnect from tip novus
def get(self):
s, s2, r = disconnect_tp()
output = f'func: {self.__class__.__name__}_{self.get.__name__}', f'sent: {s} {s2}', f"response: {r}"
handle_logs(output)
return {'sent' : f'{s} {s2}', 'response' : r}
#}}}
class tp_ser_wbsrv_cmds(Resource):
# get list of available commands
def get(self):
return send_cmd_dict
#{{{ TIP NOVUS API
api.add_resource(tp_ser_wbsrv_con, '/tp_ser_wbsrv/connect')
api.add_resource(tp_ser_wbsrv_check_con, '/tp_ser_wbsrv/check_con')
api.add_resource(tp_ser_wbsrv_discon, '/tp_ser_wbsrv/disconnect')
api.add_resource(tp_ser_wbsrv_response, '/tp_ser_wbsrv/response')
api.add_resource(tp_wbsrv_upd, '/tp_ser_wbsrv/update/<string:cmd>') # change/update the command andrepsonse string
api.add_resource(tp_ser_wbsrv, '/tp_ser_wbsrv/<string:cmd>') # issue commands to tp
api.add_resource(tp_ser_wbsrv_cmds, '/tp_ser_wbsrv/cmds') #get list of valid commands
#}}}
#{{{ ETAPE SENSOR API SOURCE
api.add_resource(waste_check_5L_carboy, '/tp_ser_wbsrv/carboy/5L')
api.add_resource(waste_check_20L_carboy, '/tp_ser_wbsrv/carboy/20L')
api.add_resource(record_video_stream_on, '/tp_ser_wbsrv/record_video_on') #start video recording
api.add_resource(record_video_stream_off, '/tp_ser_wbsrv/record_video_off') #stop video recording
api.add_resource(dmsokeg, '/tp_ser_wbsrv/dmsokeg') #dmso keg tare wt, gross wt, serial number
#}}}
signal.signal(signal.SIGINT, signal_handler)
check_waste_volume = threading.Event()
continue_logging_tp = threading.Event()
bkg_thread = threading.Thread(name = 'bkg_led_indicator', target = bkg_etape, args=(check_waste_volume,))
bkg_thread.setDaemon(True)
bkg_thread.start()
bkg_tp_logging = threading.Thread(name = 'bkg_tp_logging', target = bkg_tp_log, args=(continue_logging_tp,))
bkg_tp_logging.setDaemon(True)
bkg_tp_logging.start()
if __name__ == '__main__':
background_check_volume()
app.run(debug=True, host='0.0.0.0', threaded=True)