-
-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add TCP source for lightweight, extended data gathering
Certain Omnik inverters (supposedly with a specific serial number, see sources below) reply with its statistics as binary data when a "magic" packet is sent to them on port 8899 over TCP. This data is more lightweight than an HTML page or JavaScript source with values embedded, and provides more statistics then either similar to what can be found on the LCD display. The data format and meaning has been extracted from various Python/C sources (see links below), with some of my own "work" to simplify checksum calculation and "the reverse hexadecimal serial number" is nothing more than its little-endian byte representation. Response data is parsed at once with a `ctypes.BigEndianStructure` instead of unpacking individual ranges of bytes through `struct.unpack`. Only minimal data massaging is needed to convert the struct to useful information. It provides the following extra fields: - Inverter temperature; - Voltage and current for the DC input strings (up to 3); - Voltage, current, frequency and power for all AC outputs (also up to 3); - Total number of runtime hours (not sure if this is just the Omnik being on, or the time it delivers >0W of power back to the network). No "device" (network module) data is found in any of the unparsed bits, nor is `solar_rater_power` available. Sources: 1. https://github.com/jbouwh/omnikdatalogger/blob/dev/apps/omnikdatalogger/omnik/InverterMsg.py 2. http://www.mb200d.nl/wordpress/2015/11/omniksol-4k-tl-wifi-kit/#more-590 3. https://github.com/Woutrrr/Omnik-Data-Logger 4. https://github.com/t3kpunk/Omniksol-PV-Logger
- Loading branch information
Showing
6 changed files
with
332 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
"""Data model and conversions for tcp-based communication.""" | ||
from ctypes import BigEndianStructure, c_char, c_ubyte, c_uint, c_ushort | ||
from typing import Any | ||
|
||
from .exceptions import OmnikInverterPacketInvalidError | ||
|
||
UINT16_MAX = 65535 | ||
|
||
|
||
class _AcOutput(BigEndianStructure): | ||
_fields_ = [ | ||
("frequency", c_ushort), | ||
("power", c_ushort), | ||
] | ||
|
||
def get_power(self): | ||
"""Retrieve AC power. | ||
Returns: | ||
The power field, or None if it is unset. | ||
""" | ||
return None if self.power == UINT16_MAX else self.power | ||
|
||
def get_frequency(self): | ||
"""Retrieve AC frequency. | ||
Returns: | ||
The frequency field in Hertz, or None if it is unset. | ||
""" | ||
return None if self.frequency == UINT16_MAX else self.frequency * 0.01 | ||
|
||
|
||
class _TcpData(BigEndianStructure): | ||
_pack_ = 1 | ||
_fields_ = [ | ||
# Unlikely for all these bytes to represent the model, | ||
# this mapping has to be built out over time. | ||
("model", c_char * 3), | ||
# The s/n from the request, twice, in little-endian | ||
("double_serial_number", (c_ubyte * 4) * 2), | ||
("padding0", c_char * 3), | ||
("serial_number", c_char * 16), | ||
("temperature", c_ushort), | ||
("dc_input_voltage", c_ushort * 3), | ||
("dc_input_current", c_ushort * 3), | ||
("ac_output_current", c_ushort * 3), | ||
("ac_output_voltage", c_ushort * 3), | ||
("ac_output", _AcOutput * 3), | ||
("solar_energy_today", c_ushort), | ||
("solar_energy_total", c_uint), | ||
("solar_hours_total", c_uint), | ||
("maybe_weekday_counter", c_ushort), | ||
("padding1", c_ubyte * 4), | ||
("unknown0", c_ushort), | ||
("padding2", c_ubyte * 10), | ||
("firmware", c_char * 16), | ||
("padding3", c_ubyte * 4), | ||
("firmware_slave", c_char * 16), | ||
("padding4", c_ubyte * 4), | ||
] | ||
|
||
|
||
class TcpParser: | ||
"""Functions for interacting with the Omnik over TCP.""" | ||
|
||
MESSAGE_START = 0x68 | ||
MESSAGE_END = 0x16 | ||
|
||
@classmethod | ||
def _pack_message(cls, message: bytearray) -> bytearray: | ||
checksum = sum(message) & 0xFF | ||
|
||
message.insert(0, cls.MESSAGE_START) | ||
message.append(checksum) | ||
message.append(cls.MESSAGE_END) | ||
|
||
return message | ||
|
||
@classmethod | ||
def _unpack_message(cls, message: bytearray) -> bytearray: | ||
if message.pop(0) != cls.MESSAGE_START: | ||
raise OmnikInverterPacketInvalidError("Invalid start byte") | ||
if message.pop() != cls.MESSAGE_END: | ||
raise OmnikInverterPacketInvalidError("Invalid end byte") | ||
|
||
expected_checksum = message.pop() | ||
checksum = sum(message) & 0xFF | ||
if expected_checksum != checksum: | ||
raise OmnikInverterPacketInvalidError( | ||
f"Checksum mismatch (got `{checksum}` expected `{expected_checksum}`" | ||
) | ||
|
||
return message | ||
|
||
@classmethod | ||
def create_information_request(cls, serial_number: int) -> bytearray: | ||
"""Compute a magic message to which the Omnik will reply with raw statistics. | ||
Args: | ||
serial_number: Integer with the serial number of your Omnik device. | ||
Returns: | ||
A bytearray with the raw message data, to be sent over a TCP socket. | ||
""" | ||
request_data = bytearray([0x02, 0x40, 0x30]) | ||
serial_bytes = serial_number.to_bytes(length=4, byteorder="little") | ||
request_data.extend(serial_bytes) | ||
request_data.extend(serial_bytes) | ||
request_data.extend([0x01, 0x00]) | ||
return cls._pack_message(request_data) | ||
|
||
@classmethod | ||
def parse_information_reply(cls, serial_number: int, data: bytes) -> dict[str, Any]: | ||
"""Perform a raw TCP request to the Omnik device. | ||
Args: | ||
serial_number: Serial number passed to | ||
`clk.create_information_request()`, used to validate the reply. | ||
data: Raw data reply from the Omnik Inverter. | ||
Returns: | ||
A Python dictionary (text) with the response from | ||
the Omnik Inverter. | ||
Raises: | ||
OmnikInverterPacketInvalidError: Received data fails basic validity checks. | ||
""" | ||
|
||
data = cls._unpack_message(bytearray(data)) | ||
data = _TcpData.from_buffer_copy(data) | ||
|
||
if any( | ||
serial_number != int.from_bytes(b, byteorder="little") | ||
for b in data.double_serial_number | ||
): | ||
raise OmnikInverterPacketInvalidError("Serial number mismatch in reply") | ||
|
||
if data.unknown0 != UINT16_MAX: | ||
print(f"Unexpected unknown0 `{data.unknown0}`") | ||
|
||
if data.padding0 != b"\x81\x02\x01": | ||
print(f"Unexpected padding0 `{data.padding0}`") | ||
|
||
# For all data that's expected to be zero, print it if it's not. Perhaps | ||
# there are more interesting fields on different inverters waiting to be | ||
# uncovered. | ||
for idx in range(1, 5): | ||
name = f"padding{idx}" | ||
padding = getattr(data, name) | ||
if sum(padding): | ||
print(f"Unexpected `{name}`: `{padding}`") | ||
|
||
def extract_model(magic: c_char * 3): | ||
return { | ||
b"\x7d\x41\xb0": "Omniksol 3000TL", | ||
# http://www.mb200d.nl/wordpress/2015/11/omniksol-4k-tl-wifi-kit/#more-590: | ||
b"\x81\x41\xb0": "Omniksol 4000TL", | ||
}.get(magic, f"Unknown device model from magic {magic!r}") | ||
|
||
def list_divide_10(integers: list[c_ushort]) -> list[c_ushort]: | ||
return [None if v == UINT16_MAX else v * 0.1 for v in integers] | ||
|
||
# Only these fields will be extracted from the structure | ||
field_extractors = { | ||
"model": extract_model, | ||
"serial_number": None, | ||
"temperature": 0.1, | ||
"dc_input_voltage": list_divide_10, | ||
"dc_input_current": list_divide_10, | ||
"ac_output_current": list_divide_10, | ||
"ac_output_voltage": list_divide_10, | ||
"ac_output": None, | ||
"solar_energy_today": 0.01, | ||
"solar_energy_total": 0.1, | ||
"solar_hours_total": None, | ||
"firmware": None, | ||
"firmware_slave": None, | ||
} | ||
|
||
result = {} | ||
|
||
for (name, extractor) in field_extractors.items(): | ||
value = getattr(data, name) | ||
if name == "ac_output": | ||
# Flatten the list of frequency+power AC objects | ||
|
||
result["ac_output_frequency"] = [o.get_frequency() for o in value] | ||
result["ac_output_power"] = [o.get_power() for o in value] | ||
continue | ||
|
||
if isinstance(extractor, float): | ||
value *= extractor | ||
elif extractor is not None: | ||
value = extractor(value) | ||
|
||
result[name] = value | ||
|
||
return result |
Oops, something went wrong.