Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scanner fixes and Wizard rework #261

Merged
merged 1 commit into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions tinytuya/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
from . import wizard
from . import scanner

retries = 0
retries = None
state = 0
color = True
retriesprovided = False
force = False
force_list = []
last_force = False
Expand Down Expand Up @@ -58,7 +57,6 @@
else:
try:
retries = int(i)
retriesprovided = True
except:
state = 10

Expand All @@ -69,28 +67,19 @@

# State 0 = Run Network Scan
if state == 0:
if retriesprovided:
scanner.scan(scantime=retries, color=color, forcescan=force, discover=broadcast_listen, assume_yes=assume_yes)
else:
scanner.scan(color=color, forcescan=force, discover=broadcast_listen, assume_yes=assume_yes)
scanner.scan(scantime=retries, color=color, forcescan=force, discover=broadcast_listen, assume_yes=assume_yes)

# State 1 = Run Setup Wizard
if state == 1:
if retriesprovided:
wizard.wizard(color=color, retries=retries, forcescan=force)
else:
wizard.wizard(color=color, forcescan=force)
wizard.wizard(color=color, retries=retries, forcescan=force)

# State 2 = Snapshot Display and Scan
if state == 2:
scanner.snapshot(color=color)

# State 3 = Scan All Devices
if state == 3:
if retriesprovided:
scanner.alldevices(color=color, scantime=retries)
else:
scanner.alldevices(color=color)
scanner.alldevices(color=color, scantime=retries)

# State 4 = Scan All Devices
if state == 4:
Expand Down
17 changes: 15 additions & 2 deletions tinytuya/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,11 +1192,22 @@ def _process_response(self, response):
return response

def _negotiate_session_key(self):
rkey = self._send_receive_quick( self._negotiate_session_key_generate_step_1(), 2 )
step3 = self._negotiate_session_key_generate_step_3( rkey )
if not step3:
return False
self._send_receive_quick( step3, None )
self._negotiate_session_key_generate_finalize()
return True

def _negotiate_session_key_generate_step_1( self ):
self.local_nonce = b'0123456789abcdef' # not-so-random random key
self.remote_nonce = b''
self.local_key = self.real_local_key

rkey = self._send_receive_quick( MessagePayload(SESS_KEY_NEG_START, self.local_nonce), 2 )
return MessagePayload(SESS_KEY_NEG_START, self.local_nonce)

def _negotiate_session_key_generate_step_3( self, rkey ):
if not rkey or type(rkey) != TuyaMessage or len(rkey.payload) < 48:
# error
log.debug("session key negotiation failed on step 1")
Expand Down Expand Up @@ -1228,12 +1239,14 @@ def _negotiate_session_key(self):

if hmac_check != payload[16:48]:
log.debug("session key negotiation step 2 failed HMAC check! wanted=%r but got=%r", binascii.hexlify(hmac_check), binascii.hexlify(payload[16:48]))
return False

log.debug("session local nonce: %r remote nonce: %r", self.local_nonce, self.remote_nonce)

rkey_hmac = hmac.new(self.local_key, self.remote_nonce, sha256).digest()
self._send_receive_quick( MessagePayload(SESS_KEY_NEG_FINISH, rkey_hmac), None )
return MessagePayload(SESS_KEY_NEG_FINISH, rkey_hmac)

def _negotiate_session_key_generate_finalize( self ):
if IS_PY2:
k = [ chr(ord(a)^ord(b)) for (a,b) in zip(self.local_nonce,self.remote_nonce) ]
self.local_key = ''.join(k)
Expand Down
169 changes: 78 additions & 91 deletions tinytuya/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def getmyIPs( term, verbose, ask ):
answer = input( '%sScan network %s from interface %s?%s ([Y]es/[n]o/[a]ll yes): ' % (term.bold, k, str(interface), term.normal) )
if answer[0:1].lower() == 'a':
ask = 2
elif answer[0:1].lower() == 'n':
elif answer.lower().find('n') >= 0:
continue
if verbose:
print(term.dim + 'Adding Network', k, 'to the force-scan list')
Expand Down Expand Up @@ -271,71 +271,25 @@ def get_peer(self):

