-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathtcp.py
295 lines (236 loc) · 9.65 KB
/
tcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
"""Data model and conversions for tcp-based communication with the Omnik Inverter."""
from collections.abc import Generator
from ctypes import BigEndianStructure, c_char, c_ubyte, c_uint, c_ushort
from typing import Any, Optional
from .const import LOGGER
from .exceptions import OmnikInverterPacketInvalidError
# Marker bytes that open and close every message on the wire
MESSAGE_START = 0x68
MESSAGE_END = 0x16

# Separator byte following the length field; differs per direction
MESSAGE_SEND_SEP = 0x40
MESSAGE_RECV_SEP = 0x41

# Known message type bytes
MESSAGE_TYPE_INFORMATION_REQUEST = 0x30
MESSAGE_TYPE_INFORMATION_REPLY = 0xB0
MESSAGE_TYPE_STRING = 0xF0  # Message seems to consist of pure text

# All bits set in a 16-bit field; fields holding this value count as unset
UINT16_MAX = 0xFFFF

# The length field of a message only counts the payload. Everything below
# surrounds the payload and has to be added to obtain the full message size:
# - Length (1 byte)
# - Separator (1 byte)
# - Message type (1 byte)
# - Repeated integer serial number (2 * 4 bytes)
# - CRC in suffix (1 byte)
MESSAGE_HEADER_SIZE = 1 + 1 + 1 + 2 * 4 + 1
class _AcOutput(BigEndianStructure):
    """Big-endian frequency/power pair of one AC output entry."""

    _fields_ = [
        ("frequency", c_ushort),
        ("power", c_ushort),
    ]

    def get_power(self) -> Optional[int]:
        """Read the AC power field.

        Returns:
            The raw power value, or None when the field is unset
            (all bits set).
        """
        raw_power = self.power
        if raw_power == UINT16_MAX:
            return None
        return raw_power

    def get_frequency(self) -> Optional[float]:
        """Read the AC frequency field.

        Returns:
            The frequency in Hertz (raw value scaled by 0.01), or None
            when the field is unset (all bits set).
        """
        raw_frequency = self.frequency
        if raw_frequency == UINT16_MAX:
            return None
        return raw_frequency * 0.01
class _TcpData(BigEndianStructure):
    """Big-endian layout that `_parse_information_reply()` maps reply data onto.

    The per-field notes describe how `_parse_information_reply()` treats the
    raw values; fields without a note are returned as-is (byte strings are
    decoded as UTF-8).
    """

    # Byte-packed: no alignment padding between the fields
    _pack_ = 1
    _fields_ = [
        # A warning is logged when this is not b"\x81\x02\x01"
        ("padding0", c_char * 3),
        ("serial_number", c_char * 16),
        # Raw value, multiplied by 0.1 when parsed
        ("temperature", c_ushort),
        # Three-element lists: each entry is multiplied by 0.1 when parsed,
        # entries with all bits set become None
        ("dc_input_voltage", c_ushort * 3),
        ("dc_input_current", c_ushort * 3),
        ("ac_output_current", c_ushort * 3),
        ("ac_output_voltage", c_ushort * 3),
        # Flattened into separate frequency and power lists when parsed
        ("ac_output", _AcOutput * 3),
        # Raw value, multiplied by 0.01 when parsed
        ("solar_energy_today", c_ushort),
        # Raw value, multiplied by 0.1 when parsed
        ("solar_energy_total", c_uint),
        ("solar_hours_total", c_uint),
        # 0 or 1, converted to a bool when parsed
        ("inverter_active", c_ushort),
        # padding1..padding4 are expected to be all zero; a warning is
        # logged for any of them that is not
        ("padding1", c_ubyte * 4),
        # Meaning unknown; a warning is logged unless it is 0 or 0xFFFF
        ("unknown0", c_ushort),
        ("padding2", c_ubyte * 10),
        ("firmware", c_char * 16),
        ("padding3", c_ubyte * 4),
        ("firmware_slave", c_char * 16),
        ("padding4", c_ubyte * 4),
    ]
def _pack_message(
    message_type: int, serial_number: int, message: bytearray
) -> bytearray:
    """Wrap a payload in the framing used for messages sent to the inverter.

    Args:
        message_type: Message type byte.
        serial_number: Serial number; written twice as 4 little-endian bytes.
        message: Payload. Its length is stored in the single length byte.

    Returns:
        The framed message: start marker, header, repeated serial number,
        payload, checksum and end marker.
    """
    body = bytearray([len(message), MESSAGE_SEND_SEP, message_type])
    body += serial_number.to_bytes(4, "little") * 2
    body.extend(message)

    # The checksum covers everything between the start marker and the
    # checksum byte itself, truncated to 8 bits.
    crc = sum(body) & 0xFF

    frame = bytearray([MESSAGE_START])
    frame += body
    frame.append(crc)
    frame.append(MESSAGE_END)
    return frame
