Skip to content

Commit

Permalink
feat: Support GCM encryption for Gree devices (#92)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Rami Mosleh <[email protected]>
Co-authored-by: Rami Mousleh <[email protected]>
  • Loading branch information
3 people authored Aug 5, 2024
1 parent 10490fe commit 7122cdd
Show file tree
Hide file tree
Showing 16 changed files with 601 additions and 307 deletions.
5 changes: 5 additions & 0 deletions .idea/codeStyles/codeStyleConfig.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions .idea/material_theme_project_new.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions .idea/ruff.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions .idea/runConfigurations/pytest_in__.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import socket
import time

import machine
import network
import ubinascii
from ucryptolib import aes
Expand Down
88 changes: 88 additions & 0 deletions greeclimate/cipher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import base64
import json
import logging
from typing import Union, Tuple

from Crypto.Cipher import AES

_logger = logging.getLogger(__name__)

class CipherBase:
def __init__(self, key: bytes) -> None:
self._key: bytes = key

@property
def key(self) -> str:
return self._key.decode()

@key.setter
def key(self, value: str) -> None:
self._key = value.encode()

def encrypt(self, data) -> Tuple[str, Union[str, None]]:
raise NotImplementedError

def decrypt(self, data) -> dict:
raise NotImplementedError


class CipherV1(CipherBase):
def __init__(self, key: bytes = b'a3K8Bx%2r8Y7#xDh') -> None:
super().__init__(key)

def __create_cipher(self) -> AES:
return AES.new(self._key, AES.MODE_ECB)

def __pad(self, s) -> str:
return s + (16 - len(s) % 16) * chr(16 - len(s) % 16)

def encrypt(self, data) -> Tuple[str, Union[str, None]]:
_logger.debug("Encrypting data: %s", data)
cipher = self.__create_cipher()
padded = self.__pad(json.dumps(data)).encode()
encrypted = cipher.encrypt(padded)
encoded = base64.b64encode(encrypted).decode()
_logger.debug("Encrypted data: %s", encoded)
return encoded, None

def decrypt(self, data) -> dict:
_logger.debug("Decrypting data: %s", data)
cipher = self.__create_cipher()
decoded = base64.b64decode(data)
decrypted = cipher.decrypt(decoded).decode()
t = decrypted.replace(decrypted[decrypted.rindex('}') + 1:], '')
_logger.debug("Decrypted data: %s", t)
return json.loads(t)


class CipherV2(CipherBase):
GCM_NONCE = b'\x54\x40\x78\x44\x49\x67\x5a\x51\x6c\x5e\x63\x13'
GCM_AEAD = b'qualcomm-test'

def __init__(self, key: bytes = b'{yxAHAY_Lm6pbC/<') -> None:
super().__init__(key)

def __create_cipher(self) -> AES:
cipher = AES.new(self._key, AES.MODE_GCM, nonce=self.GCM_NONCE)
cipher.update(self.GCM_AEAD)
return cipher

def encrypt(self, data) -> Tuple[str, str]:
_logger.debug("Encrypting data: %s", data)
cipher = self.__create_cipher()
encrypted, tag = cipher.encrypt_and_digest(json.dumps(data).encode())
encoded = base64.b64encode(encrypted).decode()
tag = base64.b64encode(tag).decode()
_logger.debug("Encrypted data: %s", encoded)
_logger.debug("Cipher digest: %s", tag)
return encoded, tag

def decrypt(self, data) -> dict:
_logger.info("Decrypting data: %s", data)
cipher = self.__create_cipher()
decoded = base64.b64decode(data)
decrypted = cipher.decrypt(decoded).decode()
t = decrypted.replace(decrypted[decrypted.rindex('}') + 1:], '')
_logger.debug("Decrypted data: %s", t)
return json.loads(t)

69 changes: 47 additions & 22 deletions greeclimate/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import re
from asyncio import AbstractEventLoop
from enum import IntEnum, unique
from typing import List
from typing import Union

import greeclimate.network as network
from greeclimate.cipher import CipherV1, CipherV2
from greeclimate.deviceinfo import DeviceInfo
from greeclimate.network import DeviceProtocol2, IPAddr
from greeclimate.exceptions import DeviceNotBoundError, DeviceTimeoutError
from greeclimate.network import DeviceProtocol2
from greeclimate.taskable import Taskable


Expand Down Expand Up @@ -155,28 +155,35 @@ class Device(DeviceProtocol2, Taskable):
water_full: A bool to indicate the water tank is full
"""

def __init__(self, device_info: DeviceInfo, timeout: int = 120, loop: AbstractEventLoop = None):
def __init__(self, device_info: DeviceInfo, timeout: int = 120, bind_timeout: int = 10, loop: AbstractEventLoop = None):
"""Initialize the device object
Args:
device_info (DeviceInfo): Information about the physical device
timeout (int): Timeout for device communication
bind_timeout (int): Timeout for binding to the device, keep this short to prevent delays determining the
correct device cipher to use
loop (AbstractEventLoop): The event loop to run the device operations on
"""
DeviceProtocol2.__init__(self, timeout)
Taskable.__init__(self, loop)
self._logger = logging.getLogger(__name__)

self.device_info: DeviceInfo = device_info


self._bind_timeout = bind_timeout

""" Device properties """
self.hid = None
self.version = None
self.check_version = True
self._properties = {}
self._dirty = []

async def bind(self, key=None):
async def bind(
self,
key: str = None,
cipher: Union[CipherV1, CipherV2, None] = None,
):
"""Run the binding procedure.
Binding is a finicky procedure, and happens in 1 of 2 ways:
Expand All @@ -187,14 +194,23 @@ async def bind(self, key=None):
Both approaches result in a device_key which is used as like a persistent session id.
Args:
cipher (CipherV1 | CipherV2): The cipher type to use for encryption, if None will attempt to detect the correct one
key (str): The device key, when provided binding is a NOOP, if None binding will
attempt to negotiate the key with the device.
attempt to negotiate the key with the device. cipher must be provided.
Raises:
DeviceNotBoundError: If binding was unsuccessful and no key returned
DeviceTimeoutError: The device didn't respond
"""

if key:
if not cipher:
raise ValueError("cipher must be provided when key is provided")
else:
cipher.key = key
self.device_cipher = cipher
return

if not self.device_info:
raise DeviceNotBoundError

Expand All @@ -206,29 +222,38 @@ async def bind(self, key=None):
self._logger.info("Starting device binding to %s", str(self.device_info))

try:
if key:
self.device_key = key
if cipher is not None:
await self.__bind_internal(cipher)
else:
await self.send(self.create_bind_message(self.device_info))
# Special case, wait for binding to complete so we know that the device is ready
task = asyncio.create_task(self.ready.wait())
await asyncio.wait_for(task, timeout=self._timeout)
""" Try binding with CipherV1 first, if that fails try CipherV2"""
try:
self._logger.info("Attempting to bind to device using CipherV1")
await self.__bind_internal(CipherV1())
except asyncio.TimeoutError:
self._logger.info("Attempting to bind to device using CipherV2")
await self.__bind_internal(CipherV2())

except asyncio.TimeoutError:
raise DeviceTimeoutError

if not self.device_key:
if not self.device_cipher:
raise DeviceNotBoundError
else:
self._logger.info("Bound to device using key %s", self.device_key)
self._logger.info("Bound to device using key %s", self.device_cipher.key)

async def __bind_internal(self, cipher: Union[CipherV1, CipherV2]):
"""Internal binding procedure, do not call directly"""
await self.send(self.create_bind_message(self.device_info), cipher=cipher)
task = asyncio.create_task(self.ready.wait())
await asyncio.wait_for(task, timeout=self._bind_timeout)

def handle_device_bound(self, key) -> None:
def handle_device_bound(self, key: str) -> None:
"""Handle the device bound message from the device"""
self.device_key = key
self.device_cipher.key = key

async def request_version(self) -> None:
"""Request the firmware version from the device."""
if not self.device_key:
if not self.device_cipher:
await self.bind()

try:
Expand All @@ -243,7 +268,7 @@ async def update_state(self, wait_for: float = 30):
Args:
wait_for (object): How long to wait for an update from the device
"""
if not self.device_key:
if not self.device_cipher:
await self.bind()

self._logger.debug("Updating device properties for (%s)", str(self.device_info))
Expand Down Expand Up @@ -288,7 +313,7 @@ async def push_state_update(self, wait_for: float = 30):
if not self._dirty:
return

if not self.device_key:
if not self.device_cipher:
await self.bind()

self._logger.debug("Pushing state updates to (%s)", str(self.device_info))
Expand Down Expand Up @@ -316,7 +341,7 @@ def __eq__(self, other):
"""Compare two devices for equality based on their properties state and device info."""
return self.device_info == other.device_info \
and self.raw_properties == other.raw_properties \
and self.device_key == other.device_key
and self.device_cipher.key == other.device_cipher.key

def __ne__(self, other):
return not self.__eq__(other)
Expand Down
2 changes: 2 additions & 0 deletions greeclimate/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from asyncio.events import AbstractEventLoop
from ipaddress import IPv4Address

from greeclimate.cipher import CipherV1
from greeclimate.device import DeviceInfo
from greeclimate.network import BroadcastListenerProtocol, IPAddr
from greeclimate.taskable import Taskable
Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(
"""
BroadcastListenerProtocol.__init__(self, timeout)
Taskable.__init__(self, loop)
self.device_cipher = CipherV1()
self._allow_loopback: bool = allow_loopback
self._device_infos: list[DeviceInfo] = []
self._listeners: list[Listener] = []
Expand Down
Loading

0 comments on commit 7122cdd

Please sign in to comment.