Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tcp: Make firmware strings (in packet suffix) optional #376

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions omnikinverter/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ class Inverter:

serial_number: str | None
model: str | None
firmware: str | None
firmware_slave: str | None
solar_rated_power: int | None
solar_current_power: int | None
solar_energy_today: float | None
solar_energy_total: float | None
alarm_code: str | None = None

firmware: str | None = None
firmware_slave: str | None = None

# TCP only
inverter_active: bool | None = None
solar_hours_total: int | None = None
Expand Down
174 changes: 105 additions & 69 deletions omnikinverter/tcp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Data model and conversions for tcp-based communication with the Omnik Inverter."""
from __future__ import annotations

from ctypes import BigEndianStructure, c_char, c_ubyte, c_uint, c_ushort
from ctypes import BigEndianStructure, c_char, c_ubyte, c_uint, c_ushort, sizeof
from typing import TYPE_CHECKING, Any

from .const import LOGGER
Expand Down Expand Up @@ -70,12 +70,102 @@ class _TcpData(BigEndianStructure):
("padding1", c_ubyte * 4),
("unknown0", c_ushort),
("padding2", c_ubyte * 10),
]

@classmethod
def parse(cls, data: bytes, offset: int = 0) -> dict[str, Any]:
"""Parse `data` into all fields described by this C structure."""
tcp_data = cls.from_buffer_copy(data, offset)

if tcp_data.unknown0 not in [0, UINT16_MAX]: # pragma: no cover
LOGGER.warning("Unexpected unknown0 `%s`", tcp_data.unknown0)

if tcp_data.padding0 != b"\x81\x02\x01": # pragma: no cover
LOGGER.warning("Unexpected padding0 `%s`", tcp_data.padding0)

# For all data that's expected to be zero, print it if it's not. Perhaps
# there are more interesting fields on different inverters waiting to be
# uncovered.
for idx in range(1, 3): # pragma: no cover
name = f"padding{idx}"
padding = getattr(tcp_data, name)
if sum(padding):
LOGGER.warning("Unexpected `%s`: `%s`", name, padding)

def list_divide_10(integers: list[int]) -> list[float | None]:
return [None if v == UINT16_MAX else v * 0.1 for v in integers]

def int_to_bool(num: int) -> bool:
return {
0: False,
1: True,
}[num]

# Set temperature to None if it matches 65326, this is returned
# when the inverter is "offline".
def temperature_to_float(temp: int) -> float | None:
return None if temp == 65326 else temp * 0.1

# Only these fields will be extracted from the structure
field_extractors = {
"serial_number": None,
"temperature": temperature_to_float,
"dc_input_voltage": list_divide_10,
"dc_input_current": list_divide_10,
"ac_output_current": list_divide_10,
"ac_output_voltage": list_divide_10,
"ac_output": None,
"solar_energy_today": 0.01,
"solar_energy_total": 0.1,
"solar_hours_total": None,
"inverter_active": int_to_bool,
}

result = {}

for name, extractor in field_extractors.items():
value = getattr(tcp_data, name)

if name == "ac_output":
# Flatten the list of frequency+power AC objects

result["ac_output_frequency"] = [o.get_frequency() for o in value]
result["ac_output_power"] = [o.get_power() for o in value]
continue

if isinstance(extractor, float):
value *= extractor
elif extractor is not None:
value = extractor(value)
elif isinstance(value, bytes):
value = value.decode(encoding="utf-8")

result[name] = value

return result


class _TcpFirmwareStrings(BigEndianStructure):
_pack_ = 1
_fields_ = [
("firmware", c_char * 16),
("padding3", c_ubyte * 4),
("firmware_slave", c_char * 16),
("padding4", c_ubyte * 4),
]

@classmethod
def parse(cls, data: bytes, offset: int = 0) -> dict[str, Any]:
"""Parse `data` into all fields described by this C structure."""
tcp_firmware_data = cls.from_buffer_copy(data, offset)

result = {}

for name in ["firmware", "firmware_slave"]:
result[name] = getattr(tcp_firmware_data, name).decode(encoding="utf-8")

return result


def _pack_message(
message_type: int,
Expand Down Expand Up @@ -248,73 +338,19 @@ def parse_messages(serial_number: int, data: bytes) -> dict[str, Any]:


def _parse_information_reply(data: bytes) -> dict[str, Any]:
tcp_data = _TcpData.from_buffer_copy(data)

if tcp_data.unknown0 not in [0, UINT16_MAX]: # pragma: no cover
LOGGER.warning("Unexpected unknown0 `%s`", tcp_data.unknown0)

if tcp_data.padding0 != b"\x81\x02\x01": # pragma: no cover
LOGGER.warning("Unexpected padding0 `%s`", tcp_data.padding0)

# For all data that's expected to be zero, print it if it's not. Perhaps
# there are more interesting fields on different inverters waiting to be
# uncovered.
for idx in range(1, 5): # pragma: no cover
name = f"padding{idx}"
padding = getattr(tcp_data, name)
if sum(padding):
LOGGER.warning("Unexpected `%s`: `%s`", name, padding)

def list_divide_10(integers: list[int]) -> list[float | None]:
return [None if v == UINT16_MAX else v * 0.1 for v in integers]

def int_to_bool(num: int) -> bool:
return {
0: False,
1: True,
}[num]

# Set temperature to None if it matches 65326, this is returned
# when the inverter is "offline".
def temperature_to_float(temp: int) -> float | None:
return None if temp == 65326 else temp * 0.1

# Only these fields will be extracted from the structure
field_extractors = {
"serial_number": None,
"temperature": temperature_to_float,
"dc_input_voltage": list_divide_10,
"dc_input_current": list_divide_10,
"ac_output_current": list_divide_10,
"ac_output_voltage": list_divide_10,
"ac_output": None,
"solar_energy_today": 0.01,
"solar_energy_total": 0.1,
"solar_hours_total": None,
"inverter_active": int_to_bool,
"firmware": None,
"firmware_slave": None,
}

result: dict[str, Any] = {}

for name, extractor in field_extractors.items():
value = getattr(tcp_data, name)

if name == "ac_output":
# Flatten the list of frequency+power AC objects

result["ac_output_frequency"] = [o.get_frequency() for o in value]
result["ac_output_power"] = [o.get_power() for o in value]
continue

if isinstance(extractor, float):
value *= extractor
elif extractor is not None:
value = extractor(value)
elif isinstance(value, bytes):
value = value.decode(encoding="utf-8")

result[name] = value
if len(data) not in [
sizeof(_TcpData),
sizeof(_TcpData) + sizeof(_TcpFirmwareStrings),
]: # pragma: no cover
LOGGER.warning(
"Unrecognized INFORMATION_REPLY size `%s`, are there extra bytes?",
len(data),
)

result = _TcpData.parse(data)

# Only parse firmware strings if available
if len(data) >= sizeof(_TcpData) + sizeof(_TcpFirmwareStrings):
result.update(_TcpFirmwareStrings.parse(data, sizeof(_TcpData)))

return result
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ asyncio_mode = "auto"
select = ["ALL"]
ignore = [
"ANN101", # Self... explanatory
"ANN102", # cls in classmethods
"ANN401", # Opinionated warning on disallowing dynamically typed expressions
"D203", # Conflicts with other rules
"D213", # Conflicts with other rules
Expand Down
Binary file added tests/fixtures/tcp_reply_no_firmware.data
Binary file not shown.
38 changes: 38 additions & 0 deletions tests/test_tcp_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,44 @@ async def test_inverter_tcp() -> None:
await server_exit


async def test_inverter_tcp_no_firmware() -> None:
"""Test request from an Inverter without firmware reply - TCP source."""
serial_number = 987654321

(server_exit, port) = tcp_server(serial_number, "tcp_reply_no_firmware.data")

client = OmnikInverter(
host="localhost",
source_type="tcp",
serial_number=serial_number,
tcp_port=port,
)

inverter: Inverter = await client.inverter()

assert inverter
assert inverter.solar_rated_power is None
assert inverter.solar_current_power == 0

assert inverter.model is None
assert inverter.serial_number == "NLDN202013212035"
assert inverter.temperature == 23.400000000000002
assert inverter.dc_input_voltage == [118.30000000000001, 0.0, None]
assert inverter.dc_input_current == [0.0, 15.100000000000001, None]
MarijnS95 marked this conversation as resolved.
Show resolved Hide resolved
assert inverter.ac_output_voltage == [224.5, None, None]
assert inverter.ac_output_current == [0.1, None, None]
assert inverter.ac_output_frequency == [50.02, None, None]
assert inverter.ac_output_power == [0, None, None]
assert inverter.solar_energy_today == 7.21
assert inverter.solar_energy_total == 12861.900000000001
assert inverter.solar_hours_total == 30940
assert inverter.inverter_active is True
assert inverter.firmware is None
assert inverter.firmware_slave is None

await server_exit


async def test_inverter_tcp_offline() -> None:
"""Test request from an Inverter (offline) - TCP source."""
serial_number = 1608449224
Expand Down