def _unpack_message(message: bytearray) -> tuple[int, int, bytearray]:
    """Validate a single message and strip its header and checksum.

    The start and end markers must already have been removed. Note that
    `message` is modified in place while it is being unpacked.

    Args:
        message: Length byte, separator, message type, repeated serial
            number, payload and trailing checksum byte.

    Returns:
        A tuple of message type, serial number and payload.

    Raises:
        OmnikInverterPacketInvalidError: On a checksum mismatch, an
            unexpected separator, or differing serial number copies.
    """
    LOGGER.debug("Handling message `%s`", message)

    # The last byte is the checksum over everything that precedes it
    message_checksum = message.pop()
    checksum = sum(message) & 0xFF
    if message_checksum != checksum:
        raise OmnikInverterPacketInvalidError(
            f"Checksum mismatch (calculated `{checksum}` got `{message_checksum}`)"
        )

    # With the checksum verified, peel off length, separator and type
    payload_length = message.pop(0)
    separator = message.pop(0)
    if separator != MESSAGE_RECV_SEP:
        raise OmnikInverterPacketInvalidError("Invalid receiver separator")
    message_type = message.pop(0)
    LOGGER.debug(
        "Message type %02x, length %s, checksum %02x",
        message_type,
        payload_length,
        checksum,
    )

    # The serial number is sent twice; both copies have to agree
    serial0, serial1 = (
        int.from_bytes(message[offset : offset + 4], "little") for offset in (0, 4)
    )
    if serial0 != serial1:
        raise OmnikInverterPacketInvalidError(
            f"Serial number mismatch in reply {serial0} != {serial1}"
        )

    payload = message[8:]
    return message_type, serial0, payload
def _unpack_messages(
    data: bytearray,
) -> Generator[tuple[int, int, bytearray], None, None]:
    """Split a raw reply into its messages and unpack them one by one.

    Args:
        data: Raw bytes as received from the inverter; may contain several
            messages back to back. The first byte is popped from this
            object, so pass a copy if the data is still needed.

    Yields:
        A tuple of message type, serial number and payload per message.

    Raises:
        OmnikInverterPacketInvalidError: The data is truncated, or a start
            byte, end byte or message fails validation.
    """
    while data:
        message_start = data.pop(0)

        # Whenever my Omnik sends an INFORMATION_REPLY followed by a STRING
        # text message, there's a bunch of trailing 0xFF garbage
        if message_start == 0xFF:  # pragma: no cover
            if not all(d == 0xFF for d in data):
                raise OmnikInverterPacketInvalidError(
                    "(Next) message starts with `0xFF` but the remainder "
                    f"is not strictly 0xFF: {data}"
                )
            # We're done
            return

        if message_start != MESSAGE_START:
            raise OmnikInverterPacketInvalidError("Invalid start byte")

        # A lone start byte has no length field to read; report it as an
        # invalid packet instead of failing with an IndexError.
        if not data:
            raise OmnikInverterPacketInvalidError(
                "Message is truncated after the start byte"
            )

        length = data[0] + MESSAGE_HEADER_SIZE
        message = data[:length]
        if len(message) != length:
            raise OmnikInverterPacketInvalidError(
                f"Could only read {len(message)} out of {length} "
                "expected bytes from TCP stream",
            )

        yield _unpack_message(message)

        # Prepare for the next message by stripping off the end byte
        data = data[length:]
        if not data:
            # Popping from an empty bytearray would raise an IndexError
            raise OmnikInverterPacketInvalidError("Missing end byte")
        if data.pop(0) != MESSAGE_END:
            raise OmnikInverterPacketInvalidError("Invalid end byte")
def create_information_request(serial_number: int) -> bytearray:
    """Build the magic request that makes the Omnik reply with raw statistics.

    Args:
        serial_number: Serial number of your Omnik device, as an integer.

    Returns:
        The raw request as a bytearray, ready to be sent over a TCP socket.
    """
    request_payload = bytearray([0x01, 0x00])
    return _pack_message(
        MESSAGE_TYPE_INFORMATION_REQUEST,
        serial_number,
        request_payload,
    )
