-
-
Notifications
You must be signed in to change notification settings - Fork 561
/
protocol.py
242 lines (203 loc) · 8.07 KB
/
protocol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
"""miIO protocol implementation.
This module contains the implementation of the routines to encrypt and decrypt
miIO payloads with a device-specific token.
The payloads to be encrypted (to be passed to a device) are expected to be
JSON objects, the same applies for decryption where they are converted
automatically to JSON objects.
If the decryption fails, raw bytes as returned by the device are returned.
A usage example can be seen in the source of :func:`miio.Device.send`.
"""
import calendar
import datetime
import hashlib
import json
import logging
from typing import Any, Union
from construct import (
Adapter,
Bytes,
Checksum,
Const,
Default,
GreedyBytes,
Hex,
IfThenElse,
Int16ub,
Int32ub,
Pointer,
RawCopy,
Rebuild,
Struct,
)
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from miio.exceptions import PayloadDecodeException
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
class Utils:
    """Token, crypto and packet-framing helpers for the miIO protocol.

    This class is adapted from the original xpn.py code by gst666.
    """

    @staticmethod
    def verify_token(token: bytes):
        """Validate the type and length of the given token.

        :param bytes token: Token to check
        :raises TypeError: if the token is not a ``bytes`` object
        :raises ValueError: if the token is not exactly 16 bytes long
        """
        if not isinstance(token, bytes):
            raise TypeError("Token must be bytes")
        if len(token) != 16:
            raise ValueError("Wrong token length")

    @staticmethod
    def md5(data: bytes) -> bytes:
        """Return the raw md5 digest of the given bytes object."""
        hasher = hashlib.md5()  # nosec
        hasher.update(data)
        return hasher.digest()

    @staticmethod
    def key_iv(token: bytes) -> tuple[bytes, bytes]:
        """Derive the encryption key and IV from the given token.

        The key is ``md5(token)`` and the IV is ``md5(key + token)``.

        :param bytes token: Token to derive from
        :return: ``(key, iv)`` tuple
        """
        key = Utils.md5(token)
        return key, Utils.md5(key + token)

    @staticmethod
    def encrypt(plaintext: bytes, token: bytes) -> bytes:
        """Encrypt plaintext with a given token.

        The plaintext is PKCS7-padded and encrypted with AES in CBC mode,
        using the key and IV derived from the token.

        :param bytes plaintext: Plaintext (json) to encrypt
        :param bytes token: Token to use
        :return: Encrypted bytes
        :raises TypeError: if plaintext or token is not ``bytes``
        :raises ValueError: if the token has a wrong length
        """
        if not isinstance(plaintext, bytes):
            raise TypeError("plaintext requires bytes")
        Utils.verify_token(token)
        key, iv = Utils.key_iv(token)

        pkcs7 = padding.PKCS7(128).padder()
        padded = pkcs7.update(plaintext) + pkcs7.finalize()

        aes = Cipher(
            algorithms.AES(key), modes.CBC(iv), backend=default_backend()
        ).encryptor()
        return aes.update(padded) + aes.finalize()

    @staticmethod
    def decrypt(ciphertext: bytes, token: bytes) -> bytes:
        """Decrypt ciphertext with a given token.

        Reverses :meth:`encrypt`: AES-CBC decryption followed by removal of
        the PKCS7 padding.

        :param bytes ciphertext: Ciphertext to decrypt
        :param bytes token: Token to use
        :return: Decrypted bytes object
        :raises TypeError: if ciphertext or token is not ``bytes``
        :raises ValueError: if the token has a wrong length
        """
        if not isinstance(ciphertext, bytes):
            raise TypeError("ciphertext requires bytes")
        Utils.verify_token(token)
        key, iv = Utils.key_iv(token)

        aes = Cipher(
            algorithms.AES(key), modes.CBC(iv), backend=default_backend()
        ).decryptor()
        padded = aes.update(ciphertext) + aes.finalize()

        pkcs7 = padding.PKCS7(128).unpadder()
        return pkcs7.update(padded) + pkcs7.finalize()

    @staticmethod
    def checksum_field_bytes(ctx: dict[str, Any]) -> bytearray:
        """Gather bytes for checksum calculation.

        Concatenates the raw header bytes, the token stored in the parent
        context and, when the context has a "data" entry, the raw data bytes.
        """
        buf = bytearray(ctx["header"].data)
        buf += ctx["_"]["token"]
        if "data" in ctx:
            buf += ctx["data"].data
        return buf

    @staticmethod
    def get_length(x) -> int:
        """Return total packet length.

        This is the length of the data in the parent context plus 32.
        """
        return x._.data.length + 32

    @staticmethod
    def is_hello(x) -> bool:
        """Return if packet is a hello packet.

        Uses the context's own "length" entry when present, otherwise the
        length field of the header value.
        """
        # not very nice, but we know that hellos are 32b of length
        return x.get("length", x.header.value["length"]) == 32
