diff --git a/miio/__init__.py b/miio/__init__.py index 2bc23a199..1a7b25b2f 100644 --- a/miio/__init__.py +++ b/miio/__init__.py @@ -33,6 +33,7 @@ from miio.device import Device from miio.exceptions import DeviceError, DeviceException from miio.fan import Fan, FanP5, FanSA1, FanV2, FanZA1, FanZA4 +from miio.fan_leshow import FanLeshow from miio.fan_miot import FanMiot, FanP9, FanP10, FanP11 from miio.gateway import Gateway from miio.heater import Heater diff --git a/miio/fan_leshow.py b/miio/fan_leshow.py new file mode 100644 index 000000000..4ee4d56c4 --- /dev/null +++ b/miio/fan_leshow.py @@ -0,0 +1,217 @@ +import enum +import logging +from typing import Any, Dict + +import click + +from .click_common import EnumType, command, format_output +from .device import Device +from .exceptions import DeviceException + +_LOGGER = logging.getLogger(__name__) + +MODEL_FAN_LESHOW_SS4 = "leshow.fan.ss4" + +AVAILABLE_PROPERTIES_COMMON = [ + "power", + "mode", + "blow", + "timer", + "sound", + "yaw", + "fault", +] + +AVAILABLE_PROPERTIES = { + MODEL_FAN_LESHOW_SS4: AVAILABLE_PROPERTIES_COMMON, +} + + +class FanLeshowException(DeviceException): + pass + + +class OperationMode(enum.Enum): + Manual = 0 + Sleep = 1 + Strong = 2 + Natural = 3 + + +class FanLeshowStatus: + """Container for status reports from the Xiaomi Rosou SS4 Ventilator.""" + + def __init__(self, data: Dict[str, Any]) -> None: + """ + Response of a Leshow Fan SS4 (): + + {'power': 1, 'mode': 2, 'blow': 100, 'timer': 0, + 'sound': 1, 'yaw': 0, 'fault': 0} + + """ + self.data = data + + @property + def power(self) -> str: + """Power state.""" + return "on" if self.data["power"] == 1 else "off" + + @property + def is_on(self) -> bool: + """True if device is turned on.""" + return self.data["power"] == 1 + + @property + def mode(self) -> OperationMode: + """Operation mode. Can be either 0, 1, 2 or 3.""" + return OperationMode(self.data["mode"]) + + @property + def speed(self) -> int: + """Speed of the fan in percent.""" + return self.data["blow"] + + @property + def buzzer(self) -> bool: + """True if buzzer is turned on.""" + return self.data["sound"] == 1 + + @property + def oscillate(self) -> bool: + """True if oscillation is enabled.""" + return self.data["yaw"] == 1 + + @property + def delay_off_countdown(self) -> int: + """Countdown until turning off in minutes.""" + return self.data["timer"] + + @property + def error_detected(self) -> bool: + """True if a fault was detected.""" + return self.data["fault"] == 1 + + def __repr__(self) -> str: + s = ( + "" + % ( + self.power, + self.mode, + self.speed, + self.buzzer, + self.oscillate, + self.delay_off_countdown, + self.error_detected, + ) + ) + return s + + +class FanLeshow(Device): + """Main class representing the Xiaomi Rosou SS4 Ventilator.""" + + def __init__( + self, + ip: str = None, + token: str = None, + start_id: int = 0, + debug: int = 0, + lazy_discover: bool = True, + model: str = MODEL_FAN_LESHOW_SS4, + ) -> None: + super().__init__(ip, token, start_id, debug, lazy_discover) + + if model in AVAILABLE_PROPERTIES: + self.model = model + else: + self.model = MODEL_FAN_LESHOW_SS4 + + @command( + default_output=format_output( + "", + "Power: {result.power}\n" + "Mode: {result.mode}\n" + "Speed: {result.speed}\n" + "Buzzer: {result.buzzer}\n" + "Oscillate: {result.oscillate}\n" + "Power-off time: {result.delay_off_countdown}\n" + "Error detected: {result.error_detected}\n", + ) + ) + def status(self) -> FanLeshowStatus: + """Retrieve properties.""" + properties = AVAILABLE_PROPERTIES[self.model] + values = self.get_properties(properties, max_properties=15) + + return FanLeshowStatus(dict(zip(properties, values))) + + @command(default_output=format_output("Powering on")) + def on(self): + """Power on.""" + return self.send("set_power", [1]) + + @command(default_output=format_output("Powering off")) + def off(self): + """Power off.""" + return self.send("set_power", [0]) + + @command( + click.argument("mode", type=EnumType(OperationMode)), + default_output=format_output("Setting mode to '{mode.value}'"), + ) + def set_mode(self, mode: OperationMode): + """Set mode.""" + return self.send("set_mode", [mode.value]) + + @command( + click.argument("speed", type=int), + default_output=format_output("Setting speed of the manual mode to {speed}"), + ) + def set_speed(self, speed: int): + """Set natural level.""" + if speed < 0 or speed > 100: + raise FanLeshowException("Invalid speed: %s" % speed) + + return self.send("set_blow", [speed]) + + @command( + click.argument("oscillate", type=bool), + default_output=format_output( + lambda oscillate: "Turning on oscillate" + if oscillate + else "Turning off oscillate" + ), + ) + def set_oscillate(self, oscillate: bool): + """Set oscillate on/off.""" + return self.send("set_yaw", [int(oscillate)]) + + @command( + click.argument("buzzer", type=bool), + default_output=format_output( + lambda buzzer: "Turning on buzzer" if buzzer else "Turning off buzzer" + ), + ) + def set_buzzer(self, buzzer: bool): + """Set buzzer on/off.""" + return self.send("set_sound", [int(buzzer)]) + + @command( + click.argument("minutes", type=int), + default_output=format_output("Setting delayed turn off to {minutes} minutes"), + ) + def delay_off(self, minutes: int): + """Set delay off minutes.""" + + if minutes < 0 or minutes > 999: + raise FanLeshowException( + "Invalid value for a delayed turn off: %s" % minutes + ) + + return self.send("set_timer", [minutes]) diff --git a/miio/tests/test_fan_leshow.py b/miio/tests/test_fan_leshow.py new file mode 100644 index 000000000..e918f919f --- /dev/null +++ b/miio/tests/test_fan_leshow.py @@ -0,0 +1,133 @@ +from unittest import TestCase + +import pytest + +from miio import FanLeshow +from miio.fan_leshow import ( + MODEL_FAN_LESHOW_SS4, + FanLeshowException, + FanLeshowStatus, + OperationMode, +) + +from .dummies import DummyDevice + + +class DummyFanLeshow(DummyDevice, FanLeshow): + def __init__(self, *args, **kwargs): + self.model = MODEL_FAN_LESHOW_SS4 + self.state = { + "power": 1, + "mode": 2, + "blow": 100, + "timer": 0, + "sound": 1, + "yaw": 0, + "fault": 0, + } + self.return_values = { + "get_prop": self._get_state, + "set_power": lambda x: self._set_state("power", x), + "set_mode": lambda x: self._set_state("mode", x), + "set_blow": lambda x: self._set_state("blow", x), + "set_timer": lambda x: self._set_state("timer", x), + "set_sound": lambda x: self._set_state("sound", x), + "set_yaw": lambda x: self._set_state("yaw", x), + } + super().__init__(args, kwargs) + + +@pytest.fixture(scope="class") +def fanleshow(request): + request.cls.device = DummyFanLeshow() + # TODO add ability to test on a real device + + +@pytest.mark.usefixtures("fanleshow") +class TestFanLeshow(TestCase): + def is_on(self): + return self.device.status().is_on + + def state(self): + return self.device.status() + + def test_on(self): + self.device.off() # ensure off + assert self.is_on() is False + + self.device.on() + assert self.is_on() is True + + def test_off(self): + self.device.on() # ensure on + assert self.is_on() is True + + self.device.off() + assert self.is_on() is False + + def test_status(self): + self.device._reset_state() + + assert repr(self.state()) == repr(FanLeshowStatus(self.device.start_state)) + + assert self.is_on() is True + assert self.state().mode == OperationMode(self.device.start_state["mode"]) + assert self.state().speed == self.device.start_state["blow"] + assert self.state().buzzer is (self.device.start_state["sound"] == 1) + assert self.state().oscillate is (self.device.start_state["yaw"] == 1) + assert self.state().delay_off_countdown == self.device.start_state["timer"] + assert self.state().error_detected is (self.device.start_state["fault"] == 1) + + def test_set_speed(self): + def speed(): + return self.device.status().speed + + self.device.set_speed(0) + assert speed() == 0 + self.device.set_speed(1) + assert speed() == 1 + self.device.set_speed(100) + assert speed() == 100 + + with pytest.raises(FanLeshowException): + self.device.set_speed(-1) + + with pytest.raises(FanLeshowException): + self.device.set_speed(101) + + def test_set_oscillate(self): + def oscillate(): + return self.device.status().oscillate + + self.device.set_oscillate(True) + assert oscillate() is True + + self.device.set_oscillate(False) + assert oscillate() is False + + def test_set_buzzer(self): + def buzzer(): + return self.device.status().buzzer + + self.device.set_buzzer(True) + assert buzzer() is True + + self.device.set_buzzer(False) + assert buzzer() is False + + def test_delay_off(self): + def delay_off_countdown(): + return self.device.status().delay_off_countdown + + self.device.delay_off(100) + assert delay_off_countdown() == 100 + self.device.delay_off(200) + assert delay_off_countdown() == 200 + self.device.delay_off(0) + assert delay_off_countdown() == 0 + + with pytest.raises(FanLeshowException): + self.device.delay_off(-1) + + with pytest.raises(FanLeshowException): + self.device.delay_off(1000)