def v34_negotiate_sess_key_start( self ):
if self.debug:
print('v3.4 trying key', self.ip, self.device.real_local_key)
self.device.local_nonce = b'0123456789abcdef'
self.device.remote_nonce = b''
self.device.local_key = self.device.real_local_key
self.sock.sendall( self.device._encode_message( tinytuya.MessagePayload(tinytuya.SESS_KEY_NEG_START, self.device.local_nonce) ) )
print('v3.4/5 trying key', self.ip, self.device.real_local_key)
step1 = self.device._negotiate_session_key_generate_step_1()
self.sock.sendall( self.device._encode_message( step1 ) )
if self.debug:
print('v3.4 session key neg start, debug ip', self.ip)

print('v3.4/5 session key neg start, debug ip', self.ip)

def v34_negotiate_sess_key_step_2( self, rkey ):
if not rkey or type(rkey) != tinytuya.TuyaMessage or len(rkey.payload) < 48:
# error
self.deviceinfo["err"] = 'v3.4 device session key negotiation failed on step 1'
log.debug(self.deviceinfo["err"])
return False

lastloglevel = log.level
if self.debug:
log.setLevel(logging.DEBUG)

payload = rkey.payload
try:
log.debug("decrypting=%r", payload)
cipher = tinytuya.AESCipher(self.device.real_local_key)
payload = cipher.decrypt(payload, False, decode_text=False)
except:
self.deviceinfo["err"] = 'v3.4 device session key step 2 decrypt failed, payload=%r (len:%d) l-key:%r l-nonce:%r' % (payload, len(payload), self.device.real_local_key, self.device.local_nonce)
log.warning(self.deviceinfo["err"], exc_info=True)
step3 = self.device._negotiate_session_key_generate_step_3( rkey )
if not step3:
log.setLevel(lastloglevel)
return False

log.debug("decrypted session key negotiation step 2 payload=%r", payload)
log.debug("payload type = %s len = %d", type(payload), len(payload))

if len(payload) < 48:
self.deviceinfo["err"] = "v3.4 device session key negotiation step 2 failed, too short response"
log.debug(self.deviceinfo["err"])
log.setLevel(lastloglevel)
return False

self.device.remote_nonce = payload[:16]
hmac_check = hmac.new(self.device.local_key, self.device.local_nonce, sha256).digest()

if hmac_check != payload[16:48]:
log.debug("session key negotiation step 2 failed HMAC check! wanted=%r but got=%r", binascii.hexlify(hmac_check), binascii.hexlify(payload[16:48]))

log.debug("session local nonce: %r remote nonce: %r", self.device.local_nonce, self.device.remote_nonce)

rkey_hmac = hmac.new(self.device.local_key, self.device.remote_nonce, sha256).digest()
self.sock.sendall( self.device._encode_message( tinytuya.MessagePayload(tinytuya.SESS_KEY_NEG_FINISH, rkey_hmac) ) )

if tinytuya.IS_PY2:
k = [ chr(ord(a)^ord(b)) for (a,b) in zip(self.device.local_nonce,self.device.remote_nonce) ]
self.device.local_key = ''.join(k)
else:
self.device.local_key = bytes( [ a^b for (a,b) in zip(self.device.local_nonce,self.device.remote_nonce) ] )
log.debug("Session nonce XOR'd: %r" % self.device.local_key)

cipher = tinytuya.AESCipher(self.device.real_local_key)
self.device.local_key = cipher.encrypt(self.device.local_key, False, pad=False)
log.debug("Session key negotiate success! session key: %r", self.device.local_key)
self.sock.sendall( self.device._encode_message( step3 ) )
self.device._negotiate_session_key_generate_finalize()
log.setLevel(lastloglevel)
return True


class ForceScannedDevice(DeviceDetect):
def __init__( self, ip, deviceinfo, options, debug ):
super(ForceScannedDevice, self).__init__( ip, deviceinfo, options, debug )
Expand Down Expand Up @@ -504,7 +458,7 @@ def write_data( self ):
# v3.1 devices will return the status
# v3.2 devices will ???
# v3.3 devices will return an encrypted rejection message
# v3.4 devices will slam the door in our face by dropping the connection
# v3.4/3.5 devices will slam the door in our face by dropping the connection
if self.deviceinfo['dev_type'] == 'device22':
msg = tinytuya.MessagePayload(tinytuya.CONTROL_NEW, b'')
else:
Expand Down Expand Up @@ -1051,6 +1005,13 @@ def tuyaLookup(deviceid):
# No Device info
pass

