-
Notifications
You must be signed in to change notification settings - Fork 6
/
I2CDevice.py
116 lines (100 loc) · 3.54 KB
/
I2CDevice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import os
import sys
import time
import errno
import fluidiscopeGlobVar as fg
import logging
# The bus driver is imported only outside development mode; when
# fg.my_dev_flag is set, the name I2CBus is never bound in this module.
if not fg.my_dev_flag:
    from I2CBus import I2CBus
# Module-level logger registered under the fixed name 'UC2_I2CDEVICE'.
logger = logging.getLogger('UC2_I2CDEVICE')
class I2CDevice(object):
    """Text-command wrapper around a single device on the default I2C bus.

    Outgoing commands are framed as ``delim_strt + payload + delim_stop``
    and written as a list of character codes; replies are read as a block
    of ``max_msg_len`` bytes and the text between the first start delimiter
    and the last stop delimiter is returned.

    All bus access goes through ``I2CBus.defaultBus``. Bus errors are
    reported with ``print`` and never propagate to the caller.
    """

    # Number of bytes requested from the device in one block read.
    max_msg_len = 32
    # Frame delimiters: wrap every outgoing message and mark the payload
    # boundaries in every received one.
    delim_strt = "*"
    delim_stop = "#"
    # Not used inside this class; presumably separates several commands
    # within one message -- confirm against the device firmware.
    delim_cmds = ";"
    # Separator placed between the parts of one command (see extractCommand).
    delim_inst = "+"
    # Not used inside this class; presumably command words understood by
    # every device -- confirm against callers.
    com_cmds = {"STATUS": "STATUS", "LOGOFF": "LOGOFF", "NAME": "NAME"}

    def __init__(self, address):
        """Create a device handle.

        Args:
            address: Bus address; anything ``int()`` accepts. Ignored in
                development mode (``fg.my_dev_flag``), where 0 is used.

        Raises:
            ValueError: If ``address`` cannot be converted to an integer.
        """
        if fg.my_dev_flag:
            address = 0
        else:
            try:
                address = int(address)
            except ValueError:
                raise ValueError("Invalid address format.")
        self.address = address
        self.name = "unknown"
        self.outBuffer = ""

    def isEmpty(self, byte_list):
        """Return True if a received block carries only 0xFF filler bytes.

        The first ``len(delim_strt) + len(delim_stop)`` bytes are skipped
        before the check. A falsy ``byte_list`` (``None`` or empty) is
        reported as *not* empty, so the caller falls through to decoding.
        """
        if not byte_list:
            return False
        cutoff = len(self.delim_strt) + len(self.delim_stop)
        return all(b == 255 for b in byte_list[cutoff:])

    def requestEvent(self):
        """Read one block from the device and return its decoded payload.

        Returns:
            The text between the first start delimiter and the last stop
            delimiter, or ``None`` if the read failed, the block contained
            only filler bytes, or a delimiter was missing.
        """
        try:
            byte_msg = I2CBus.defaultBus.read_i2c_block_data(
                self.address, 0, I2CDevice.max_msg_len)
        except Exception as e:
            print("Unknown error {0} occured on request on address {1}".format(
                e, hex(self.address)))
            return None

        if self.isEmpty(byte_msg):
            print("Device on address {0} is not responding. (message is empty).".format(
                hex(self.address)))
            return None

        try:
            text = "".join(chr(x) for x in byte_msg)
            # index/rindex raise ValueError when a delimiter is absent,
            # which is reported below as a decoding problem.
            strt = text.index(self.delim_strt)
            stop = text.rindex(self.delim_stop)
            return text[strt + len(self.delim_strt):stop]
        except Exception as e:
            print(
                "Unknown error {0} occured on decoding of received response on address {1}. Is start or stop signal missing?".format(
                    e, hex(self.address)))
            return None

    def sendEvent(self, value):
        """Frame ``value`` with the delimiters and write it to the device.

        ``outBuffer`` holds the encoded message only while the write is in
        progress; it is reset to ``""`` afterwards whether or not the write
        succeeded. Errors are printed, not raised.
        """
        try:
            self.outBuffer = self.delim_strt + str(value) + self.delim_stop
            self.outBuffer = [ord(x) for x in self.outBuffer]
            I2CBus.defaultBus.write_i2c_block_data(
                self.address, 1, self.outBuffer)
        except Exception as e:
            print("Unknown error {0} occured on send to address {1}".format(
                e, hex(self.address)))
        finally:
            # Never leave a stale, half-sent message behind.
            self.outBuffer = ""

    def extractCommand(self, args):
        """Build one command string from a sequence of arguments.

        Every argument is converted with ``str``; a list argument is
        flattened into its elements. All parts are joined with
        ``delim_inst``. An empty ``args`` yields ``""``.
        """
        delim = I2CDevice.delim_inst
        parts = []
        for arg in args:
            if isinstance(arg, list):
                parts.append(delim.join(str(x) for x in arg))
            else:
                parts.append(str(arg))
        return delim.join(parts)

    def send(self, *args):
        """Build a command from ``args`` and send it.

        In development mode the command is only printed; otherwise it is
        printed, logged and handed to ``sendEvent``.
        """
        cmd = self.extractCommand(args)
        if fg.my_dev_flag:
            print(
                "Debugging mode. Generated Command=[{}] has not been sent.".format(cmd))
        else:
            print("Sending: {0}".format(cmd))
            logger.info("Sending: {0}".format(cmd))
            self.sendEvent(cmd)

    def request(self):
        """Read a reply from the device and return it.

        A non-empty reply other than ``"BUSY"`` is also printed. The return
        value is whatever ``requestEvent`` produced, including ``None``.
        """
        ans = self.requestEvent()
        if ans and (ans != "BUSY"):
            print("Receiving: {0}".format(ans))
        return ans

    def sendCommand(self, *args):
        """Send a command and return the device's reply."""
        self.send(*args)
        # Short pause before reading back -- presumably gives the device
        # time to prepare its answer; confirm the required delay.
        time.sleep(0.0025)
        return self.request()