Skip to content

Commit

Permalink
Add encoding/decoding of generic Schedule registers
Browse files Browse the repository at this point in the history
Extend "eco mode" encoding with generic schedule kind and add new "Eco Mode 745" mode which includes also months.
  • Loading branch information
mletenay committed Feb 17, 2024
1 parent 3050e7c commit bc0676a
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 78 deletions.
203 changes: 128 additions & 75 deletions goodwe/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from abc import ABC, abstractmethod
from datetime import datetime
from enum import IntEnum
from struct import unpack
from typing import Any, Callable, Optional

Expand All @@ -10,6 +11,75 @@
from .protocol import ProtocolResponse

DAY_NAMES = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]
MONTH_NAMES = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


class ScheduleType(IntEnum):
ECO_MODE = 0,
DRY_CONTACT_LOAD = 1,
DRY_CONTACT_SMART_LOAD = 2,
PEAK_SHAVING = 3,
BACKUP_MODE = 4,
SMART_CHARGE_MODE = 5,
ECO_MODE_745 = 6

@classmethod
def detect_schedule_type(cls, value: int) -> ScheduleType:
"""Detect schedule type from its on/off value"""
if value in (0, -1, 85):
return ScheduleType.ECO_MODE
elif value in (1, -2):
return ScheduleType.DRY_CONTACT_LOAD
elif value in (2, -3):
return ScheduleType.DRY_CONTACT_SMART_LOAD
elif value in (3, -4):
return ScheduleType.PEAK_SHAVING
elif value in (4, -5):
return ScheduleType.BACKUP_MODE
elif value in (5, -6):
return ScheduleType.SMART_CHARGE_MODE
elif value in (6, -7):
return ScheduleType.ECO_MODE_745
else:
raise ValueError(f"{value}: on_off value {value} out of range.")

def power_unit(self):
"""Return unit of power parameter"""
if self == ScheduleType.PEAK_SHAVING:
return "W"
else:
return "%"

def decode_power(self, value: int) -> int:
"""Decode human readable value of power parameter"""
if self == ScheduleType.ECO_MODE:
return value
elif self == ScheduleType.PEAK_SHAVING:
return value * 10
if self == ScheduleType.ECO_MODE_745:
return int(value / 10)
else:
return value

def encode_power(self, value: int) -> int:
"""Encode human readable value of power parameter"""
if self == ScheduleType.ECO_MODE:
return value
elif self == ScheduleType.PEAK_SHAVING:
return int(value / 10)
if self == ScheduleType.ECO_MODE_745:
return value * 10
else:
return value

def is_in_range(self, value: int) -> bool:
"""Check if the value fits in allowed values range"""
if self == ScheduleType.ECO_MODE:
return -100 <= value <= 100
if self == ScheduleType.ECO_MODE_745:
return -1000 <= value <= 1000
else:
return True


class Voltage(Sensor):
Expand Down Expand Up @@ -479,10 +549,10 @@ def as_eco_mode_v2(self) -> EcoModeV2:
return result


class EcoModeV2(Sensor, EcoMode):
"""Sensor representing Eco Mode Battery Power Group encoded in 12 bytes"""
class Schedule(Sensor, EcoMode):
"""Sensor representing Schedule Group encoded in 12 bytes"""

def __init__(self, id_: str, offset: int, name: str):
def __init__(self, id_: str, offset: int, name: str, schedule_type: ScheduleType = ScheduleType.ECO_MODE):
super().__init__(id_, offset, name, 12, "", SensorKind.BAT)
self.start_h: int | None = None
self.start_m: int | None = None
Expand All @@ -493,39 +563,43 @@ def __init__(self, id_: str, offset: int, name: str):
self.days: str | None = None
self.power: int | None = None
self.soc: int | None = None
# 2 bytes padding 0000
self.month_bits: int | None = None
self.months: str | None = None
self.schedule_type: ScheduleType = schedule_type