if forcescan and not len(tuyadevices):
if discover:
print(term.alert + 'Warning: Force-scan requires keys in %s but no keys were found. Disabling force-scan.' % DEVICEFILE + term.normal)
forcescan = False
else:
raise RuntimeError('Force-scan requires keys in %s but no keys were found.' % DEVICEFILE)

if discover:
# Enable UDP listening broadcasting mode on UDP port 6666 - 3.1 Devices
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
Expand Down Expand Up @@ -1109,6 +1070,7 @@ def tuyaLookup(deviceid):
ip_force_wants_end = False
ip_scan = False
ip_scan_running = False
ip_scan_delay = False
scan_end_time = time.time() + scantime
device_end_time = 0
log.debug("Listening for Tuya devices on UDP " + str(UDPPORT) + " and " + str(UDPPORTS) + " and " + str(UDPPORTAPP))
Expand Down Expand Up @@ -1174,6 +1136,8 @@ def tuyaLookup(deviceid):
if networks:
scan_ips = _generate_ip( networks, verbose, term )
ip_scan = ip_scan_running = True
if discover:
ip_scan_delay = time.time() + 5

# Warn user of scan duration
if verbose:
Expand Down Expand Up @@ -1238,8 +1202,11 @@ def tuyaLookup(deviceid):
devicelist.remove(dev)

if show_timer:
end_time = int((scan_end_time if scan_end_time > device_end_time else device_end_time) - time.time())
if end_time < 0: end_time = 0
if scan_end_time > device_end_time:
end_time = int(scan_end_time - time.time())
if end_time < 0: end_time = 0
else:
end_time = 'Devs:'+str(len(devicelist))
tim = 'FS:'+str(current_ip) if ip_scan_running else str(end_time)
print("%sScanning... %s (%s) %s \r" % (term.dim, spinner[spinnerx], tim, devices_with_timers), end="")
spinnerx = (spinnerx + 1) % 4
Expand All @@ -1251,7 +1218,11 @@ def tuyaLookup(deviceid):
need_sleep = 0.2
# time out any sockets which have not yet connected
# no need to run this every single time through the loop
if len(write_socks) < max_parallel:
if ip_scan_delay:
if ip_scan_delay < time.time():
ip_scan_delay = False
if (not ip_scan_delay) and len(write_socks) < max_parallel:
ip_scan_delay = False
want = max_parallel - len(write_socks)
# only open 10 at most during each pass through select()
if want > 10: want = 10
Expand Down Expand Up @@ -1291,9 +1262,13 @@ def tuyaLookup(deviceid):

if len(write_socks) > 0:
rd, wr, _ = select.select( read_socks, write_socks, [], 0 )
else:
elif len(read_socks) > 0:
rd, _, _ = select.select( read_socks, [], [], 0 )
wr = []
else:
# not listening for broadcasts and no open sockets yet
rd = []
wr = []
except KeyboardInterrupt as err:
log.debug('Keyboard Interrupt')
if verbose: print("\n**User Break**")
Expand Down Expand Up @@ -1368,6 +1343,9 @@ def tuyaLookup(deviceid):
result["key"] = dkey
result["mac"] = mac

if 'id' not in result:
result['id'] = result['gwId']

if verbose:
broadcast_messages[ip] = term.alertdim + term.dim + 'New Broadcast from ' + str(ip) + ' / ' + str(mac) + ' ' + str(result) + term.normal
if False:
Expand Down Expand Up @@ -1656,32 +1634,33 @@ def snapshot(color=True):
try:
with open(SNAPSHOTFILE) as json_file:
data = json.load(json_file)
except:
print("%s ERROR: Missing %s file\n" % (term.alert, SNAPSHOTFILE))
except Exception as e:
#traceback.print_exc(0)
print("%s ERROR: Missing %s file:%s %s: %s\n" % (term.alert, SNAPSHOTFILE, term.normal, type(e).__name__, e))
return

print("%sLoaded %s - %d devices:\n" % (term.dim, SNAPSHOTFILE, len(data["devices"])))