class TimeAdapter(Adapter):
    """Adapter converting between ``datetime`` objects and integer timestamps."""

    def _encode(self, obj, context, path):
        """Return the datetime *obj* as integer seconds since the epoch.

        ``utctimetuple()`` normalizes timezone-aware values to UTC before
        ``calendar.timegm`` interprets the tuple as UTC, so aware datetimes in
        any zone encode to the correct instant. Naive values are taken as-is,
        i.e. treated as already being in UTC.
        """
        return calendar.timegm(obj.utctimetuple())

    def _decode(self, obj, context, path):
        """Return the timestamp *obj* as a timezone-aware UTC ``datetime``."""
        return datetime.datetime.fromtimestamp(obj, tz=datetime.timezone.utc)
class EncryptionAdapter(Adapter):
    """Adapter that encrypts outgoing and decrypts incoming payloads."""

    def _encode(self, obj, context, path):
        """Encrypt the given payload with the token stored in the context.

        The object is dumped to JSON, encoded as UTF-8 and terminated with a
        NUL byte before encryption.

        :param obj: JSON object to encrypt
        :return: Encrypted bytes
        """
        payload = json.dumps(obj).encode("utf-8") + b"\x00"
        return Utils.encrypt(payload, context["_"]["token"])

    def _decode(self, obj, context, path) -> Union[dict, bytes]:
        """Decrypt the payload using the token stored in the context.

        An empty payload, or one that cannot be decrypted, is returned
        unchanged. Otherwise the decrypted bytes are parsed as JSON, retrying
        with a series of device-specific fixups for malformed JSON.

        :param obj: Raw payload bytes
        :return: Parsed JSON, or the raw bytes when empty or undecryptable
        :raises PayloadDecodeException: if no variant parses as JSON
        """
        # Missing payload is expected for discovery messages.
        if not obj:
            return obj

        try:
            plain = Utils.decrypt(obj, context["_"]["token"]).rstrip(b"\x00")
        except Exception:
            _LOGGER.debug("Unable to decrypt, returning raw bytes: %s", obj)
            return obj

        def unmodified(raw):
            # try without modifications first
            return raw

        def fix_powerstrip(raw):
            # powerstrip returns malformed JSON if the device is not
            # connected to the cloud, so we try to fix it here carefully.
            return raw.replace(b',,"otu_stat"', b',"otu_stat"')

        def cut_at_last_nul(raw):
            # xiaomi cloud returns malformed json when answering
            # _sync.batch_gen_room_up_url command so try to sanitize it
            cut = raw.rfind(b"\x00")
            return raw if cut == -1 else raw[:cut]

        def fix_double_oh(raw):
            # fix double-oh values for 090615.curtain.jldj03, ##1411
            return raw.replace(b'"value":00', b'"value":0')

        def fix_double_commas(raw):
            # fix double commas for xiaomi.vacuum.b112, fw: 2.2.4_0049
            return raw.replace(b",,", b",")

        def drop_empty_result(raw):
            # fix "result":," no sense key for xiaomi.vacuum.b112, fw:2.2.4_0050
            return raw.replace(b'"result":,', b"")

        # adaption functions for malformed json payload (quirks), each applied
        # independently to the decrypted bytes, in this order
        fixups = (
            unmodified,
            fix_powerstrip,
            cut_at_last_nul,
            fix_double_oh,
            fix_double_commas,
            drop_empty_result,
        )
        final_attempt = len(fixups) - 1

        for attempt, fixup in enumerate(fixups):
            try:
                return json.loads(fixup(plain).decode("utf-8"))
            except Exception as ex:
                if attempt != final_attempt:
                    continue
                # log the error when decrypted bytes couldn't be loaded
                # after trying all quirk adaptions
                _LOGGER.error("Unable to parse json '%s': %s", plain, ex)
                raise PayloadDecodeException(
                    "Unable to parse message payload"
                ) from ex

        raise Exception("this should never happen")
# Layout of a miIO packet: a header and a 16-byte checksum field, with the
# encrypted payload read from offset 32.
Message = Struct(
    # for building we need data before anything else.
    "data" / Pointer(32, RawCopy(EncryptionAdapter(GreedyBytes))),
    "header"
    / RawCopy(
        Struct(
            Const(0x2131, Int16ub),
            # total packet length, recomputed from the payload when building
            "length" / Rebuild(Int16ub, Utils.get_length),
            "unknown" / Default(Int32ub, 0x00000000),
            "device_id" / Hex(Bytes(4)),
            # Default wraps the adapter (not the other way round) so that a
            # missing timestamp is filled in first and then encoded, and the
            # lambda makes the default "now" at build time instead of a value
            # frozen when this module was imported.
            "ts"
            / Default(
                TimeAdapter(Int32ub),
                lambda ctx: datetime.datetime.now(tz=datetime.timezone.utc),
            ),
        )
    ),
    # hello packets carry 16 raw bytes here; all other packets carry the md5
    # over the bytes gathered by Utils.checksum_field_bytes
    "checksum"
    / IfThenElse(
        Utils.is_hello,
        Bytes(16),
        Checksum(Bytes(16), Utils.md5, Utils.checksum_field_bytes),
    ),
)