def __str__(self):
return f"{self.start_h}:{self.start_m}-{self.end_h}:{self.end_m} {self.days} " \
f"{self.power}% (SoC {self.soc}%) " \
f"{'On' if self.on_off == -1 else 'Off' if self.on_off == 0 else 'Unset'}"
f"{self.months + ' ' if self.months else ''}" \
f"{self.schedule_type.decode_power(self.power)}{self.schedule_type.power_unit()} (SoC {self.soc}%) " \
f"{'On' if -10 < self.on_off < 0 else 'Off' if 10 > self.on_off >= 0 else 'Unset'}"

def read_value(self, data: ProtocolResponse):
self.start_h = read_byte(data)
if (self.start_h < 0 or self.start_h > 23) and self.start_h != 48:
if (self.start_h < 0 or self.start_h > 23) and self.start_h != 48 and self.start_h != -1:
raise ValueError(f"{self.id_}: start_h value {self.start_h} out of range.")
self.start_m = read_byte(data)
if self.start_m < 0 or self.start_m > 59:
if (self.start_m < 0 or self.start_m > 59) and self.start_m != -1:
raise ValueError(f"{self.id_}: start_m value {self.start_m} out of range.")
self.end_h = read_byte(data)
if (self.end_h < 0 or self.end_h > 23) and self.end_h != 48:
if (self.end_h < 0 or self.end_h > 23) and self.end_h != 48 and self.end_h != -1:
raise ValueError(f"{self.id_}: end_h value {self.end_h} out of range.")
self.end_m = read_byte(data)
if self.end_m < 0 or self.end_m > 59:
if (self.end_m < 0 or self.end_m > 59) and self.end_m != -1:
raise ValueError(f"{self.id_}: end_m value {self.end_m} out of range.")
self.on_off = read_byte(data)
if self.on_off not in (0, -1, 85):
raise ValueError(f"{self.id_}: on_off value {self.on_off} out of range.")
self.schedule_type = ScheduleType.detect_schedule_type(self.on_off)
self.day_bits = read_byte(data)
self.days = decode_day_of_week(self.day_bits)
if self.day_bits < 0:
raise ValueError(f"{self.id_}: day_bits value {self.day_bits} out of range.")
self.power = read_bytes2(data) # negative=charge, positive=discharge
if self.power < -100 or self.power > 100:
if not self.schedule_type.is_in_range(self.power):
raise ValueError(f"{self.id_}: power value {self.power} out of range.")
self.soc = read_bytes2(data)
if self.soc < 0 or self.soc > 100:
raise ValueError(f"{self.id_}: SoC value {self.soc} out of range.")
self.month_bits = read_bytes2(data)
self.months = decode_months(self.month_bits)
return self

def encode_value(self, value: Any, register_value: bytes = None) -> bytes:
Expand All @@ -538,35 +612,46 @@ def encode_value(self, value: Any, register_value: bytes = None) -> bytes:
def encode_charge(self, eco_mode_power: int, eco_mode_soc: int = 100) -> bytes:
"""Answer bytes representing all the time enabled charging eco mode group"""
return bytes.fromhex(
"0000173bff7f{:04x}{:04x}0000".format((-1 * abs(eco_mode_power)) & (2 ** 16 - 1), eco_mode_soc))
"0000173b{:02x}7f{:04x}{:04x}{:04x}".format(
255 - self.schedule_type,
(-1 * abs(self.schedule_type.encode_power(eco_mode_power))) & (2 ** 16 - 1),
eco_mode_soc,
0 if self.schedule_type != ScheduleType.ECO_MODE_745 else 0x0fff))

def encode_discharge(self, eco_mode_power: int) -> bytes:
"""Answer bytes representing all the time enabled discharging eco mode group"""
return bytes.fromhex("0000173bff7f{:04x}00640000".format(abs(eco_mode_power)))
return bytes.fromhex("0000173b{:02x}7f{:04x}0064{:04x}".format(
255 - self.schedule_type,
abs(self.schedule_type.encode_power(eco_mode_power)),
0 if self.schedule_type != ScheduleType.ECO_MODE_745 else 0x0fff))

