Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework device finding for auto-IP detection, and unpack_message() retcode fix #186

Merged
merged 6 commits into from
Oct 4, 2022
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 79 additions & 73 deletions tinytuya/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import json
import logging
import socket
import select
import struct
import sys
import time
Expand Down Expand Up @@ -100,6 +101,7 @@

# Globals Network Settings
MAXCOUNT = 15 # How many tries before stopping
SCANTIME = 18 # How many seconds to wait before stopping device discovery
UDPPORT = 6666 # Tuya 3.1 UDP Port
UDPPORTS = 6667 # Tuya 3.3 encrypted UDP Port
TCPPORT = 6668 # Tuya TCP Local Port
Expand Down Expand Up @@ -339,7 +341,7 @@ def unpack_message(data, hmac_key=None, header=None, no_retcode=False):
log.debug('unpack_message(): not enough data to unpack payload! need %d but only have %d', header_len+header.length, len(data))
raise DecodeError('Not enough data to unpack payload')

retcode = 0 if no_retcode else struct.unpack(MESSAGE_RETCODE_FMT, data[header_len:headret_len])
retcode = 0 if no_retcode else struct.unpack(MESSAGE_RETCODE_FMT, data[header_len:headret_len])[0]
# the retcode is technically part of the payload, but strip it as we do not want it here
payload = data[header_len+retcode_len:header_len+header.length]
crc, suffix = struct.unpack(end_fmt, payload[-end_len:])
Expand Down Expand Up @@ -400,6 +402,77 @@ def error_json(number=None, payload=None):

return json.loads('{ "Error":"%s", "Err":"%s", "Payload":%s }' % vals)

def find_device(dev_id=None, address=None):
"""Scans network for Tuya devices with ID = dev_id

Parameters:
dev_id = The specific Device ID you are looking for (returns only IP and Version)

Response:
(ip, version, dev_id)
"""
if dev_id is None and address is None:
return (None, None, None)
log.debug("Listening for device %s on the network", dev_id)
# Enable UDP listening broadcasting mode on UDP port 6666 - 3.1 Devices
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client.bind(("", UDPPORT))

Check warning

Code scanning / CodeQL

Binding a socket to all network interfaces

'' binds a socket to all interfaces.
client.setblocking(False)
# Enable UDP listening broadcasting mode on encrypted UDP port 6667 - 3.3 Devices
clients = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
clients.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
clients.bind(("", UDPPORTS))

Check warning

Code scanning / CodeQL

Binding a socket to all network interfaces

'' binds a socket to all interfaces.
clients.setblocking(False)

deadline = time.time() + SCANTIME
selecttime = SCANTIME
ret = (None, None)

while (ret[0] is None) and (selecttime > 0):
rd, _, _ = select.select( [client, clients], [], [], selecttime )
for sock in rd:
try:
data, addr = sock.recvfrom(4048)
except:
# Timeout
continue
ip = addr[0]
gwId = version = ""
result = data
try:
result = data[20:-8]
try:
result = decrypt_udp(result)
except:
result = result.decode()

result = json.loads(result)
ip = result["ip"]
gwId = result["gwId"]
version = result["version"]
log.debug( 'find() received broadcast from %r: %r', ip, result )
except:
result = {"ip": ip}
log.debug( 'find() failed to decode broadcast from %r: %r', addr, data )

# Check to see if we are only looking for one device
if dev_id and gwId == dev_id:
# We found it by dev_id!
ret = (ip, version, gwId)
break
elif address and address == ip:
# We found it by ip!
ret = (ip, version, gwId)
break

selecttime = deadline - time.time()

# while
clients.close()
client.close()
log.debug( 'find() is returning: %r', ret )
return ret

# Tuya Device Dictionary - Command and Payload Overrides
#
Expand Down Expand Up @@ -489,7 +562,7 @@ def __init__(
self.dps_cache = {}
if address is None or address == "Auto" or address == "0.0.0.0":
# try to determine IP address automatically
(addr, ver) = self.find(dev_id)
(addr, ver, did) = find_device(dev_id)
if addr is None:
log.debug("Unable to find device on network (specify IP address)")
raise Exception("Unable to find device on network (specify IP address)")
Expand Down Expand Up @@ -1073,77 +1146,10 @@ def set_sendWait(self, s):
def close(self):
self.__del__()

def find(self, did=None):
"""Scans network for Tuya devices with ID = did

Parameters:
did = The specific Device ID you are looking for (returns only IP and Version)

Response:
(ip, version)
"""
if did is None:
return (None, None)
log.debug("Listening for device %s on the network", did)
# Enable UDP listening broadcasting mode on UDP port 6666 - 3.1 Devices
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client.bind(("", UDPPORT))
client.setblocking(False)
# Enable UDP listening broadcasting mode on encrypted UDP port 6667 - 3.3 Devices
clients = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
clients.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
clients.bind(("", UDPPORTS))
clients.setblocking(False)

count = 0
counts = 0
maxretry = 180
ret = (None, None)

while maxretry:
maxretry -= 1
time.sleep(0.1)
while True:
data = addr = None
try:
data, addr = client.recvfrom(4048)
except:
# Timeout
try:
data, addr = clients.recvfrom(4048)
except:
# Timeout
break
ip = addr[0]
gwId = version = ""
result = data
try:
result = data[20:-8]
try:
result = decrypt_udp(result)
except:
result = result.decode()

result = json.loads(result)
ip = result["ip"]
gwId = result["gwId"]
version = result["version"]
except:
result = {"ip": ip}

# Check to see if we are only looking for one device
if gwId == did:
# We found it!
ret = (ip, version)
maxretry = False
break

# while
clients.close()
client.close()
log.debug(ret)
return ret
@staticmethod
def find(did):
(ip, ver, dev_id) = find_device(dev_id=did)
return (ip, ver)

def generate_payload(self, command, data=None, gwId=None, devId=None, uid=None):
"""
Expand Down