Skip to content

Commit

Permalink
Add force-scanning v3.5 devices to scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
uzlonewolf committed Jun 23, 2024
1 parent d3bc3d0 commit d088823
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions tinytuya/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def __init__( self, ip, deviceinfo, options, debug ):
self.timeo = 0
self.resets = 0
self.step = FSCAN_NOT_STARTED
self.try_v35 = False
self.cur_key = None
self.hard_time_limit = time.time() + 30
self.initial_connect_retries = options['retries']
Expand Down Expand Up @@ -275,6 +276,11 @@ def v34_negotiate_sess_key_start( self ):
print('v3.4/5 trying key', self.ip, self.device.real_local_key)
step1 = self.device._negotiate_session_key_generate_step_1()
self.sock.sendall( self.device._encode_message( step1 ) )
if self.try_v35 and self.device.version == 3.4:
self.device.version = 3.5
step1 = self.device._negotiate_session_key_generate_step_1()
self.sock.sendall( self.device._encode_message( step1 ) )
self.device.version = 3.4
if self.debug:
print('v3.4/5 session key neg start, debug ip', self.ip)

Expand All @@ -297,6 +303,8 @@ def __init__( self, ip, deviceinfo, options, debug ):
self.retries = 0
self.keygen = None
self.brute_force_data = []
self.try_v35 = True
self.v34_connect_ok = False

self.connect()

Expand Down Expand Up @@ -334,7 +342,7 @@ def timeout( self, forced=False ):
print('ForceScannedDevice: Debug sock', self.ip, 'connect timed out!')
elif self.step == FSCAN_INITIAL_CONNECT:
if self.debug:
print('ForceScannedDevice: Debug sock', self.ip, 'socket send failed,', 'no data received' if forced else 'receive timed out')
print('ForceScannedDevice: Debug sock', self.ip, 'socket send failed,', 'no data received,' if forced else 'receive timed out,', 'current retry:', self.retries)
if self.retries < 2:
self.retries += 1
self.connect()
Expand Down Expand Up @@ -366,8 +374,9 @@ def timeout( self, forced=False ):
self.remove = True
else:
self.connect()
elif self.step == FSCAN_v34_BRUTE_FORCE_ACTIVE: # or self.step == FSCAN_v33_BRUTE_FORCE_ACTIVE or self.step == FSCAN_v31_BRUTE_FORCE_ACTIVE:
if not forced:
self.v34_connect_ok = False
elif self.step == FSCAN_v34_BRUTE_FORCE_ACTIVE:
if( (not forced) and (not self.v34_connect_ok) ):
# actual timeout, connect failed
if self.retries < 2:
self.retries += 1
Expand All @@ -382,6 +391,7 @@ def timeout( self, forced=False ):
return
# brute-forcing the key
self.v3x_brute_force_try_next_key()
self.v34_connect_ok = False
elif self.step == FSCAN_v31_BRUTE_FORCE_ACTIVE:
# brute-forcing the key
self.v3x_brute_force_try_next_key()
Expand Down Expand Up @@ -418,6 +428,8 @@ def write_data( self ):
# 'False' when connection was made but then closed
# The IP address when the connection is still open
addr = self.get_peer()
if self.debug:
print('ForceScannedDevice: device', self.ip, 'addr is:', addr)
if addr is None:
# refused
self.close()
Expand All @@ -435,6 +447,7 @@ def write_data( self ):
#self.timeo = time.time() + self.options['data_timeout']
self.timeo = time.time() + 1.5
self.found = True
self.v34_connect_ok = True

if len(self.send_queue) > 0:
self.sock.sendall( self.device._encode_message( self.send_queue[0] ) )
Expand Down Expand Up @@ -502,15 +515,30 @@ def read_data( self ):
if self.debug:
print('ForceScannedDevice:', self.ip, 'got step', self.step, 'data:', data )

if len(data) == 0:
if len(data) == 0:
self.timeout( True )
return

while len(data):
try:
prefix_offset = data.find(tinytuya.PREFIX_BIN)
if prefix_offset > 0:
data = data[prefix_offset:]
if self.deviceinfo['version'] == 3.5:
prefix_offset = data.find(tinytuya.PREFIX_6699_BIN)
if prefix_offset > 0:
data = data[prefix_offset:]
else:
prefix_offset = data.find(tinytuya.PREFIX_BIN)
if prefix_offset >= 0:
data = data[prefix_offset:]
elif self.try_v35 and self.deviceinfo['version'] == 3.4:
prefix_offset = data.find(tinytuya.PREFIX_6699_BIN)
if prefix_offset >= 0:
if self.debug:
print('ForceScannedDevice: device is v3.5!')
data = data[prefix_offset:]
self.try_v35 = False
self.deviceinfo['version'] = 3.5
self.device.set_version(3.5)
self.ver_found = True
hmac_key = self.device.local_key if self.deviceinfo['version'] >= 3.4 else None
msg = tinytuya.unpack_message(data, hmac_key=hmac_key)
except:
Expand Down Expand Up @@ -668,7 +696,7 @@ def brute_force_v3x_data( self ):
self.brute_force_found_key()
return True

self.brute_force_data = []
self.brute_force_data = []
return False

def v3x_brute_force_try_next_key( self ):
Expand Down Expand Up @@ -794,7 +822,7 @@ def write_data( self ):
self.write = False

try:
# connected, send the query
# connected, send the query
if self.device.version >= 3.4 :
# self.device.real_local_key, self.device.local_key
self.v34_negotiate_sess_key_start()
Expand Down

0 comments on commit d088823

Please sign in to comment.