def encode_off(self) -> bytes:
"""Answer bytes representing empty and disabled eco mode group"""
return bytes.fromhex("300030000000006400640000")
"""Answer bytes representing empty and disabled schedule group"""
return bytes.fromhex("30003000{:02x}00{:04x}00640000".format(
self.schedule_type.value,
self.schedule_type.encode_power(100)))

def is_eco_charge_mode(self) -> bool:
"""Answer if it represents the emulated 24/7 fulltime discharge mode"""
return self.start_h == 0 \
and self.start_m == 0 \
and self.end_h == 23 \
and self.end_m == 59 \
and self.on_off == -1 \
and self.on_off == (-1 - self.schedule_type) \
and self.day_bits == 127 \
and self.power < 0
and self.power < 0 \
and (self.month_bits == 0 or self.month_bits == 0x0fff)

def is_eco_discharge_mode(self) -> bool:
"""Answer if it represents the emulated 24/7 fulltime discharge mode"""
return self.start_h == 0 \
and self.start_m == 0 \
and self.end_h == 23 \
and self.end_m == 59 \
and self.on_off == -1 \
and self.on_off == (-1 - self.schedule_type) \
and self.day_bits == 127 \
and self.power > 0
and self.power > 0 \
and (self.month_bits == 0 or self.month_bits == 0x0fff)

def as_eco_mode_v1(self) -> EcoModeV1:
"""Convert V2 to V1 EcoMode"""
Expand All @@ -582,65 +667,18 @@ def as_eco_mode_v1(self) -> EcoModeV1:
return result


class PeakShavingMode(Sensor):
"""Sensor representing Peak Shaving Mode encoded in 12 bytes"""
class EcoModeV2(Schedule):
"""Sensor representing Eco Mode Group encoded in 12 bytes"""

def __init__(self, id_: str, offset: int, name: str):
super().__init__(id_, offset, name, 12, "", SensorKind.BAT)
self.start_h: int | None = None
self.start_m: int | None = None
self.end_h: int | None = None
self.end_m: int | None = None
self.on_off: int | None = None
self.day_bits: int | None = None
self.days: str | None = None
self.import_power: float | None = None
self.soc: int | None = None
# 2 bytes padding 0000

def __str__(self):
return f"{self.start_h}:{self.start_m}-{self.end_h}:{self.end_m} {self.days} " \
f"{self.import_power}kW (SoC {self.soc}%) " \
f"{'On' if self.on_off == -4 else 'Off' if self.on_off == 3 else 'Unset'}"
super().__init__(id_, offset, name, ScheduleType.ECO_MODE)

def read_value(self, data: ProtocolResponse):
self.start_h = read_byte(data)
if (self.start_h < 0 or self.start_h > 23) and self.start_h != 48:
raise ValueError(f"{self.id_}: start_h value {self.start_h} out of range.")
self.start_m = read_byte(data)
if self.start_m < 0 or self.start_m > 59:
raise ValueError(f"{self.id_}: start_m value {self.start_m} out of range.")
self.end_h = read_byte(data)
if (self.end_h < 0 or self.end_h > 23) and self.end_h != 48:
raise ValueError(f"{self.id_}: end_h value {self.end_h} out of range.")
self.end_m = read_byte(data)
if self.end_m < 0 or self.end_m > 59:
raise ValueError(f"{self.id_}: end_m value {self.end_m} out of range.")
self.on_off = read_byte(data)
if self.on_off not in (-4, 3, 85):
raise ValueError(f"{self.id_}: on_off value {self.on_off} out of range.")
self.day_bits = read_byte(data)
self.days = decode_day_of_week(self.day_bits)
if self.day_bits < 0:
raise ValueError(f"{self.id_}: day_bits value {self.day_bits} out of range.")
self.import_power = read_decimal2(data, 100)
if self.import_power < 0 or self.import_power > 500:
raise ValueError(f"{self.id_}: import_power value {self.import_power} out of range.")
self.soc = read_bytes2(data)
if self.soc < 0 or self.soc > 100:
raise ValueError(f"{self.id_}: soc value {self.soc} out of range.")
return self

