-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
145 lines (112 loc) · 4.45 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import re
import serial
import time
from loguru import logger
from csv import writer
from csv import Error as CSV_Error
from os import path, rename
from config import CSV_FILE_PATH, SERIAL_PORT, BAUD_RATE, REDIS_HOST, REDIS_PORT, REDIS_KEY
from utils.fake_utils import get_fake_random_serial
from utils.log_utils import setup_logger
from utils.redis_utils import RedisClient
setup_logger(logger, "dispatcher-logs/{time}.log")
r = RedisClient(host=REDIS_HOST, port=REDIS_PORT, key=REDIS_KEY)
# validate_pattern = r"([+-]?\d+(\.?\d+)?)\,\s([+-]?\d+(\.?\d+)?)\,\s([+-]?\d+(\.?\d+)?)\,\s(\d{2}:\d{2}:\d{2})\,\s([+-]?\d+(\.?\d+)?)\,\s([+-]?\d+(\.?\d+)?)\,\s([+-]?\d+(\.?\d+)?)\,\s([+-]?\d+(\.?\d+)?)\,\s([+-]?\d+(\.?\d+)?)\,\s([+-]?\d+(\.?\d+)?)\,\s([+-]?\d+(\.?\d+)?)\,\s([+-]?\d+(\.?\d+)?)"
# validate_pattern = r"([+-]?(\d+)?(\.?\d+)?)\,\s([+-]?(\d+)?(\.?\d+)?)\,\s([+-]?(\d+)?(\.?\d+)?)\,(\s)?(\d{2}:\d{2}:\d{2})\,\s([+-]?(\d+)?(\.?\d+)?)\,\s([+-]?(\d+)?(\.?\d+)?)\,\s([+-]?(\d+)?(\.?\d+)?)\,\s([+-]?(\d+)?(\.?\d+)?)\,\s([+-]?(\d+)?(\.?\d+)?)\,\s([+-]?(\d+)?(\.?\d+)?)\,\s([+-]?(\d+)?(\.?\d+)?)\,\s([+-]?(\d+)?(\.?\d+)?)"
validate_pattern = r"(?i)((NaN)?([+-]?(\d+)?(\.?\d+)?)\,\s?){3}(\d{2}:\d{2}:\d{2})\,\s?((NaN)?([+-]?(\d+)?(\.?\d+)?)\,\s?){7}((NaN)?[+-]?(\d+)?(\.?\d+)?)"
def connect() -> serial:
idle: bool = True
ser = None
while idle:
try:
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
except Exception as e:
logger.error(e)
time.sleep(1)
if ser:
logger.success(f"Connected to {ser.port} ✅")
idle = False
return ser
def create_csv() -> None:
logger.info("Creating " + CSV_FILE_PATH)
try:
if path.exists(CSV_FILE_PATH):
new_name: str = f"{CSV_FILE_PATH.split('/')[0]}/{time.time()}-{CSV_FILE_PATH.split('/')[1]}"
rename(CSV_FILE_PATH, new_name)
logger.info(f"CSV file Renamed to: {new_name}")
with open(CSV_FILE_PATH, 'w', encoding='UTF8') as f:
wr = writer(f)
wr.writerow(
['Iteration', 'Latitude', 'Longitude', 'Elevation', 'Time', 'Temperature', 'Pressure', 'Elevation',
'Temperature', 'Humidity', 'Pm1', 'Pm2.5', 'Pm10'])
logger.success("Created " + CSV_FILE_PATH)
except CSV_Error as e:
logger.error("Failed to write csv headers:")
logger.error(e)
except OSError as e:
logger.error("Failed to create CSV file:")
logger.error(e)
except Exception as e:
logger.error(e)
def add_to_csv(row: list, iteration: int) -> None:
row.insert(0, iteration)
try:
with open(CSV_FILE_PATH, 'a', encoding='UTF8', newline='') as f:
wr = writer(f)
wr.writerow(row)
logger.success("Added data to " + CSV_FILE_PATH)
except CSV_Error as e:
logger.error("Failed to write csv data:")
logger.error(e)
except Exception as e:
logger.error(e)
def validate_serial(data: str) -> bool:
match = re.search(validate_pattern, data)
if not match:
logger.error("Invalid serial output")
return True if match else False
def normalize_serial(data: str) -> list:
t = data.split(",")
k = []
for i in t:
if ":" in i:
k.append(i)
elif "-" in i:
k.append(0)
elif "Nan" in i:
k.append(0)
elif "nan" in i:
k.append(0)
else:
k.append(float(i))
return k
@logger.catch
def main() -> None:
create_csv()
ser = None
iteration = 0
while True:
if not ser:
ser = connect()
logger.info(f"Iteration number: {iteration}")
iteration += 1
try:
response = ser.readline().decode("utf-8").strip()
# response = get_fake_random_serial()
logger.log("SERIAL", response)
if response and validate_serial(response):
data = normalize_serial(response)
logger.info(f"Received -> \n{data}")
add_to_csv(data, iteration)
try:
r.push_list(data)
logger.success(f"Pushed data to Redis")
except Exception as e:
logger.error(e)
except OSError as e:
ser = None
logger.error(f"Disconnected: {e}")
except Exception as e:
logger.error(e)
if __name__ == '__main__':
main()