Skip to content

Commit

Permalink
Merge pull request #186 from uzlonewolf/dev-find-ip
Browse files Browse the repository at this point in the history
Rework device finding for auto-IP detection, and unpack_message() retcode fix
  • Loading branch information
jasonacox authored Oct 4, 2022
2 parents f400ae9 + 9f96d45 commit 02a46df
Showing 1 changed file with 79 additions and 73 deletions.
152 changes: 79 additions & 73 deletions tinytuya/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import json
import logging
import socket
import select
import struct
import sys
import time
Expand Down Expand Up @@ -100,6 +101,7 @@

# Globals Network Settings
MAXCOUNT = 15 # How many tries before stopping
SCANTIME = 18 # How many seconds to wait before stopping device discovery
UDPPORT = 6666 # Tuya 3.1 UDP Port
UDPPORTS = 6667 # Tuya 3.3 encrypted UDP Port
TCPPORT = 6668 # Tuya TCP Local Port
Expand Down Expand Up @@ -339,7 +341,7 @@ def unpack_message(data, hmac_key=None, header=None, no_retcode=False):
log.debug('unpack_message(): not enough data to unpack payload! need %d but only have %d', header_len+header.length, len(data))
raise DecodeError('Not enough data to unpack payload')

retcode = 0 if no_retcode else struct.unpack(MESSAGE_RETCODE_FMT, data[header_len:headret_len])
retcode = 0 if no_retcode else struct.unpack(MESSAGE_RETCODE_FMT, data[header_len:headret_len])[0]
# the retcode is technically part of the payload, but strip it as we do not want it here
payload = data[header_len+retcode_len:header_len+header.length]
crc, suffix = struct.unpack(end_fmt, payload[-end_len:])
Expand Down Expand Up @@ -400,6 +402,77 @@ def error_json(number=None, payload=None):

return json.loads('{ "Error":"%s", "Err":"%s", "Payload":%s }' % vals)

def find_device(dev_id=None, address=None):
"""Scans network for Tuya devices with ID = dev_id
Parameters:
dev_id = The specific Device ID you are looking for (returns only IP and Version)
Response:
(ip, version, dev_id)
"""
if dev_id is None and address is None:
return (None, None, None)
log.debug("Listening for device %s on the network", dev_id)
# Enable UDP listening broadcasting mode on UDP port 6666 - 3.1 Devices
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client.bind(("", UDPPORT))
client.setblocking(False)
# Enable UDP listening broadcasting mode on encrypted UDP port 6667 - 3.3 Devices
clients = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
clients.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
clients.bind(("", UDPPORTS))
clients.setblocking(False)

deadline = time.time() + SCANTIME
selecttime = SCANTIME
ret = (None, None, None)

while (ret[0] is None) and (selecttime > 0):
rd, _, _ = select.select( [client, clients], [], [], selecttime )
for sock in rd:
try:
data, addr = sock.recvfrom(4048)
except:
# Timeout
continue
ip = addr[0]
gwId = version = ""
result = data
try:
result = data[20:-8]
try:
result = decrypt_udp(result)
except:
result = result.decode()

result = json.loads(result)
ip = result["ip"]
gwId = result["gwId"]
version = result["version"]
log.debug( 'find() received broadcast from %r: %r', ip, result )
except:
result = {"ip": ip}
log.debug( 'find() failed to decode broadcast from %r: %r', addr, data )

# Check to see if we are only looking for one device
if dev_id and gwId == dev_id:
# We found it by dev_id!
ret = (ip, version, gwId)
break
elif address and address == ip:
# We found it by ip!
ret = (ip, version, gwId)
break

selecttime = deadline - time.time()

# while
clients.close()
client.close()
log.debug( 'find() is returning: %r', ret )
return ret

# Tuya Device Dictionary - Command and Payload Overrides
#
Expand Down Expand Up @@ -489,7 +562,7 @@ def __init__(
self.dps_cache = {}
if address is None or address == "Auto" or address == "0.0.0.0":
# try to determine IP address automatically
(addr, ver) = self.find(dev_id)
(addr, ver, did) = find_device(dev_id)
if addr is None:
log.debug("Unable to find device on network (specify IP address)")
raise Exception("Unable to find device on network (specify IP address)")
Expand Down Expand Up @@ -1073,77 +1146,10 @@ def set_sendWait(self, s):
def close(self):
self.__del__()

def find(self, did=None):
"""Scans network for Tuya devices with ID = did
Parameters:
did = The specific Device ID you are looking for (returns only IP and Version)
Response:
(ip, version)
"""
if did is None:
return (None, None)
log.debug("Listening for device %s on the network", did)
# Enable UDP listening broadcasting mode on UDP port 6666 - 3.1 Devices
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client.bind(("", UDPPORT))
client.setblocking(False)
# Enable UDP listening broadcasting mode on encrypted UDP port 6667 - 3.3 Devices
clients = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
clients.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
clients.bind(("", UDPPORTS))
clients.setblocking(False)

count = 0
counts = 0
maxretry = 180
ret = (None, None)

while maxretry:
maxretry -= 1
time.sleep(0.1)
while True:
data = addr = None
try:
data, addr = client.recvfrom(4048)
except:
# Timeout
try:
data, addr = clients.recvfrom(4048)
except:
# Timeout
break
ip = addr[0]
gwId = version = ""
result = data
try:
result = data[20:-8]
try:
result = decrypt_udp(result)
except:
result = result.decode()

result = json.loads(result)
ip = result["ip"]
gwId = result["gwId"]
version = result["version"]
except:
result = {"ip": ip}

# Check to see if we are only looking for one device
if gwId == did:
# We found it!
ret = (ip, version)
maxretry = False
break

# while
clients.close()
client.close()
log.debug(ret)
return ret
@staticmethod
def find(did):
(ip, ver, dev_id) = find_device(dev_id=did)
return (ip, ver)

def generate_payload(self, command, data=None, gwId=None, devId=None, uid=None):
"""
Expand Down

0 comments on commit 02a46df

Please sign in to comment.