def encode_value(self, value: Any, register_value: bytes = None) -> bytes:
if isinstance(value, bytes) and len(value) == 12:
# try to read_value to check if values are valid
if self.read_value(ProtocolResponse(value, None)):
return value
raise ValueError
class PeakShavingMode(Schedule):
"""Sensor representing Peak Shaving Mode encoded in 12 bytes"""

def encode_off(self) -> bytes:
"""Answer bytes representing empty and disabled eco mode group"""
return bytes.fromhex("300030000000006400640000")
def __init__(self, id_: str, offset: int, name: str):
super().__init__(id_, offset, name, ScheduleType.PEAK_SHAVING)


class Calculated(Sensor):
Expand Down Expand Up @@ -807,3 +845,18 @@ def decode_day_of_week(data: int) -> str:
days += daynames[0]
daynames.pop(0)
return days


def decode_months(data: int) -> str | None:
if data == 0 or data == 0x0fff:
return None
bits = bin(data)[2:]
monthnames = list(MONTH_NAMES)
months = ""
for each in bits[::-1]:
if each == '1':
if len(months) > 0:
months += ","
months += monthnames[0]
monthnames.pop(0)
return months
62 changes: 59 additions & 3 deletions tests/test_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ def test_eco_mode_v1(self):
self.assertFalse(testee.read(data).is_eco_charge_mode())
self.assertFalse(testee.read(data).is_eco_discharge_mode())

def test_eco_mode_v2(self):
testee = EcoModeV2("", 0, "")
def test_schedule(self):
testee = Schedule("", 0, "")

data = MockResponse("0d1e0e28ff1affc4005a0000")
self.assertEqual("13:30-14:40 Mon,Wed,Thu -60% (SoC 90%) On", testee.read(data).__str__())
self.assertEqual(ScheduleType.ECO_MODE, testee.schedule_type)
self.assertEqual(bytes.fromhex("0d1e0e28ff1affc4005a0000"),
testee.encode_value(bytes.fromhex("0d1e0e28ff1affc4005a0000")))
self.assertRaises(ValueError, lambda: testee.encode_value(bytes.fromhex("0d1e0e28ffffffc4005a0000")))
Expand All @@ -177,15 +178,70 @@ def test_eco_mode_v2(self):

data = MockResponse("0000173b5500001400640000")
self.assertEqual("0:0-23:59 20% (SoC 100%) Unset", testee.read(data).__str__())
data = MockResponse("ffffffff557f000000010001")
self.assertEqual("-1:-1--1:-1 Sun,Mon,Tue,Wed,Thu,Fri,Sat Jan 0% (SoC 1%) Unset", testee.read(data).__str__())
data = MockResponse("000000005500000000000000")
self.assertEqual("0:0-0:0 0% (SoC 0%) Unset", testee.read(data).__str__())

def test_eco_mode_v745(self):
testee = Schedule("", 0, "", ScheduleType.ECO_MODE_745)

data = MockResponse("0d1e0e28f91affc4005a0000")
self.assertEqual("13:30-14:40 Mon,Wed,Thu -6% (SoC 90%) On", testee.read(data).__str__())
self.assertEqual(bytes.fromhex("0d1e0e28f91affc4005a0000"),
testee.encode_value(bytes.fromhex("0d1e0e28f91affc4005a0000")))
self.assertFalse(testee.read(data).is_eco_charge_mode())
self.assertFalse(testee.read(data).is_eco_discharge_mode())
self.assertEqual(ScheduleType.ECO_MODE_745, testee.schedule_type)