# Print a table with all devices
table = []
print("%s%-25s %-24s %-18s %-17s %-5s" % (term.normal, "Name","ID", "IP","Key","Version"))
print("%s%-25s %-24s %-15s %-17s %-5s" % (term.normal, "Name","ID", "IP","Key","Version"))
print(term.dim)
by_ip = {}
devicesx = sorted(data["devices"], key=lambda x: x['name'])
for idx in devicesx:
device = _build_item( idx, None )
ips = device['ip'] if device['ip'] else (term.alert + "Error: No IP found" + term.normal)
ips = device['ip'].ljust(15) if device['ip'] else (term.alert + "E: No IP found " + term.normal)
dname = device['name']
if dname == "":
dname = device['gwId']
print("%s%-25.25s %s%-24s %s%-18s %s%-17s %s%-5s" %
print("%s%-25.25s %s%-24s %s%s %s%-17s %s%-5s" %
(term.dim, dname, term.cyan, device['gwId'], term.subbold, ips, term.red, device['key'], term.yellow, device['version']))
if device['ip']:
by_ip[device['ip']] = device

# Find out if we should poll all devices
answer = input(term.subbold + '\nPoll local devices? ' + term.normal + '(Y/n): ')
if answer[0:1].lower() != 'n':
if answer.lower().find('n') < 0:
print("")
print("%sPolling %s local devices from last snapshot..." % (term.normal, len(devicesx)))
result = devices(verbose=False, scantime=0, color=color, poll=True, byID=True, discover=False, snapshot=by_ip)
Expand Down Expand Up @@ -1738,42 +1717,50 @@ def alldevices(color=True, scantime=None):
print(output)

# Find out if we should poll all devices
answer = 'y' #input(term.subbold + '\nPoll local devices? ' + term.normal + '(Y/n): ')
if answer[0:1].lower() != 'n':
by_id = [x['id'] for x in tuyadevices]
# Scan network for devices and provide polling data
print(term.normal + "\nScanning local network for Tuya devices...")
result = devices(verbose=False, poll=True, byID=True, scantime=scantime, wantids=by_id, show_timer=True)
print(" %s%s local devices discovered%s" % (term.dim, len(result), term.normal))
print("")
answer = input(term.subbold + '\nPoll local devices? ' + term.normal + '(Y/n): ')
if answer.lower().find('n') < 0:
poll_and_display( tuyadevices, color=color, scantime=scantime, snapshot=True )

polling = []
print("Polling local devices...")
# devices = sorted(data["devices"], key=lambda x: x['name'])
for idx in sorted(tuyadevices, key=lambda x: x['name']):
gwId = _get_gwid( idx )
if gwId and gwId in result:
item = _build_item( idx, result[gwId] )
if 'dps' in result[gwId] and 'dps' in result[gwId]['dps']:
_display_status( item, result[gwId]['dps']['dps'], term )
else:
_display_status( item, None, term )
print("%s\nDone.\n" % term.dim)
return

def poll_and_display( tuyadevices, color=True, scantime=None, snapshot=False, forcescan=False ):
termcolors = tinytuya.termcolor(color)
term = TermColors( *termcolors )

by_id = [x['id'] for x in tuyadevices]
# Scan network for devices and provide polling data
print(term.normal + "\nScanning local network for Tuya devices...")
result = devices(verbose=False, poll=True, byID=True, scantime=scantime, wantids=by_id, show_timer=True, forcescan=forcescan)
print(" %s%s local devices discovered%s" % (term.dim, len(result), term.normal))
print("")

polling = []
print("Polling local devices...")
# devices = sorted(data["devices"], key=lambda x: x['name'])
for idx in sorted(tuyadevices, key=lambda x: x['name']):
gwId = _get_gwid( idx )
if gwId and gwId in result:
item = _build_item( idx, result[gwId] )
if 'dps' in result[gwId] and 'dps' in result[gwId]['dps']:
_display_status( item, result[gwId]['dps']['dps'], term )
else:
item = _build_item( idx, None )
_display_status( item, None, term )
polling.append(item)
# for loop
else:
item = _build_item( idx, None )
_display_status( item, None, term )
polling.append(item)
# for loop

if snapshot:
# Save polling data snapsot
current = {'timestamp' : time.time(), 'devices' : polling}
output = json.dumps(current, indent=4)
print(term.bold + "\n>> " + term.normal + "Saving device snapshot data to " + SNAPSHOTFILE)
with open(SNAPSHOTFILE, "w") as outfile:
outfile.write(output)

print("%s\nDone.\n" % term.dim)
return

return polling

# Scan Devices in tuyascan.json - respond in JSON
def snapshotjson():
Expand Down
Loading