forked from rytilahti/python-miio
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add leshow.fan.ss4 support (Closes: rytilahti#806)
- Loading branch information
Showing
3 changed files
with
351 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
import enum | ||
import logging | ||
from typing import Any, Dict | ||
|
||
import click | ||
|
||
from .click_common import EnumType, command, format_output | ||
from .device import Device | ||
from .exceptions import DeviceException | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
MODEL_FAN_LESHOW_SS4 = "leshow.fan.ss4" | ||
|
||
AVAILABLE_PROPERTIES_COMMON = [ | ||
"power", | ||
"mode", | ||
"blow", | ||
"timer", | ||
"sound", | ||
"yaw", | ||
"fault", | ||
] | ||
|
||
AVAILABLE_PROPERTIES = { | ||
MODEL_FAN_LESHOW_SS4: AVAILABLE_PROPERTIES_COMMON, | ||
} | ||
|
||
|
||
class FanLeshowException(DeviceException): | ||
pass | ||
|
||
|
||
class OperationMode(enum.Enum): | ||
Manual = 0 | ||
Sleep = 1 | ||
Strong = 2 | ||
Natural = 3 | ||
|
||
|
||
class FanLeshowStatus: | ||
"""Container for status reports from the Xiaomi Rosou SS4 Ventilator.""" | ||
|
||
def __init__(self, data: Dict[str, Any]) -> None: | ||
""" | ||
Response of a Leshow Fan SS4 (): | ||
{'power': 1, 'mode': 2, 'blow': 100, 'timer': 0, | ||
'sound': 1, 'yaw': 0, 'fault': 0} | ||
""" | ||
self.data = data | ||
|
||
@property | ||
def power(self) -> str: | ||
"""Power state.""" | ||
return "on" if self.data["power"] == 1 else "off" | ||
|
||
@property | ||
def is_on(self) -> bool: | ||
"""True if device is turned on.""" | ||
return self.data["power"] == 1 | ||
|
||
@property | ||
def mode(self) -> OperationMode: | ||
"""Operation mode. Can be either 0, 1, 2 or 3.""" | ||
return OperationMode(self.data["mode"]) | ||
|
||
@property | ||
def speed(self) -> int: | ||
"""Speed of the fan in percent.""" | ||
return self.data["blow"] | ||
|
||
@property | ||
def buzzer(self) -> bool: | ||
"""True if buzzer is turned on.""" | ||
return self.data["sound"] == 1 | ||
|
||
@property | ||
def oscillate(self) -> bool: | ||
"""True if oscillation is enabled.""" | ||
return self.data["yaw"] == 1 | ||
|
||
@property | ||
def delay_off_countdown(self) -> int: | ||
"""Countdown until turning off in minutes.""" | ||
return self.data["timer"] | ||
|
||
@property | ||
def error_detected(self) -> bool: | ||
"""True if a fault was detected.""" | ||
return self.data["fault"] == 1 | ||
|
||
def __repr__(self) -> str: | ||
s = ( | ||
"<FanLeshowStatus power=%s, " | ||
"mode=%s, " | ||
"speed=%s, " | ||
"buzzer=%s, " | ||
"oscillate=%s, " | ||
"delay_off_countdown=%s, " | ||
"error_detected=%s>" | ||
% ( | ||
self.power, | ||
self.mode, | ||
self.speed, | ||
self.buzzer, | ||
self.oscillate, | ||
self.delay_off_countdown, | ||
self.error_detected, | ||
) | ||
) | ||
return s | ||
|
||
|
||
class FanLeshow(Device): | ||
"""Main class representing the Xiaomi Rosou SS4 Ventilator.""" | ||
|
||
def __init__( | ||
self, | ||
ip: str = None, | ||
token: str = None, | ||
start_id: int = 0, | ||
debug: int = 0, | ||
lazy_discover: bool = True, | ||
model: str = MODEL_FAN_LESHOW_SS4, | ||
) -> None: | ||
super().__init__(ip, token, start_id, debug, lazy_discover) | ||
|
||
if model in AVAILABLE_PROPERTIES: | ||
self.model = model | ||
else: | ||
self.model = MODEL_FAN_LESHOW_SS4 | ||
|
||
@command( | ||
default_output=format_output( | ||
"", | ||
"Power: {result.power}\n" | ||
"Mode: {result.mode}\n" | ||
"Speed: {result.speed}\n" | ||
"Buzzer: {result.buzzer}\n" | ||
"Oscillate: {result.oscillate}\n" | ||
"Power-off time: {result.delay_off_countdown}\n" | ||
"Error detected: {result.error_detected}\n", | ||
) | ||
) | ||
def status(self) -> FanLeshowStatus: | ||
"""Retrieve properties.""" | ||
properties = AVAILABLE_PROPERTIES[self.model] | ||
values = self.get_properties(properties, max_properties=15) | ||
|
||
return FanLeshowStatus(dict(zip(properties, values))) | ||
|
||
@command(default_output=format_output("Powering on")) | ||
def on(self): | ||
"""Power on.""" | ||
return self.send("set_power", [1]) | ||
|
||
@command(default_output=format_output("Powering off")) | ||
def off(self): | ||
"""Power off.""" | ||
return self.send("set_power", [0]) | ||
|
||
@command( | ||
click.argument("mode", type=EnumType(OperationMode)), | ||
default_output=format_output("Setting mode to '{mode.value}'"), | ||
) | ||
def set_mode(self, mode: OperationMode): | ||
"""Set mode.""" | ||
return self.send("set_mode", [mode.value]) | ||
|
||
@command( | ||
click.argument("speed", type=int), | ||
default_output=format_output("Setting speed of the manual mode to {speed}"), | ||
) | ||
def set_speed(self, speed: int): | ||
"""Set natural level.""" | ||
if speed < 0 or speed > 100: | ||
raise FanLeshowException("Invalid speed: %s" % speed) | ||
|
||
return self.send("set_blow", [speed]) | ||
|
||
@command( | ||
click.argument("oscillate", type=bool), | ||
default_output=format_output( | ||
lambda oscillate: "Turning on oscillate" | ||
if oscillate | ||
else "Turning off oscillate" | ||
), | ||
) | ||
def set_oscillate(self, oscillate: bool): | ||
"""Set oscillate on/off.""" | ||
return self.send("set_yaw", [int(oscillate)]) | ||
|
||
@command( | ||
click.argument("buzzer", type=bool), | ||
default_output=format_output( | ||
lambda buzzer: "Turning on buzzer" if buzzer else "Turning off buzzer" | ||
), | ||
) | ||
def set_buzzer(self, buzzer: bool): | ||
"""Set buzzer on/off.""" | ||
return self.send("set_sound", [int(buzzer)]) | ||
|
||
@command( | ||
click.argument("minutes", type=int), | ||
default_output=format_output("Setting delayed turn off to {minutes} minutes"), | ||
) | ||
def delay_off(self, minutes: int): | ||
"""Set delay off minutes.""" | ||
|
||
if minutes < 0 or minutes > 999: | ||
raise FanLeshowException( | ||
"Invalid value for a delayed turn off: %s" % minutes | ||
) | ||
|
||
return self.send("set_timer", [minutes]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
from unittest import TestCase | ||
|
||
import pytest | ||
|
||
from miio import FanLeshow | ||
from miio.fan_leshow import ( | ||
MODEL_FAN_LESHOW_SS4, | ||
FanLeshowException, | ||
FanLeshowStatus, | ||
OperationMode, | ||
) | ||
|
||
from .dummies import DummyDevice | ||
|
||
|
||
class DummyFanLeshow(DummyDevice, FanLeshow): | ||
def __init__(self, *args, **kwargs): | ||
self.model = MODEL_FAN_LESHOW_SS4 | ||
self.state = { | ||
"power": 1, | ||
"mode": 2, | ||
"blow": 100, | ||
"timer": 0, | ||
"sound": 1, | ||
"yaw": 0, | ||
"fault": 0, | ||
} | ||
self.return_values = { | ||
"get_prop": self._get_state, | ||
"set_power": lambda x: self._set_state("power", x), | ||
"set_mode": lambda x: self._set_state("mode", x), | ||
"set_blow": lambda x: self._set_state("blow", x), | ||
"set_timer": lambda x: self._set_state("timer", x), | ||
"set_sound": lambda x: self._set_state("sound", x), | ||
"set_yaw": lambda x: self._set_state("yaw", x), | ||
} | ||
super().__init__(args, kwargs) | ||
|
||
|
||
@pytest.fixture(scope="class") | ||
def fanleshow(request): | ||
request.cls.device = DummyFanLeshow() | ||
# TODO add ability to test on a real device | ||
|
||
|
||
@pytest.mark.usefixtures("fanleshow") | ||
class TestFanLeshow(TestCase): | ||
def is_on(self): | ||
return self.device.status().is_on | ||
|
||
def state(self): | ||
return self.device.status() | ||
|
||
def test_on(self): | ||
self.device.off() # ensure off | ||
assert self.is_on() is False | ||
|
||
self.device.on() | ||
assert self.is_on() is True | ||
|
||
def test_off(self): | ||
self.device.on() # ensure on | ||
assert self.is_on() is True | ||
|
||
self.device.off() | ||
assert self.is_on() is False | ||
|
||
def test_status(self): | ||
self.device._reset_state() | ||
|
||
assert repr(self.state()) == repr(FanLeshowStatus(self.device.start_state)) | ||
|
||
assert self.is_on() is True | ||
assert self.state().mode == OperationMode(self.device.start_state["mode"]) | ||
assert self.state().speed == self.device.start_state["blow"] | ||
assert self.state().buzzer is (self.device.start_state["sound"] == 1) | ||
assert self.state().oscillate is (self.device.start_state["yaw"] == 1) | ||
assert self.state().delay_off_countdown == self.device.start_state["timer"] | ||
assert self.state().error_detected is (self.device.start_state["fault"] == 1) | ||
|
||
def test_set_speed(self): | ||
def speed(): | ||
return self.device.status().speed | ||
|
||
self.device.set_speed(0) | ||
assert speed() == 0 | ||
self.device.set_speed(1) | ||
assert speed() == 1 | ||
self.device.set_speed(100) | ||
assert speed() == 100 | ||
|
||
with pytest.raises(FanLeshowException): | ||
self.device.set_speed(-1) | ||
|
||
with pytest.raises(FanLeshowException): | ||
self.device.set_speed(101) | ||
|
||
def test_set_oscillate(self): | ||
def oscillate(): | ||
return self.device.status().oscillate | ||
|
||
self.device.set_oscillate(True) | ||
assert oscillate() is True | ||
|
||
self.device.set_oscillate(False) | ||
assert oscillate() is False | ||
|
||
def test_set_buzzer(self): | ||
def buzzer(): | ||
return self.device.status().buzzer | ||
|
||
self.device.set_buzzer(True) | ||
assert buzzer() is True | ||
|
||
self.device.set_buzzer(False) | ||
assert buzzer() is False | ||
|
||
def test_delay_off(self): | ||
def delay_off_countdown(): | ||
return self.device.status().delay_off_countdown | ||
|
||
self.device.delay_off(100) | ||
assert delay_off_countdown() == 100 | ||
self.device.delay_off(200) | ||
assert delay_off_countdown() == 200 | ||
self.device.delay_off(0) | ||
assert delay_off_countdown() == 0 | ||
|
||
with pytest.raises(FanLeshowException): | ||
self.device.delay_off(-1) | ||
|
||
with pytest.raises(FanLeshowException): | ||
self.device.delay_off(1000) |