data = MockResponse(testee.encode_charge(-40, 80).hex())
self.assertEqual("0:0-23:59 Sun,Mon,Tue,Wed,Thu,Fri,Sat -40% (SoC 80%) On", testee.read(data).__str__())
self.assertTrue(testee.read(data).is_eco_charge_mode())
self.assertFalse(testee.read(data).is_eco_discharge_mode())
self.assertEqual(ScheduleType.ECO_MODE_745, testee.schedule_type)
data = MockResponse(testee.encode_discharge(60).hex())
self.assertEqual("0:0-23:59 Sun,Mon,Tue,Wed,Thu,Fri,Sat 60% (SoC 100%) On", testee.read(data).__str__())
self.assertFalse(testee.read(data).is_eco_charge_mode())
self.assertTrue(testee.read(data).is_eco_discharge_mode())
self.assertEqual(ScheduleType.ECO_MODE_745, testee.schedule_type)
data = MockResponse(testee.encode_off().hex())
self.assertEqual("48:0-48:0 100% (SoC 100%) Off", testee.read(data).__str__())
self.assertFalse(testee.read(data).is_eco_charge_mode())
self.assertFalse(testee.read(data).is_eco_discharge_mode())
self.assertEqual(ScheduleType.ECO_MODE_745, testee.schedule_type)

data = MockResponse("10001600f97f00c800000fff")
self.assertEqual("16:0-22:0 Sun,Mon,Tue,Wed,Thu,Fri,Sat 20% (SoC 0%) On", testee.read(data).__str__())
self.assertEqual(ScheduleType.ECO_MODE_745, testee.schedule_type)
data = MockResponse("10001600067f00c800000fff")
self.assertEqual("16:0-22:0 Sun,Mon,Tue,Wed,Thu,Fri,Sat 20% (SoC 0%) Off", testee.read(data).__str__())
self.assertEqual(ScheduleType.ECO_MODE_745, testee.schedule_type)
data = MockResponse("10001600f97ffe70004b0fff")
self.assertEqual("16:0-22:0 Sun,Mon,Tue,Wed,Thu,Fri,Sat -40% (SoC 75%) On", testee.read(data).__str__())
self.assertEqual(ScheduleType.ECO_MODE_745, testee.schedule_type)
data = MockResponse("10001600f97fff3800320fff")
self.assertEqual("16:0-22:0 Sun,Mon,Tue,Wed,Thu,Fri,Sat -20% (SoC 50%) On", testee.read(data).__str__())
self.assertEqual(ScheduleType.ECO_MODE_745, testee.schedule_type)
data = MockResponse("10001600f902ff38004b0002")
self.assertEqual("16:0-22:0 Mon Feb -20% (SoC 75%) On", testee.read(data).__str__())
data = MockResponse("10001600f902ff38004b0004")
self.assertEqual("16:0-22:0 Mon Mar -20% (SoC 75%) On", testee.read(data).__str__())

def test_peak_shaving_mode(self):
testee = PeakShavingMode("", 0, "")

data = MockResponse("010a020a037f00fa00370000")
self.assertEqual("1:10-2:10 Sun,Mon,Tue,Wed,Thu,Fri,Sat 2.5kW (SoC 55%) Off", testee.read(data).__str__())
self.assertEqual("1:10-2:10 Sun,Mon,Tue,Wed,Thu,Fri,Sat 2500W (SoC 55%) Off", testee.read(data).__str__())
self.assertEqual(bytes.fromhex("010a020a037f00fa00370000"),
testee.encode_value(bytes.fromhex("010a020a037f00fa00370000")))

data = MockResponse("00000d08fc7f006400140000")
self.assertEqual("0:0-13:8 Sun,Mon,Tue,Wed,Thu,Fri,Sat 1000W (SoC 20%) On", testee.read(data).__str__())
self.assertEqual(ScheduleType.PEAK_SHAVING, testee.schedule_type)
data = MockResponse("00000d08037f000000000000")
self.assertEqual("0:0-13:8 Sun,Mon,Tue,Wed,Thu,Fri,Sat 0W (SoC 0%) Off", testee.read(data).__str__())
self.assertEqual(ScheduleType.PEAK_SHAVING, testee.schedule_type)

def test_decode_bitmap(self):
self.assertEqual('', decode_bitmap(0, ERROR_CODES))
self.assertEqual('Utility Loss', decode_bitmap(512, ERROR_CODES))
Expand Down

0 comments on commit bc0676a

Please sign in to comment.