def parse_messages(serial_number: int, data: bytes) -> dict[str, Any]:
    """Parse the raw TCP reply of the Omnik device.

    Args:
        serial_number: Serial number that was passed to
            `create_information_request()`. A reply carrying a different
            serial number is accepted, the difference is only logged.
        data: Raw data reply from the Omnik Inverter.

    Returns:
        A dictionary with the fields extracted from the information reply.

    Raises:
        OmnikInverterPacketInvalidError: Received data fails basic validity
            checks, contains an unknown message type, or does not contain
            an information reply.
    """
    info = None

    # Work on a copy, unpacking consumes the buffer it is given
    for message_type, reply_serial_number, message in _unpack_messages(
        bytearray(data)
    ):
        if reply_serial_number != serial_number:  # pragma: no cover
            # This is allowed as it does not seem to be required to pass the serial
            # number in the request - though empirical testing has to point out whether
            # the request takes longer this way.
            LOGGER.debug(
                "Replied serial number %s not equal to request %s",
                reply_serial_number,
                serial_number,
            )

        if message_type == MESSAGE_TYPE_INFORMATION_REPLY:
            if info is not None:  # pragma: no cover
                LOGGER.warning("Omnik sent multiple INFORMATION_REPLY messages")
            info = _parse_information_reply(message)
        elif message_type == MESSAGE_TYPE_STRING:  # pragma: no cover
            # The text is only logged, so undecodable bytes are replaced
            # rather than letting a UnicodeDecodeError abort the parsing.
            LOGGER.warning(
                "Omnik sent text message `%s`",
                message.decode("utf8", errors="replace").strip(),
            )
        else:
            raise OmnikInverterPacketInvalidError(
                f"Unknown Omnik message type {message_type:02x} "
                f"with contents `{message}`",
            )

    if info is None:
        raise OmnikInverterPacketInvalidError(
            "None of the messages contained an information reply!"
        )

    return info
def _parse_information_reply(data: bytes) -> dict[str, Any]:
    """Convert the payload of an INFORMATION_REPLY message to a dictionary.

    Args:
        data: Payload of the message, without header and checksum.

    Returns:
        A dictionary with the extracted fields. Scaled values are floats,
        unset list entries are None, and the `ac_output` structures are
        flattened into `ac_output_frequency` and `ac_output_power` lists.

    Raises:
        OmnikInverterPacketInvalidError: The payload is too short to hold
            the expected data structure.
    """
    try:
        tcp_data = _TcpData.from_buffer_copy(data)
    except ValueError as err:
        # ctypes raises ValueError when the buffer is smaller than the
        # structure; report that as an invalid packet like other failures.
        raise OmnikInverterPacketInvalidError(
            f"Information reply of {len(data)} bytes is too short"
        ) from err

    if tcp_data.unknown0 not in [0, UINT16_MAX]:  # pragma: no cover
        LOGGER.warning("Unexpected unknown0 `%s`", tcp_data.unknown0)

    if tcp_data.padding0 != b"\x81\x02\x01":  # pragma: no cover
        LOGGER.warning("Unexpected padding0 `%s`", tcp_data.padding0)

    # For all data that's expected to be zero, print it if it's not. Perhaps
    # there are more interesting fields on different inverters waiting to be
    # uncovered.
    for idx in range(1, 5):  # pragma: no cover
        name = f"padding{idx}"
        padding = getattr(tcp_data, name)
        if sum(padding):
            # Convert to bytes: the ctypes array itself only logs as an
            # opaque object, which hides the very values of interest.
            LOGGER.warning("Unexpected `%s`: `%s`", name, bytes(padding))

    def list_divide_10(integers: list[int]) -> list[Optional[float]]:
        # Scale every entry by 0.1; entries with all bits set are unset
        return [None if v == UINT16_MAX else v * 0.1 for v in integers]

    def int_to_bool(num: int) -> bool:
        # Deliberately strict: any value other than 0 or 1 raises KeyError
        return {
            0: False,
            1: True,
        }[num]

    # Only these fields will be extracted from the structure. A float is a
    # scale factor, a callable converts the raw value, and None passes the
    # value through (decoding byte strings as UTF-8).
    field_extractors = {
        "serial_number": None,
        "temperature": 0.1,
        "dc_input_voltage": list_divide_10,
        "dc_input_current": list_divide_10,
        "ac_output_current": list_divide_10,
        "ac_output_voltage": list_divide_10,
        "ac_output": None,
        "solar_energy_today": 0.01,
        "solar_energy_total": 0.1,
        "solar_hours_total": None,
        "inverter_active": int_to_bool,
        "firmware": None,
        "firmware_slave": None,
    }

    result = {}
    for name, extractor in field_extractors.items():
        value = getattr(tcp_data, name)

        if name == "ac_output":
            # Flatten the list of frequency+power AC objects
            result["ac_output_frequency"] = [o.get_frequency() for o in value]
            result["ac_output_power"] = [o.get_power() for o in value]
            continue

        if isinstance(extractor, float):
            value *= extractor
        elif extractor is not None:
            value = extractor(value)
        elif isinstance(value, bytes):
            value = value.decode(encoding="utf-8")

        result[name] = value

    return result