From a012ec549ebe9f0dd6bdc0e52b02509187864003 Mon Sep 17 00:00:00 2001 From: "Jason A. Cox" Date: Mon, 23 Nov 2020 21:44:20 -0800 Subject: [PATCH] Clean up python code formatting --- tinytuya/__init__.py | 460 ++++++++++++++++++++++++------------------- 1 file changed, 252 insertions(+), 208 deletions(-) diff --git a/tinytuya/__init__.py b/tinytuya/__init__.py index 75b55485..6a501109 100644 --- a/tinytuya/__init__.py +++ b/tinytuya/__init__.py @@ -81,7 +81,7 @@ __author__ = 'jasonacox' log = logging.getLogger(__name__) -#logging.basicConfig(level=logging.DEBUG) # Uncomment to Debug +# logging.basicConfig(level=logging.DEBUG) # Uncomment to Debug log.debug('%s version %s', __name__, __version__) log.debug('Python %s on %s', sys.version, sys.platform) @@ -92,7 +92,7 @@ log.debug('Using PyCrypto %r', Crypto.version_info) log.debug('Using PyCrypto from %r', Crypto.__file__) -## Tuya Command Types +# Tuya Command Types UDP = 0 AP_CONFIG = 1 ACTIVE = 2 @@ -126,26 +126,30 @@ LAN_GW_UPDATE = 251 LAN_SET_GW_CHANNEL = 252 -## Protocol Versions +# Protocol Versions PROTOCOL_VERSION_BYTES_31 = b'3.1' PROTOCOL_VERSION_BYTES_33 = b'3.3' -## Python 2 Support +# Python 2 Support IS_PY2 = sys.version_info[0] == 2 -## Cryptography Helpers +# Cryptography Helpers + + class AESCipher(object): def __init__(self, key): self.bs = 16 self.key = key - def encrypt(self, raw, use_base64 = True): + + def encrypt(self, raw, use_base64=True): if Crypto: raw = self._pad(raw) cipher = AES.new(self.key, mode=AES.MODE_ECB) crypted_text = cipher.encrypt(raw) else: _ = self._pad(raw) - cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 + cipher = pyaes.blockfeeder.Encrypter( + pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 crypted_text = cipher.feed(raw) crypted_text += cipher.feed() # flush final block @@ -153,7 +157,7 @@ def encrypt(self, raw, use_base64 = True): return base64.b64encode(crypted_text) else: return crypted_text - + def decrypt(self, enc, use_base64=True): if use_base64: enc = base64.b64decode(enc) @@ -164,7 +168,8 @@ def decrypt(self, enc, use_base64=True): return self._unpad(raw).decode('utf-8') else: - cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 + cipher = pyaes.blockfeeder.Decrypter( + pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 plain_text = cipher.feed(enc) plain_text += cipher.feed() # flush final block return plain_text @@ -172,10 +177,12 @@ def decrypt(self, enc, use_base64=True): def _pad(self, s): padnum = self.bs - len(s) % self.bs return s + padnum * chr(padnum).encode() + @staticmethod def _unpad(s): return s[:-ord(s[len(s)-1:])] + def bin2hex(x, pretty=False): if pretty: space = ' ' @@ -187,6 +194,7 @@ def bin2hex(x, pretty=False): result = ''.join('%02X%s' % (y, space) for y in x) return result + def hex2bin(x): if IS_PY2: return x.decode('hex') @@ -196,70 +204,72 @@ def hex2bin(x): # Tuya Device Dictionary - Commands and Payload Template # See requests.json payload at https://github.com/codetheweb/tuyapi + payload_dict = { - # Default Device - "default": { - CONTROL: { # Set Control Values on Device - "hexByte": "07", - "command": {"devId": "", "uid": "", "t": ""} - }, - STATUS: { # Get Status from Device - "hexByte": "08", - "command": {"gwId": "", "devId": ""} - }, - HEART_BEAT: { - "hexByte": "09", - "command": {} - }, - DP_QUERY: { # Get Data Points from Device - "hexByte": "0a", - "command": {"gwId": "", "devId": "", "uid": "", "t": ""}, - }, - CONTROL_NEW: { - "hexByte": "0d", - "command": {"devId": "", "uid": "", "t": ""} - }, - DP_QUERY_NEW: { - "hexByte": "0f", - "command": {"devId": "", "uid": "", "t": ""} - }, - "prefix": "000055aa00000000000000", - # Next byte is command "hexByte" + length of remaining payload + command + suffix - # (unclear if multiple bytes used for length, zero padding implies could be more + # Default Device + "default": { + CONTROL: { # Set Control Values on Device + "hexByte": "07", + "command": {"devId": "", "uid": "", "t": ""} + }, + STATUS: { # Get Status from Device + "hexByte": "08", + "command": {"gwId": "", "devId": ""} + }, + HEART_BEAT: { + "hexByte": "09", + "command": {} + }, + DP_QUERY: { # Get Data Points from Device + "hexByte": "0a", + "command": {"gwId": "", "devId": "", "uid": "", "t": ""}, + }, + CONTROL_NEW: { + "hexByte": "0d", + "command": {"devId": "", "uid": "", "t": ""} + }, + DP_QUERY_NEW: { + "hexByte": "0f", + "command": {"devId": "", "uid": "", "t": ""} + }, + "prefix": "000055aa00000000000000", + # Next byte is command "hexByte" + length of remaining payload + command + suffix + # (unclear if multiple bytes used for length, zero padding implies could be more # than one byte) - "suffix": "000000000000aa55" - }, - # Special Case Device with 22 character ID - Some of these devices - # Require the 0d command as the DP_QUERY status request and the list of - # dps requested payload - "device22": { - DP_QUERY: { # Get Data Points from Device - "hexByte": "0d", # Uses CONTROL_NEW command for some reason - "command": {"devId": "", "uid": "", "t": ""} - }, - CONTROL: { # Set Control Values on Device - "hexByte": "07", - "command": {"devId": "", "uid": "", "t": ""} - }, - HEART_BEAT: { - "hexByte": "09", - "command": {} + "suffix": "000000000000aa55" }, - "prefix": "000055aa00000000000000", - "suffix": "000000000000aa55" - } + # Special Case Device with 22 character ID - Some of these devices + # Require the 0d command as the DP_QUERY status request and the list of + # dps requested payload + "device22": { + DP_QUERY: { # Get Data Points from Device + "hexByte": "0d", # Uses CONTROL_NEW command for some reason + "command": {"devId": "", "uid": "", "t": ""} + }, + CONTROL: { # Set Control Values on Device + "hexByte": "07", + "command": {"devId": "", "uid": "", "t": ""} + }, + HEART_BEAT: { + "hexByte": "09", + "command": {} + }, + "prefix": "000055aa00000000000000", + "suffix": "000000000000aa55" + } } + class XenonDevice(object): def __init__(self, dev_id, address, local_key="", dev_type="default", connection_timeout=10): """ Represents a Tuya device. - + Args: dev_id (str): The device id. address (str): The network address. local_key (str, optional): The encryption key. Defaults to None. - + Attributes: port (int): The port to connect to. """ @@ -280,68 +290,67 @@ def __init__(self, dev_id, address, local_key="", dev_type="default", connection def __del__(self): # In case we have a lingering socket connection, close it - if self.socket!=None: + if self.socket != None: self.socket.close() - self.socket=None + self.socket = None def __repr__(self): - return '%r' % ((self.id, self.address),) # FIXME can do better than this + # FIXME can do better than this + return '%r' % ((self.id, self.address),) - def _get_socket(self,renew): - if(renew and self.socket!=None): + def _get_socket(self, renew): + if(renew and self.socket != None): self.socket.close() - self.socket=None - if(self.socket==None): - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if(self.socketNODELAY): - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.socket.settimeout(self.connection_timeout) - self.socket.connect((self.address, self.port)) + self.socket = None + if(self.socket == None): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if(self.socketNODELAY): + self.socket.setsockopt( + socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.socket.settimeout(self.connection_timeout) + self.socket.connect((self.address, self.port)) def _send_receive(self, payload): """ Send single buffer `payload` and receive a single buffer. - + Args: payload(bytes): Data to send. """ - success=False - retries=0 + success = False + retries = 0 while not success: - # make sure I have a socket (may already exist) - self._get_socket(False) - try: - self.socket.send(payload) - data = self.socket.recv(1024) - # Some devices fail to send full payload in first response - if self.retry and len(data) < 40: + # make sure I have a socket (may already exist) + self._get_socket(False) + try: + self.socket.send(payload) + data = self.socket.recv(1024) + # Some devices fail to send full payload in first response + if self.retry and len(data) < 40: + time.sleep(0.1) + data = self.socket.recv(1024) # try again + success = True + # Legacy/default mode avoids persisting socket across commands + if(not self.socketPersistent): + self.socket.close() + self.socket = None + except: + retries = retries+1 + log.debug('Exception with low level TinyTuya socket!!! retry ' + + str(retries)+'/'+str(self.socketRetryLimit)) + # if we exceed the limit of retries then lets get out of here + if(retries > self.socketRetryLimit): + if(self.socket != None): + self.socket.close() + self.socket = None + log.exception( + 'Exceeded tinytuya retry limit ('+str(self.socketRetryLimit)+')') + # goodbye + raise + # retry: wait a bit, toss old socket and get new one time.sleep(0.1) - data = self.socket.recv(1024) # try again - success=True - # Legacy/default mode avoids persisting socket across commands - if(not self.socketPersistent): - self.socket.close() - self.socket=None - except: - retries=retries+1 - log.debug('Exception with low level TinyTuya socket!!! retry '+str(retries)+'/'+str(self.socketRetryLimit)) - #print('Exception with low level TinyTuya socket!!! retry '+str(retries)+'/'+str(self.socketRetryLimit)) - # - # if we exceed the limit of retries then lets get out of here by reraising to a higher power - # - if(retries>self.socketRetryLimit): - if(self.socket!=None): - self.socket.close() - self.socket=None - log.exception('Exceeded tinytuya retry limit ('+str(self.socketRetryLimit)+')') - # - # goodbye - # - raise - # retry: wait a bit, toss old socket and get new one - time.sleep(0.1) - self._get_socket(True) - # except + self._get_socket(True) + # except # while return data @@ -359,7 +368,7 @@ def set_socketRetryLimit(self, limit): def set_dpsUsed(self, dpsUsed): self.dpsUsed = dpsUsed - + def set_retry(self, retry): self.retry = retry @@ -392,46 +401,55 @@ def generate_payload(self, command, data=None): # Create byte buffer from hex data json_payload = json.dumps(json_data) - json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! + # if spaces are not removed device does not respond! + json_payload = json_payload.replace(' ', '') json_payload = json_payload.encode('utf-8') log.debug('json_payload=%r', json_payload) if self.version == 3.3: - self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new + # expect to connect and then disconnect to set new + self.cipher = AESCipher(self.local_key) json_payload = self.cipher.encrypt(json_payload, False) self.cipher = None if command_hb != '0a': # add the 3.3 header - json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload - elif command == CONTROL: + json_payload = PROTOCOL_VERSION_BYTES_33 + \ + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload + elif command == CONTROL: # need to encrypt - self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new + # expect to connect and then disconnect to set new + self.cipher = AESCipher(self.local_key) json_payload = self.cipher.encrypt(json_payload) - preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key + preMd5String = b'data=' + json_payload + b'||lpv=' + \ + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key m = md5() m.update(preMd5String) hexdigest = m.hexdigest() - json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:24].encode('latin1') + json_payload + json_payload = PROTOCOL_VERSION_BYTES_31 + \ + hexdigest[8:][:24].encode('latin1') + json_payload self.cipher = None # expect to connect and then disconnect to set new - postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) + postfix_payload = hex2bin( + bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) assert len(postfix_payload) <= 0xff - postfix_payload_hex_len = '%x' % len(postfix_payload) # single byte 0-255 (0x00-0xff) - buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + - payload_dict[self.dev_type][command]['hexByte'] + - '000000' + - postfix_payload_hex_len ) + postfix_payload + postfix_payload_hex_len = '%x' % len( + postfix_payload) # single byte 0-255 (0x00-0xff) + buffer = hex2bin(payload_dict[self.dev_type]['prefix'] + + payload_dict[self.dev_type][command]['hexByte'] + + '000000' + + postfix_payload_hex_len) + postfix_payload # calc the CRC of everything except where the CRC goes and the suffix hex_crc = format(binascii.crc32(buffer[:-8]) & 0xffffffff, '08X') buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:] return buffer - + + class Device(XenonDevice): def __init__(self, dev_id, address, local_key="", dev_type="default"): super(Device, self).__init__(dev_id, address, local_key, dev_type) - + def status(self): log.debug('status() entry (dev_type is %s)', self.dev_type) # open device, send request, then close connection @@ -456,7 +474,8 @@ def status(self): # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} # NOTE dps.2 may or may not be present result = result[len(PROTOCOL_VERSION_BYTES_31):] # remove version header - result = result[16:] # Remove 16-bytes appears to be MD5 hexdigest of payload + # Remove 16-bytes appears to be MD5 hexdigest of payload + result = result[16:] cipher = AESCipher(self.local_key) result = cipher.decrypt(result) log.debug('decrypted result=%r', result) @@ -478,7 +497,7 @@ def status(self): def set_status(self, on, switch=1): """ Set status of the device to 'on' or 'off'. - + Args: on(bool): True for 'on', False for 'off'. switch(int): The switch to set @@ -486,13 +505,13 @@ def set_status(self, on, switch=1): # open device, send request, then close connection if isinstance(switch, int): switch = str(switch) # index and payload is a string - payload = self.generate_payload(CONTROL, {switch:on}) + payload = self.generate_payload(CONTROL, {switch: on}) data = self._send_receive(payload) log.debug('set_status received data=%r', data) return data - + def set_value(self, index, value): """ Set int value of any index. @@ -507,11 +526,11 @@ def set_value(self, index, value): payload = self.generate_payload(CONTROL, { index: value}) - + data = self._send_receive(payload) - + return data - + def turn_on(self, switch=1): """Turn the device on""" self.set_status(True, switch) @@ -523,7 +542,7 @@ def turn_off(self, switch=1): def set_timer(self, num_secs): """ Set a timer. - + Args: num_secs(int): Number of seconds """ @@ -535,44 +554,48 @@ def set_timer(self, num_secs): devices_numbers.sort() dps_id = devices_numbers[-1] - payload = self.generate_payload(CONTROL, {dps_id:num_secs}) + payload = self.generate_payload(CONTROL, {dps_id: num_secs}) data = self._send_receive(payload) log.debug('set_timer received data=%r', data) return data + class OutletDevice(Device): """ Represents a Tuya based Smart Plug or Switch. - + Args: dev_id (str): The device id. address (str): The network address. local_key (str, optional): The encryption key. Defaults to None. """ + def __init__(self, dev_id, address, local_key="", dev_type="default"): - super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) + super(OutletDevice, self).__init__( + dev_id, address, local_key, dev_type) + class CoverDevice(Device): """ Represents a Tuya based Smart Window Cover. - + Args: dev_id (str): The device id. address (str): The network address. local_key (str, optional): The encryption key. Defaults to None. """ - DPS_INDEX_MOVE = '1' - DPS_INDEX_BL = '101' + DPS_INDEX_MOVE = '1' + DPS_INDEX_BL = '101' DPS_2_STATE = { - '1':'movement', - '101':'backlight', - } + '1': 'movement', + '101': 'backlight', + } def __init__(self, dev_id, address, local_key="", dev_type="default"): super(CoverDevice, self).__init__(dev_id, address, local_key, dev_type) - + def open_cover(self, switch=1): """Open the cover""" self.set_status('on', switch) @@ -585,37 +608,38 @@ def stop_cover(self, switch=1): """Stop the motion of the cover""" self.set_status('stop', switch) + class BulbDevice(Device): """ Represents a Tuya based Smart Light/Bulb. - + Args: dev_id (str): The device id. address (str): The network address. local_key (str, optional): The encryption key. Defaults to None. """ - DPS_INDEX_ON = '1' - DPS_INDEX_MODE = '2' + DPS_INDEX_ON = '1' + DPS_INDEX_MODE = '2' DPS_INDEX_BRIGHTNESS = '3' DPS_INDEX_COLOURTEMP = '4' - DPS_INDEX_COLOUR = '5' + DPS_INDEX_COLOUR = '5' - DPS = 'dps' + DPS = 'dps' DPS_MODE_COLOUR = 'colour' - DPS_MODE_WHITE = 'white' + DPS_MODE_WHITE = 'white' # DPS_MODE_SCENE_1 = 'scene_1' # nature DPS_MODE_SCENE_2 = 'scene_2' DPS_MODE_SCENE_3 = 'scene_3' # rave DPS_MODE_SCENE_4 = 'scene_4' # rainbow - + DPS_2_STATE = { - '1':'is_on', - '2':'mode', - '3':'brightness', - '4':'colourtemp', - '5':'colour', - } + '1': 'is_on', + '2': 'mode', + '3': 'brightness', + '4': 'colourtemp', + '5': 'colour', + } def __init__(self, dev_id, address, local_key="", dev_type="default"): super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) @@ -624,25 +648,25 @@ def __init__(self, dev_id, address, local_key="", dev_type="default"): def _rgb_to_hexvalue(r, g, b): """ Convert an RGB value to the hex representation expected by tuya. - + Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: rrggbb0hhhssvv - + While r, g and b are just hexadecimal values of the corresponding Red, Green and Blue values, the h, s and v values (which are values between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively. - + Args: r(int): Value for the colour red as int from 0-255. g(int): Value for the colour green as int from 0-255. b(int): Value for the colour blue as int from 0-255. """ - rgb = [r,g,b] + rgb = [r, g, b] hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255) hexvalue = "" for value in rgb: - temp = str(hex(int(value))).replace("0x","") + temp = str(hex(int(value))).replace("0x", "") if len(temp) == 1: temp = "0" + temp hexvalue = hexvalue + temp @@ -650,7 +674,7 @@ def _rgb_to_hexvalue(r, g, b): hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] hexvalue_hsv = "" for value in hsvarray: - temp = str(hex(int(value))).replace("0x","") + temp = str(hex(int(value))).replace("0x", "") if len(temp) == 1: temp = "0" + temp hexvalue_hsv = hexvalue_hsv + temp @@ -666,7 +690,7 @@ def _hexvalue_to_rgb(hexvalue): """ Converts the hexvalue used by Tuya for colour representation into an RGB value. - + Args: hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() """ @@ -681,7 +705,7 @@ def _hexvalue_to_hsv(hexvalue): """ Converts the hexvalue used by Tuya for colour representation into an HSV value. - + Args: hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() """ @@ -699,22 +723,23 @@ def set_scene(self, scene): scene(int): Value for the scene as int from 1-4. """ if not 1 <= scene <= 4: - raise ValueError("The value for scene needs to be between 1 and 4.") + raise ValueError( + "The value for scene needs to be between 1 and 4.") - #print(BulbDevice) + # print(BulbDevice) - if(scene==1): - s=self.DPS_MODE_SCENE_1 - elif(scene==2): - s=self.DPS_MODE_SCENE_2 - elif(scene==3): - s=self.DPS_MODE_SCENE_3 + if(scene == 1): + s = self.DPS_MODE_SCENE_1 + elif(scene == 2): + s = self.DPS_MODE_SCENE_2 + elif(scene == 3): + s = self.DPS_MODE_SCENE_3 else: - s=self.DPS_MODE_SCENE_4 + s = self.DPS_MODE_SCENE_4 payload = self.generate_payload(CONTROL, { self.DPS_INDEX_MODE: s - }) + }) data = self._send_receive(payload) return data @@ -728,13 +753,15 @@ def set_colour(self, r, g, b): b(int): Value for the colour blue as int from 0-255. """ if not 0 <= r <= 255: - raise ValueError("The value for red needs to be between 0 and 255.") + raise ValueError( + "The value for red needs to be between 0 and 255.") if not 0 <= g <= 255: - raise ValueError("The value for green needs to be between 0 and 255.") + raise ValueError( + "The value for green needs to be between 0 and 255.") if not 0 <= b <= 255: - raise ValueError("The value for blue needs to be between 0 and 255.") + raise ValueError( + "The value for blue needs to be between 0 and 255.") - #print(BulbDevice) hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b) payload = self.generate_payload(CONTROL, { @@ -754,7 +781,8 @@ def set_white(self, brightness, colourtemp): if not 25 <= brightness <= 255: raise ValueError("The brightness needs to be between 25 and 255.") if not 0 <= colourtemp <= 255: - raise ValueError("The colour temperature needs to be between 0 and 255.") + raise ValueError( + "The colour temperature needs to be between 0 and 255.") payload = self.generate_payload(CONTROL, { self.DPS_INDEX_MODE: self.DPS_MODE_WHITE, @@ -774,7 +802,8 @@ def set_brightness(self, brightness): if not 25 <= brightness <= 255: raise ValueError("The brightness needs to be between 25 and 255.") - payload = self.generate_payload(CONTROL, {self.DPS_INDEX_BRIGHTNESS: brightness}) + payload = self.generate_payload( + CONTROL, {self.DPS_INDEX_BRIGHTNESS: brightness}) data = self._send_receive(payload) return data @@ -786,9 +815,11 @@ def set_colourtemp(self, colourtemp): colourtemp(int): Value for the colour temperature (0-255). """ if not 0 <= colourtemp <= 255: - raise ValueError("The colour temperature needs to be between 0 and 255.") + raise ValueError( + "The colour temperature needs to be between 0 and 255.") - payload = self.generate_payload(CONTROL, {self.DPS_INDEX_COLOURTEMP: colourtemp}) + payload = self.generate_payload( + CONTROL, {self.DPS_INDEX_COLOURTEMP: colourtemp}) data = self._send_receive(payload) return data @@ -816,8 +847,8 @@ def state(self): state = {} for key in status[self.DPS].keys(): - if(int(key)<=5): - state[self.DPS_2_STATE[key]]=status[self.DPS][key] + if(int(key) <= 5): + state[self.DPS_2_STATE[key]] = status[self.DPS][key] return state @@ -830,20 +861,29 @@ def state(self): UDPPORTS = 6667 # Tuya 3.3 encrypted UDP Port TIMEOUT = 3.0 # Seconds to wait for a broadcast -# UDP packet payload decryption - credit to tuya-convert -pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16) -unpad = lambda s: s[:-ord(s[len(s) - 1:])] -encrypt = lambda msg, key: AES.new(key, AES.MODE_ECB).encrypt(pad(msg).encode()) -decrypt = lambda msg, key: unpad(AES.new(key, AES.MODE_ECB).decrypt(msg)).decode() +# UDP packet payload decryption - credit to tuya-convert + + +def pad(s): return s + (16 - len(s) % 16) * chr(16 - len(s) % 16) +def unpad(s): return s[:-ord(s[len(s) - 1:])] + + +def encrypt(msg, key): return AES.new( + key, AES.MODE_ECB).encrypt(pad(msg).encode()) +def decrypt(msg, key): return unpad( + AES.new(key, AES.MODE_ECB).decrypt(msg)).decode() + + udpkey = md5(b"yGAdlopoPVldABfn").digest() -decrypt_udp = lambda msg: decrypt(msg, udpkey) +def decrypt_udp(msg): return decrypt(msg, udpkey) # Return positive number or zero def floor(x): if x > 0: - return x + return x else: - return 0 + return 0 + def appenddevice(newdevice, devices): if(newdevice['ip'] in devices): @@ -857,13 +897,13 @@ def appenddevice(newdevice, devices): return False # Scan function shortcut -def scan(maxretry = MAXCOUNT): +def scan(maxretry=MAXCOUNT): """Scans your network for Tuya devices with output to stdout """ - d = deviceScan(True,maxretry) + d = deviceScan(True, maxretry) # Scan function -def deviceScan(verbose = False,maxretry = MAXCOUNT): +def deviceScan(verbose=False, maxretry=MAXCOUNT): """Scans your network for Tuya devices and returns dictionary of devices discovered devices = tinytuya.deviceScan(verbose) @@ -884,21 +924,24 @@ def deviceScan(verbose = False,maxretry = MAXCOUNT): """ # Enable UDP listening broadcasting mode on UDP port 6666 - 3.1 Devices - client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + client = socket.socket( + socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) client.bind(("", UDPPORT)) - client.settimeout(TIMEOUT) + client.settimeout(TIMEOUT) # Enable UDP listening broadcasting mode on encrypted UDP port 6667 - 3.3 Devices - clients = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + clients = socket.socket( + socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) clients.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) clients.bind(("", UDPPORTS)) clients.settimeout(TIMEOUT) if(verbose): - print("Scanning on UDP ports %s and %s for devices (%s retries)...\n"%(UDPPORT,UDPPORTS,maxretry)) + print("Scanning on UDP ports %s and %s for devices (%s retries)...\n" % + (UDPPORT, UDPPORTS, maxretry)) # globals - devices={} + devices = {} count = 0 counts = 0 spinnerx = 0 @@ -907,7 +950,7 @@ def deviceScan(verbose = False,maxretry = MAXCOUNT): while (count + counts) <= maxretry: note = 'invalid' if(verbose): - print("Scanning... %s\r" % (spinner[spinnerx]), end = '') + print("Scanning... %s\r" % (spinner[spinnerx]), end='') spinnerx = (spinnerx + 1) % 4 if (count <= counts): # alternate between 6666 and 6667 ports @@ -927,7 +970,7 @@ def deviceScan(verbose = False,maxretry = MAXCOUNT): ip = addr[0] gwId = productKey = version = "" result = data - try: + try: result = data[20:-8] try: result = decrypt_udp(result) @@ -949,12 +992,13 @@ def deviceScan(verbose = False,maxretry = MAXCOUNT): # check to see if we have seen this device before and add to devices array if appenddevice(result, devices) == False: # new device found - back off count if we keep getting new devices - if(version=='3.1'): + if(version == '3.1'): count = floor(count - 1) else: counts = floor(counts - 1) if(verbose): - print("FOUND Device [%s payload]: %s\n ID = %s, product = %s, Version = %s" % (note,ip,gwId,productKey,version)) + print("FOUND Device [%s payload]: %s\n ID = %s, product = %s, Version = %s" % ( + note, ip, gwId, productKey, version)) try: if(version == '3.1'): # Version 3.1 - no device key requires - poll for status data points @@ -970,16 +1014,16 @@ def deviceScan(verbose = False,maxretry = MAXCOUNT): print(" No Stats - Device Key required to poll for status") except: if(verbose): - print(" No Stats for %s: Unable to poll"%ip) + print(" No Stats for %s: Unable to poll" % ip) devices[ip]['err'] = 'Unable to poll' else: - if(version=='3.1'): + if(version == '3.1'): count = count + 1 else: counts = counts + 1 if(verbose): - print(" \nScan Complete! Found %s devices.\n"%len(devices)) - + print(" \nScan Complete! Found %s devices.\n" % + len(devices)) + return(devices) -