From 4dd3cd8469355cefe6a8793450f9ff5227e10a81 Mon Sep 17 00:00:00 2001 From: starkillerOG Date: Thu, 14 Jul 2022 17:00:30 +0200 Subject: [PATCH] Add push server implementation to enable event handling (#1446) Co-authored-by: Teemu Rytilahti --- .../push_server/gateway_alarm_trigger.py | 44 +++ .../push_server/gateway_button_press.py | 50 +++ docs/index.rst | 1 + docs/push_server.rst | 224 +++++++++++++ miio/__init__.py | 1 + miio/push_server/__init__.py | 6 + miio/push_server/eventinfo.py | 27 ++ miio/push_server/server.py | 294 ++++++++++++++++++ miio/push_server/serverprotocol.py | 126 ++++++++ 9 files changed, 773 insertions(+) create mode 100644 docs/examples/push_server/gateway_alarm_trigger.py create mode 100644 docs/examples/push_server/gateway_button_press.py create mode 100644 docs/push_server.rst create mode 100644 miio/push_server/__init__.py create mode 100644 miio/push_server/eventinfo.py create mode 100644 miio/push_server/server.py create mode 100644 miio/push_server/serverprotocol.py diff --git a/docs/examples/push_server/gateway_alarm_trigger.py b/docs/examples/push_server/gateway_alarm_trigger.py new file mode 100644 index 000000000..72d1e0dd2 --- /dev/null +++ b/docs/examples/push_server/gateway_alarm_trigger.py @@ -0,0 +1,44 @@ +import asyncio +import logging + +from miio import Gateway, PushServer +from miio.push_server import EventInfo + +_LOGGER = logging.getLogger(__name__) +logging.basicConfig(level="INFO") + +gateway_ip = "192.168.1.IP" +token = "TokenTokenToken" # nosec + + +async def asyncio_demo(loop): + def alarm_callback(source_device, action, params): + _LOGGER.info( + "callback '%s' from '%s', params: '%s'", action, source_device, params + ) + + push_server = PushServer(gateway_ip) + gateway = Gateway(gateway_ip, token) + + await push_server.start() + + push_server.register_miio_device(gateway, alarm_callback) + + event_info = EventInfo( + action="alarm_triggering", + extra="[1,19,1,111,[0,1],2,0]", + trigger_token=gateway.token, + ) + + await loop.run_in_executor(None, push_server.subscribe_event, gateway, event_info) + + _LOGGER.info("Listening") + + await asyncio.sleep(30) + + push_server.stop() + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(asyncio_demo(loop)) diff --git a/docs/examples/push_server/gateway_button_press.py b/docs/examples/push_server/gateway_button_press.py new file mode 100644 index 000000000..d4eac3047 --- /dev/null +++ b/docs/examples/push_server/gateway_button_press.py @@ -0,0 +1,50 @@ +import asyncio +import logging + +from miio import Gateway, PushServer +from miio.push_server import EventInfo + +_LOGGER = logging.getLogger(__name__) +logging.basicConfig(level="INFO") + +gateway_ip = "192.168.1.IP" +token = "TokenTokenToken" # nosec +button_sid = "lumi.123456789abcdef" + + +async def asyncio_demo(loop): + def subdevice_callback(source_device, action, params): + _LOGGER.info( + "callback '%s' from '%s', params: '%s'", action, source_device, params + ) + + push_server = PushServer(gateway_ip) + gateway = Gateway(gateway_ip, token) + + await push_server.start() + + push_server.register_miio_device(gateway, subdevice_callback) + + await loop.run_in_executor(None, gateway.discover_devices) + + button = gateway.devices[button_sid] + + event_info = EventInfo( + action="click_ch0", + extra="[1,13,1,85,[0,1],0,0]", + source_sid=button.sid, + source_model=button.zigbee_model, + ) + + await loop.run_in_executor(None, push_server.subscribe_event, gateway, event_info) + + _LOGGER.info("Listening") + + await asyncio.sleep(30) + + push_server.stop() + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(asyncio_demo(loop)) diff --git a/docs/index.rst b/docs/index.rst index d365ccb7b..dbd883d34 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -30,5 +30,6 @@ who have helped to extend this to cover not only the vacuum cleaner. troubleshooting contributing device_docs/index + push_server API diff --git a/docs/push_server.rst b/docs/push_server.rst new file mode 100644 index 000000000..8102d4707 --- /dev/null +++ b/docs/push_server.rst @@ -0,0 +1,224 @@ +Push Server +=========== + +The package provides a push server to act on events from devices, +such as those from Zigbee devices connected to a gateway device. +The server itself acts as a miio device receiving the events it has :ref:`subscribed to receive`, +and calling the registered callbacks accordingly. + +.. note:: + + While the eventing has been so far tested only on gateway devices, other devices that allow scene definitions on the + mobile app may potentially support this functionality. See :ref:`how to obtain event information` for details + how to check if your target device supports this functionality. + + +1. The push server is started and listens for incoming messages (:meth:`PushServer.start`) +2. A miio device and its callback needs to be registered to the push server (:meth:`PushServer.register_miio_device`). +3. A message is sent to the miio device to subscribe a specific event to the push server, + basically a local scene is made with as target the push server (:meth:`PushServer.subscribe_event`). +4. The device will start keep alive communication with the push server (pings). +5. When the device triggers an event (e.g., a button is pressed), + the push server gets notified by the device and executes the registered callback. + + +Events +------ + +Events are the triggers for a scene in the mobile app. +Most triggers that can be used in the mobile app can be converted to a event that can be registered to the push server. +For example: pressing a button, opening a door-sensor, motion being detected, vibrating a sensor or flipping a cube. +When such a event happens, +the miio device will immediately send a message to to push server, +which will identify the sender and execute its callback function. +The callback function can be used to act on the event, +for instance when motion is detected turn on the light. + +Callbacks +--------- + +Gateway-like devices will have a single callback for all connected Zigbee devices. +The `source_device` argument is set to the device that caused the event e.g. "lumi.123456789abcdef". + +Multiple events of the same device can be subscribed to, for instance both opening and closing a door-sensor. +The `action` argument is set to the action e.g., "open" or "close" , +that was defined in the :class:`PushServer.EventInfo` used for subscribing to the event. + +Lastly, the `params` argument provides additional information about the event, if available. + +Therefore, the callback functions need to have the following signature: + +.. code-block:: + + def callback(source_device, action, params): + + +.. _events_subscribe: + +Subscribing to Events +~~~~~~~~~~~~~~~~~~~~~ +In order to subscribe to a event a few steps need to be taken, +we assume that a device class has already been initialized to which the events belong: + +1. Create a push server instance: + +:: + + server = PushServer(miio_device.ip) + +.. note:: + + The server needs an IP address of a real, working miio device as it connects to it to find the IP address to bind on. + +2. Start the server: + +:: + + await push_server.start() + +3. Define a callback function: + +:: + + def callback_func(source_device, action, params): + _LOGGER.info("callback '%s' from '%s', params: '%s'", action, source_device, params) + +4. Register the miio device to the server and its callback function to receive events from this device: + +:: + + push_server.register_miio_device(miio_device, callback_func) + +5. Create an :class:`PushServer.EventInfo` (:ref:`how to obtain event info`) + object with the event to subscribe to: + +:: + + event_info = EventInfo( + action="alarm_triggering", + extra="[1,19,1,111,[0,1],2,0]", + trigger_token=miio_device.token, + ) + +6. Send a message to the device to subscribe for the event to receive messages on the push_server: + +:: + + push_server.subscribe_event(miio_device, event_info) + +7. The callback function should now be called whenever a matching event occurs. + +8. You should stop the server when you are done with it. + This will automatically inform all devices with event subscriptions + to stop sending more events to the server. + +:: + + push_server.stop() + + +.. _obtain_event_info: + +Obtaining Event Information +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When you want to support a new type of event in python-miio, +you need to first perform a packet capture of the mobile Xiaomi Home app +to retrieve the necessary information for that event. + +1. Prepare your system to capture traffic between the gateway device and your mobile phone. You can, for example, use `BlueStacks emulator `_ to run the Xiaomi Home app, and `WireShark `_ to capture the network traffic. +2. In the Xiaomi Home app go to `Scene` --> `+` --> for "If" select the device for which you want to make the new event +3. Select the event you want to add +4. For "Then" select the same gateway as the Zigbee device is connected to (or the gateway itself). +5. Select the any action, e.g., "Control nightlight" --> "Switch gateway light color", + and click the finish checkmark and accept the default name. +6. Repeat the steps 3-5 for all new events you want to implement. +7. After you are done, you can remove the created scenes from the app and stop the traffic capture. +8. You can use `devtools/parse_pcap.py` script to parse the captured PCAP files. + +:: + + python devtools/parse_pcap.py --token + + +.. note:: + + Note, you can repeat `--token` parameter to list all tokens you know to decrypt traffic from all devices: + +10. You should now see the decoded communication of between the Xiaomi Home app and your gateway. +11. You should see packets like the following in the output, + the most important information is stored under the `data` key: + +:: + + { + "id" : 1234, + "method" : "send_data_frame", + "params" : { + "cur" : 0, + "data" : "[[\"x.scene.1234567890\",[\"1.0\",1234567890,[\"0\",{\"src\":\"device\",\"key\":\"event.lumi.sensor_magnet.aq2.open\",\"did\":\"lumi.123456789abcde\",\"model\":\"lumi.sensor_magnet.aq2\",\"token\":\"\",\"extra\":\"[1,6,1,0,[0,1],2,0]\",\"timespan\":[\"0 0 * * 0,1,2,3,4,5,6\",\"0 0 * * 0,1,2,3,4,5,6\"]}],[{\"command\":\"lumi.gateway.v3.set_rgb\",\"did\":\"12345678\",\"extra\":\"[1,19,7,85,[40,123456],0,0]\",\"id\":1,\"ip\":\"192.168.1.IP\",\"model\":\"lumi.gateway.v3\",\"token\":\"encrypted0token0we0need000000000\",\"value\":123456}]]]]", + "data_tkn" : 12345, + "total" : 1, + "type" : "scene" + } + } + + +12. Now, extract the necessary information form the packet capture to create :class:`PushServer.EventInfo` objects. + +13. Locate the element containing `"key": "event.*"` in the trace, + this is the event triggering the command in the trace. + The `action` of the `EventInfo` is normally the last part of the `key` value, e.g., + `open` (from `event.lumi.sensor_magnet.aq2.open`) in the example above. + +14. The `extra` parameter is the most important piece containing the event details, + which you can directly copy from the packet capture. + +:: + + event_info = EventInfo( + action="open", + extra="[1,6,1,0,[0,1],2,0]", + ) + + +.. note:: + + The `action` is an user friendly name of the event, can be set arbitrarily and will be received by the server as the name of the event. + The `extra` is the identification of the event. + +Most times this information will be enough, however the :class:`miio.EventInfo` class allows for additional information. +For example, on Zigbee sub-devices you also need to define `source_sid` and `source_model`, +see :ref:`button press <_button_press_example>` for an example. +See the :class:`PushServer.EventInfo` for more detailed documentation. + + +Examples +-------- + +Gateway alarm trigger +~~~~~~~~~~~~~~~~~~~~~ + +The following example shows how to create a push server and make it to listen for alarm triggers from a gateway device. +This is proper async python code that can be executed as a script. + + +.. literalinclude:: examples/push_server/gateway_alarm_trigger.py + :language: python + + + +.. _button_press_example: + +Button press +~~~~~~~~~~~~ + +The following examples shows a more complex use case of acting on button presses of Aqara Zigbee button. +Since the source device (the button) differs from the communicating device (the gateway), +some additional parameters are needed for the :class:`PushServer.EventInfo`: `source_sid` and `source_model`. + +.. literalinclude:: examples/push_server/gateway_button_press.py + :language: python + + +:py:class:`API ` diff --git a/miio/__init__.py b/miio/__init__.py index 0e50a293b..dad82c50c 100644 --- a/miio/__init__.py +++ b/miio/__init__.py @@ -78,6 +78,7 @@ ) from miio.powerstrip import PowerStrip from miio.protocol import Message, Utils +from miio.push_server import EventInfo, PushServer from miio.pwzn_relay import PwznRelay from miio.scishare_coffeemaker import ScishareCoffee from miio.toiletlid import Toiletlid diff --git a/miio/push_server/__init__.py b/miio/push_server/__init__.py new file mode 100644 index 000000000..c8e93fd53 --- /dev/null +++ b/miio/push_server/__init__.py @@ -0,0 +1,6 @@ +"""Async UDP push server acting as a fake miio device to handle event notifications from +other devices.""" + +# flake8: noqa +from .eventinfo import EventInfo +from .server import PushServer diff --git a/miio/push_server/eventinfo.py b/miio/push_server/eventinfo.py new file mode 100644 index 000000000..818105760 --- /dev/null +++ b/miio/push_server/eventinfo.py @@ -0,0 +1,27 @@ +from typing import Any, Optional + +import attr + + +@attr.s(auto_attribs=True) +class EventInfo: + """Event info to register to the push server. + + action: user friendly name of the event, can be set arbitrarily and will be received by the server as the name of the event. + extra: the identification of this event, this determines on what event the callback is triggered. + event: defaults to the action. + command_extra: will be received by the push server, hopefully this will allow us to obtain extra information about the event for instance the vibration intesisty or light level that triggered the event (still experimental). + trigger_value: Only needed if the trigger has a certain threshold value (like a temperature for a wheather sensor), a "value" key will be present in the first part of a scene packet capture. + trigger_token: Only needed for protected events like the alarm feature of a gateway, equal to the "token" of the first part of of a scene packet caputure. + source_sid: Normally not needed and obtained from device, only needed for zigbee devices: the "did" key. + source_model: Normally not needed and obtained from device, only needed for zigbee devices: the "model" key. + """ + + action: str + extra: str + event: Optional[str] = None + command_extra: str = "" + trigger_value: Optional[Any] = None + trigger_token: str = "" + source_sid: Optional[str] = None + source_model: Optional[str] = None diff --git a/miio/push_server/server.py b/miio/push_server/server.py new file mode 100644 index 000000000..eb6409a16 --- /dev/null +++ b/miio/push_server/server.py @@ -0,0 +1,294 @@ +import asyncio +import logging +import socket +from json import dumps +from random import randint + +from ..device import Device +from ..protocol import Utils +from .eventinfo import EventInfo +from .serverprotocol import ServerProtocol + +_LOGGER = logging.getLogger(__name__) + +SERVER_PORT = 54321 +FAKE_DEVICE_ID = "120009025" +FAKE_DEVICE_MODEL = "chuangmi.plug.v3" + + +def calculated_token_enc(token): + token_bytes = bytes.fromhex(token) + encrypted_token = Utils.encrypt(token_bytes, token_bytes) + encrypted_token_hex = encrypted_token.hex() + return encrypted_token_hex[0:32] + + +class PushServer: + """Async UDP push server acting as a fake miio device to handle event notifications + from other devices. + + Assuming you already have a miio_device class initialized: + + # First create the push server + push_server = PushServer(miio_device.ip) + # Then start the server + await push_server.start() + # Register the miio device to the server and specify a callback function to receive events for this device + # The callback function schould have the form of "def callback_func(source_device, action, params):" + push_server.register_miio_device(miio_device, callback_func) + # create a EventInfo object with the information about the event you which to subscribe to (information taken from packet captures of automations in the mi home app) + event_info = EventInfo( + action="alarm_triggering", + extra="[1,19,1,111,[0,1],2,0]", + trigger_token=miio_device.token, + ) + # Send a message to the miio_device to subscribe for the event to receive messages on the push_server + await loop.run_in_executor(None, push_server.subscribe_event, miio_device, event_info) + # Now you will see the callback function beeing called whenever the event occurs + await asyncio.sleep(30) + # When done stop the push_server, this will send messages to all subscribed miio_devices to unsubscribe all events + push_server.stop() + """ + + def __init__(self, device_ip): + """Initialize the class.""" + self._device_ip = device_ip + + self._address = "0.0.0.0" # nosec + self._server_ip = None + self._server_id = int(FAKE_DEVICE_ID) + self._server_model = FAKE_DEVICE_MODEL + + self._listen_couroutine = None + self._registered_devices = {} + + self._event_id = 1000000 + + async def start(self): + """Start Miio push server.""" + if self._listen_couroutine is not None: + _LOGGER.error("Miio push server already started, not starting another one.") + return + + listen_task = self._create_udp_server() + _, self._listen_couroutine = await listen_task + + def stop(self): + """Stop Miio push server.""" + if self._listen_couroutine is None: + return + + for ip in list(self._registered_devices): + self.unregister_miio_device(self._registered_devices[ip]["device"]) + + self._listen_couroutine.close() + self._listen_couroutine = None + + def register_miio_device(self, device: Device, callback): + """Register a miio device to this push server.""" + if device.ip is None: + _LOGGER.error( + "Can not register miio device to push server since it has no ip" + ) + return + if device.token is None: + _LOGGER.error( + "Can not register miio device to push server since it has no token" + ) + return + + event_ids = [] + if device.ip in self._registered_devices: + _LOGGER.error( + "A device for ip '%s' was already registed, overwriting previous callback", + device.ip, + ) + event_ids = self._registered_devices[device.ip]["event_ids"] + + self._registered_devices[device.ip] = { + "callback": callback, + "token": bytes.fromhex(device.token), + "event_ids": event_ids, + "device": device, + } + + def unregister_miio_device(self, device: Device): + """Unregister a miio device from this push server.""" + device_info = self._registered_devices.get(device.ip) + if device_info is None: + _LOGGER.debug("Device with ip %s not registered, bailing out", device.ip) + return + + for event_id in device_info["event_ids"]: + self.unsubscribe_event(device, event_id) + self._registered_devices.pop(device.ip) + _LOGGER.debug("push server: unregistered miio device with ip %s", device.ip) + + def subscribe_event(self, device: Device, event_info: EventInfo): + """Subscribe to a event such that the device will start pushing data for that + event.""" + if device.ip not in self._registered_devices: + _LOGGER.error("Can not subscribe event, miio device not yet registered") + return None + + if self.server_ip is None: + _LOGGER.error("Can not subscribe event withouth starting the push server") + return None + + self._event_id = self._event_id + 1 + event_id = f"x.scene.{self._event_id}" + + event_payload = self._construct_event(event_id, event_info, device) + + response = device.send( + "send_data_frame", + { + "cur": 0, + "data": event_payload, + "data_tkn": 29576, + "total": 1, + "type": "scene", + }, + ) + + if response != ["ok"]: + _LOGGER.error( + "Error subscribing event, response %s, event_payload %s", + response, + event_payload, + ) + return None + + event_ids = self._registered_devices[device.ip]["event_ids"] + event_ids.append(event_id) + + return event_id + + def unsubscribe_event(self, device: Device, event_id): + """Unsubscribe from a event by id.""" + result = device.send("miIO.xdel", [event_id]) + if result == ["ok"]: + event_ids = self._registered_devices[device.ip]["event_ids"] + if event_id in event_ids: + event_ids.remove(event_id) + else: + _LOGGER.error("Error removing event_id %s: %s", event_id, result) + + return result + + def _get_server_ip(self): + """Connect to the miio device to get server_ip using a one time use socket.""" + get_ip_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) + get_ip_socket.bind((self._address, SERVER_PORT)) + get_ip_socket.connect((self._device_ip, SERVER_PORT)) + server_ip = get_ip_socket.getsockname()[0] + get_ip_socket.close() + _LOGGER.debug("Miio push server device ip=%s", server_ip) + return server_ip + + def _create_udp_server(self): + """Create the UDP socket and protocol.""" + self._server_ip = self._get_server_ip() + + # Create a fresh socket that will be used for the push server + udp_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) + udp_socket.bind((self._address, SERVER_PORT)) + + loop = asyncio.get_event_loop() + + return loop.create_datagram_endpoint( + lambda: ServerProtocol(loop, udp_socket, self), + sock=udp_socket, + ) + + def _construct_event( # nosec + self, + event_id, + info: EventInfo, + device: Device, + ): + """Construct the event data payload needed to subscribe to an event.""" + if info.event is None: + info.event = info.action + if info.source_sid is None: + info.source_sid = str(device.device_id) + if info.source_model is None: + info.source_model = device.model + + token_enc = calculated_token_enc(device.token) + source_id = info.source_sid.replace(".", "_") + command = f"{self.server_model}.{info.action}:{source_id}" + key = f"event.{info.source_model}.{info.event}" + message_id = 0 + magic_number = randint( + 1590161094, 1642025774 + ) # nosec, min/max taken from packet captures, unknown use + + if len(command) > 49: + _LOGGER.error( + "push server event command can be max 49 chars long," + " '%s' is %i chars, received callback command will be truncated", + command, + len(command), + ) + + trigger_data = { + "did": info.source_sid, + "extra": info.extra, + "key": key, + "model": info.source_model, + "src": "device", + "timespan": [ + "0 0 * * 0,1,2,3,4,5,6", + "0 0 * * 0,1,2,3,4,5,6", + ], + "token": info.trigger_token, + } + + if info.trigger_value is not None: + trigger_data["value"] = info.trigger_value + + target_data = { + "command": command, + "did": str(self.server_id), + "extra": info.command_extra, + "id": message_id, + "ip": self.server_ip, + "model": self.server_model, + "token": token_enc, + "value": "", + } + + event_data = [ + [ + event_id, + [ + "1.0", + magic_number, + [ + "0", + trigger_data, + ], + [target_data], + ], + ] + ] + + event_payload = dumps(event_data, separators=(",", ":")) + + return event_payload + + @property + def server_ip(self): + """Return the IP of the device running this server.""" + return self._server_ip + + @property + def server_id(self): + """Return the ID of the fake device beeing emulated.""" + return self._server_id + + @property + def server_model(self): + """Return the model of the fake device beeing emulated.""" + return self._server_model diff --git a/miio/push_server/serverprotocol.py b/miio/push_server/serverprotocol.py new file mode 100644 index 000000000..75b82ac13 --- /dev/null +++ b/miio/push_server/serverprotocol.py @@ -0,0 +1,126 @@ +import calendar +import datetime +import logging +import struct + +from ..protocol import Message + +_LOGGER = logging.getLogger(__name__) + +HELO_BYTES = bytes.fromhex( + "21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff" +) + + +class ServerProtocol: + """Handle responding to UDP packets.""" + + def __init__(self, loop, udp_socket, server): + """Initialize the class.""" + self.transport = None + self._loop = loop + self._sock = udp_socket + self.server = server + self._connected = False + + def _build_ack(self): + # Original devices are using year 1970, but it seems current datetime is fine + timestamp = calendar.timegm(datetime.datetime.now().timetuple()) + # ACK packet not signed, 16 bytes header + 16 bytes of zeroes + return struct.pack( + ">HHIII16s", 0x2131, 32, 0, self.server.server_id, timestamp, bytes(16) + ) + + def connection_made(self, transport): + """Set the transport.""" + self.transport = transport + self._connected = True + _LOGGER.info( + "Miio push server started with address=%s server_id=%s", + self.server._address, + self.server.server_id, + ) + + def connection_lost(self, exc): + """Handle connection lost.""" + if self._connected: + _LOGGER.error("Connection unexpectedly lost in Miio push server: %s", exc) + + def send_ping_ACK(self, host, port): + _LOGGER.debug("%s:%s=>PING", host, port) + m = self._build_ack() + self.transport.sendto(m, (host, port)) + _LOGGER.debug("%s:%s<=ACK(server_id=%s)", host, port, self.server.server_id) + + def send_msg_OK(self, host, port, msg_id, token): + # This result means OK, but some methods return ['ok'] instead of 0 + # might be necessary to use different results for different methods + result = {"result": 0, "id": msg_id} + header = { + "length": 0, + "unknown": 0, + "device_id": self.server.server_id, + "ts": datetime.datetime.now(), + } + msg = { + "data": {"value": result}, + "header": {"value": header}, + "checksum": 0, + } + response = Message.build(msg, token=token) + self.transport.sendto(response, (host, port)) + _LOGGER.debug(">> %s:%s: %s", host, port, result) + + def datagram_received(self, data, addr): + """Handle received messages.""" + try: + (host, port) = addr + if data == HELO_BYTES: + self.send_ping_ACK(host, port) + return + + if host not in self.server._registered_devices: + _LOGGER.warning( + "Datagram received from unknown device (%s:%s)", + host, + port, + ) + return + + token = self.server._registered_devices[host]["token"] + callback = self.server._registered_devices[host]["callback"] + + msg = Message.parse(data, token=token) + msg_value = msg.data.value + msg_id = msg_value["id"] + _LOGGER.debug("<< %s:%s: %s", host, port, msg_value) + + # Parse message + action, device_call_id = msg_value["method"].rsplit(":", 1) + source_device_id = device_call_id.replace("_", ".") + + callback(source_device_id, action, msg_value.get("params")) + + # Send OK + self.send_msg_OK(host, port, msg_id, token) + + except Exception: + _LOGGER.exception( + "Cannot process Miio push server packet: '%s' from %s:%s", + data, + host, + port, + ) + + def error_received(self, exc): + """Log UDP errors.""" + _LOGGER.error("UDP error received in Miio push server: %s", exc) + + def close(self): + """Stop the server.""" + _LOGGER.debug("Miio push server shutting down") + self._connected = False + if self.transport: + self.transport.close() + self._sock.close() + _LOGGER.info("Miio push server stopped")