From ec71a100a0ad1eaca97a02acb6a64125321eaa32 Mon Sep 17 00:00:00 2001 From: uzlonewolf Date: Thu, 10 Nov 2022 18:22:33 -0800 Subject: [PATCH 1/4] Add head_key_to_pulses() and pulses_to_head_key() functions --- tinytuya/Contrib/IRRemoteControlDevice.py | 637 ++++++++++++++++++++++ 1 file changed, 637 insertions(+) diff --git a/tinytuya/Contrib/IRRemoteControlDevice.py b/tinytuya/Contrib/IRRemoteControlDevice.py index 8e7329e5..30d10003 100644 --- a/tinytuya/Contrib/IRRemoteControlDevice.py +++ b/tinytuya/Contrib/IRRemoteControlDevice.py @@ -28,6 +28,14 @@ IRRemoteControlDevice.pulses_to_base64 ( pulses ) -> convert sequence of pulses and gaps length to Base64-encoded button code + IRRemoteControlDevice.head_key_to_pulses ( head='...', key='...' ) + -> convert head/key pair to sequence of pulses and gaps + 'head' can be omitted if key starts with '1' + 'key' must begin with '000' through '1FF' + + IRRemoteControlDevice.pulses_to_head_key ( pulses ) + -> attempt to pack a sequence of pulses and gaps into a head/key pair + IRRemoteControlDevice.hex_to_pulses ( code_hex ) -> convert HEX-encoded button code to sequence of pulses and gaps length HEX-encoded codes are used in the Cloud API @@ -73,6 +81,8 @@ import base64 import json import logging +import struct +from math import ceil, floor from ..core import Device, log, CONTROL @@ -95,6 +105,7 @@ class IRRemoteControlDevice(Device): NSDP_DELAY = "delay" # Actually used but not documented NSDP_HEAD = "head" # Actually used but not documented NSDP_KEY1 = "key1" # Actually used but not documented + KEY1_SYMBOL_LIST = "@#$%^&*()" # Timing symbols used in key1 def __init__(self, *args, **kwargs): # set the default version to 3.3 as there are no 3.1 devices @@ -184,6 +195,632 @@ def pulses_to_base64( pulses ): raw_bytes = [x for xs in raw_bytes for x in xs] # flatten return base64.b64encode(bytes(raw_bytes)).decode("ascii") + @staticmethod + def head_key_to_pulses( head, key ): + if len(key) < 4: + raise ValueError( '"key" must be at least 4 characters' ) + + keytype = int(key[0], 16) + if keytype < 0 or keytype > 1: + raise ValueError( 'First digit of "key" must be "0" or "1" (got: %r)' % keytype ) + elif keytype == 1: + return IRRemoteControlDevice.base64_to_pulses( key ) + + if len(head) < 18: + raise ValueError( '"head" must be at least 18 characters' ) + + head = bytearray.fromhex( head ) + headtype, timescale, unused1, unused2, num_timings = struct.unpack( '>BHHHH', head[:9] ) + headlen = num_timings * 2 # times are 16-bit + timebase = 100000.0 / timescale + symbols = '@#$%^&*()'[:num_timings] + try: + repeat = int( key[1:3], 16 ) + except: + raise ValueError( 'First digit of "key" must be "0" or "1" and the next 2 digits must be a hexidecimal byte' ) + key = key[3:] + + # 'head' type 1 uses '@' for 0 and '#' for 1 + # 'head' type 2 uses '#' for 0 and '$' for 1 + + if headtype == 1: + bit_timimgs = ( '@@', '@#' ) + elif headtype == 2: + bit_timimgs = ( '@#', '@$' ) + else: + raise ValueError( 'Unhandled "head" type: %d' % headtype ) + + if len(head) != (headlen+9): + raise ValueError( '"head" must be %d characters' % ((headlen+9)*2) ) + + # unpack the timing values, however many there are + fmt = '>%dH' % num_timings + timings = struct.unpack( fmt, head[9:] ) + symbol_timings = {} + + for i in range(num_timings): + symbol_timings[symbols[i]] = round(timebase * timings[i]) + + if False: + print( 'Head:' ) + print( 'Frequency: %r kHz, Time Base: %r' % (timescale / 100.0, timebase) ) + print( 'Bit Symbols: 0 = %s, 1 = %s' % bit_timimgs) + print( 'Symbol Timings:' ) + for i in range(num_timings): + print( ' %s = %r microseconds' % (symbols[i], symbol_timings[symbols[i]]) ) + print( '' ) + print( 'Key:' ) + print( 'Send count:', repeat ) + print( 'Code:', key ) + + # although it's not as effiecient, it's easier to see what's going on if + # you first unpack the packed bits into their symbol pairs, and then + # expand those symbols to their timing times + + expanded = '' + while key: + cnt = 0 + # first, copy symbols as-is + for c in key: + if c not in symbols: + break + + expanded += c + cnt += 1 + + key = key[cnt:] + if not key: + # all finished + break + + # next, expand packed bits + #print( 'Unpacking:', key[:4] ) + byts, bits = struct.unpack( '>BB', bytearray.fromhex( key[:4] ) ) + key = key[4:] + if byts != 0: + # if the first byte is not 0, read in and transmit bytes until a symbol is encountered + cnt = 0 + bits = 0 + data = '' + for c in key: + c2 = ord(c.upper()) + if c2 < 0x30 or c2 > 0x46 or (c2 > 0x39 and c2 < 0x41): + # it's a symbol, we're done + break + data += c + cnt += 1 + bits += 4 + if (len(data) % 2): + data += '0' + else: + # if the first byte is 0, the next byte is how many bits to transmit + byts = ceil( bits / 8 ) + cnt = byts * 2 + data = key[:cnt] + + key = key[cnt:] + byts = bytearray.fromhex( data ) + # unpack the bits into symbol pairs + while bits: + d = byts[0] + byts = byts[1:] + for i in range(8): + if not bits: + break + # devices transmit MSB first + if (d & 0x80) == 0x80: + expanded += bit_timimgs[1] + else: + expanded += bit_timimgs[0] + d <<= 1 + bits -= 1 + + if False: + print( 'Expanded Code:', expanded ) + print( '' ) + print( 'Pulse train:' ) + + # expand the symbols into their pulses + for c in expanded: + print( symbol_timings[c], end=' ' ) + print( '' ) + + return [symbol_timings[c] for c in expanded] + + + @staticmethod + def pulses_to_head_key( pulses, fudge=0.1 ): + # see if it will decode with NEC/Samsung + res = None #IRRemoteControlDevice.pulses_to_space_encoded_head_key( pulses ) + print( 'res:', res ) + #return + #if not res: + # # next try SIRC + # res = IRRemoteControlDevice.pulses_to_pulse_encoded_head_key( pulses ) + + ps_count = { } + for p_in in pulses: + if p_in not in ps_count: + ps_count[p_in] = 1 + else: + ps_count[p_in] += 1 + + ps_map = IRRemoteControlDevice._merge_similar_pulse_times( ps_count, fudge ) + + p_count = { } + s_count = { } + is_pulse = False + for p_in in pulses: + is_pulse = not is_pulse + + if p_in in ps_map: + p_in = ps_map[p_in] + + if is_pulse: + if p_in not in p_count: + p_count[p_in] = 1 + else: + p_count[p_in] += 1 + else: + if p_in not in s_count: + s_count[p_in] = 1 + else: + s_count[p_in] += 1 + + print( 'p_count:', p_count, 's_count:', s_count ) + #p_map = IRRemoteControlDevice._merge_similar_pulse_times( p_count, fudge ) + #s_map = IRRemoteControlDevice._merge_similar_pulse_times( s_count, fudge ) + p_map = s_map = ps_map + + symbol_pattern = '' + symbol_list = { } + p_key_map = { } + s_key_map = { } + is_pulse = False + for p_in in pulses: + is_pulse = not is_pulse + if is_pulse: + k = p_map[p_in] if p_in in p_map else p_in + if k not in p_key_map: + mk = chr(len(p_key_map) + 0x41) # A-z + #print('adding pulse', k, mk) + p_key_map[k] = { 'count': 1, 'char': mk } + if mk not in symbol_list: + symbol_list[mk] = [k, False] + else: + p_key_map[k]['count'] += 1 + symbol_pattern += p_key_map[k]['char'] + else: + k = s_map[p_in] if p_in in s_map else p_in + if k not in s_key_map: + mk = chr(len(s_key_map) + 0x61) # a-z + #print('adding SPACE', k, mk) + s_key_map[k] = { 'count': 1, 'char': mk } + if mk not in symbol_list: + symbol_list[mk] = [k, False] + else: + s_key_map[k]['count'] += 1 + symbol_pattern += s_key_map[k]['char'] + + print( 'symbol_pattern:', symbol_pattern ) + print( 'symbol_list:', symbol_list ) + + pmax = { 'count': 0, 'time': 0 } + smax = { 'count': 0, 'time': 0 } + for k in p_key_map: + if p_key_map[k]['count'] > pmax['count']: + pmax['count'] = p_key_map[k]['count'] + pmax['time'] = k + for k in s_key_map: + if s_key_map[k]['count'] > smax['count']: + smax['count'] = s_key_map[k]['count'] + smax['time'] = k + print( 'smax, pmax:', smax, pmax ) + + #if False and smax['count'] > pmax['count']: + # # probably pulse-width encoded + # k = smax['time'] + # pat = s_key_map[k]['char'] + #else: + # # probably space-width encoded + # k = pmax['time'] + # pat = p_key_map[k]['char'] + #del pmax + #del smax + #print( 'pat:', pat ) + k = smax['time'] + s_pat = s_key_map[k]['char'] + + k = pmax['time'] + p_pat = p_key_map[k]['char'] + + print( 'p_pat s_pat:', p_pat, s_pat ) + + offset_0_shortest = offset_1_shortest = None + for offset in range( 2 ): + print( 'Trying offset', offset ) + + pat = p_pat if offset == 0 else s_pat + pat_counts = {} + for i in range( offset, len(symbol_pattern), 2 ): + if symbol_pattern[i] == pat: + k = symbol_pattern[i:i+2] + if len(k) == 2: + if k not in pat_counts: + pat_counts[k] = 1 + else: + pat_counts[k] += 1 + + print( 'pat_counts:', pat_counts ) + + # find the most-common pattern pair, and the next-most-common pattern pair + pat_max = [0, ''] + pat_next_max = [0, ''] + for k in pat_counts: + if pat_counts[k] > pat_max[0]: + pat_max[0] = pat_counts[k] + pat_max[1] = k + for k in pat_counts: + if k != pat_max[1] and pat_counts[k] > pat_next_max[0]: + pat_next_max[0] = pat_counts[k] + pat_next_max[1] = k + + print( 'pat_max', pat_max, 'pat_next_max', pat_next_max) + + # reset from the previous 'offset' loop + for k in symbol_list: + symbol_list[k][1] = False + + try_bitfield = True + new_symbol_pattern = '' + full_symbol_pattern = symbol_pattern + + a = pat_max[1][0] + b = pat_next_max[1][0] + if symbol_list[a][0] == symbol_list[b][0]: + # pulses are the same, it might be space-width encoded + symbol_list[a][1] = symbol_list[b][1] = '@' + a = pat_max[1][1] + b = pat_next_max[1][1] + if symbol_list[a][0] < symbol_list[b][0]: + symbol_list[a][1] = '#' + symbol_list[b][1] = '$' + zero_symbol = pat_max[1] + one_symbol = pat_next_max[1] + else: + symbol_list[a][1] = '$' + symbol_list[b][1] = '#' + zero_symbol = pat_next_max[1] + one_symbol = pat_max[1] + else: + # pulses are not the same + if symbol_list[a][0] < symbol_list[b][0]: + symbol_list[a][1] = '#' + symbol_list[b][1] = '$' + zero_symbol = pat_max[1] + one_symbol = pat_next_max[1] + else: + symbol_list[a][1] = '$' + symbol_list[b][1] = '#' + zero_symbol = pat_next_max[1] + one_symbol = pat_max[1] + + a = pat_max[1][1] + b = pat_next_max[1][1] + if symbol_list[a][0] == symbol_list[b][0]: + # but all spaces are the same, probably pulse-width encoded + symbol_list[a][1] = symbol_list[b][1] = '@' + new_symbol_pattern = symbol_pattern[0] + full_symbol_pattern = symbol_pattern[1:] + else: + symbol_list[a][1] = '@' + try_bitfield = False + + time_symbols = { } + for k in symbol_list: + if symbol_list[k][1]: + c = symbol_list[k][0] + time_symbols[c] = symbol_list[k][1] + + symbol_set = '%^&*()' + symbols_available = [] + need_abort = False + for c in symbol_set: + symbols_available.append(c) + print('sls1:', symbol_list ) + for k in symbol_list: + if not symbol_list[k][1]: + t = symbol_list[k][0] + if t in time_symbols: + symbol_list[k][1] = time_symbols[t] + continue + if not symbols_available: + #raise ValueError( 'Cannot convert pulses to head/key, too many unique pulse/space values' ) + print( 'Cannot convert pulses to head/key, too many unique pulse/space values' ) + #return None + need_abort = True + break + s = symbols_available.pop( 0 ) + symbol_list[k][1] = s + time_symbols[t] = s + print('sls2:', symbol_list ) + if need_abort: + continue + + print( 'zero:', zero_symbol, 'one:', one_symbol ) + + raw_symbol_pattern = '' + for c in symbol_pattern: + raw_symbol_pattern += symbol_list[c][1] + + if try_bitfield: + if offset: + c = full_symbol_pattern[0] + new_symbol_pattern += symbol_list[c][1] + + bits = data = 0 + byts = [] + for i in range( offset, len(full_symbol_pattern), 2 ): + k = full_symbol_pattern[i:i+2] + if k == zero_symbol: + bits += 1 + if bits == 8: + byts.append( data ) + bits = data = 0 + elif k == one_symbol: + bits += 1 + data |= 1 << (8 - bits) + if bits == 8: + byts.append( data ) + bits = data = 0 + else: + if bits or byts: + new_symbol_pattern += IRRemoteControlDevice._build_key_bitfield( bits, data, byts ) + bits = data = 0 + byts = [] + + for c in k: + new_symbol_pattern += symbol_list[c][1] + else: + print( 'Not attempting bitfield' ) + new_symbol_pattern = raw_symbol_pattern + + if len(new_symbol_pattern) > len(raw_symbol_pattern): + print( 'Bitfield pattern is longer than pulse/space symbol lists, using shorter symbol list' ) + new_symbol_pattern = raw_symbol_pattern + + if offset: + offset_1_shortest = new_symbol_pattern + + offset_1_symbol_list = { } + for k in symbol_list: + j = symbol_list[k][1] + offset_1_symbol_list[j] = symbol_list[k][0] + else: + offset_0_shortest = new_symbol_pattern + offset_0_symbol_list = { } + for k in symbol_list: + j = symbol_list[k][1] + offset_0_symbol_list[j] = symbol_list[k][0] + + if not offset_0_shortest: + new_symbol_pattern = offset_1_shortest + new_symbol_list = offset_1_symbol_list + elif not offset_1_shortest: + new_symbol_pattern = offset_0_shortest + new_symbol_list = offset_0_symbol_list + elif len(offset_0_shortest) < len(offset_1_shortest): + new_symbol_pattern = offset_0_shortest + new_symbol_list = offset_0_symbol_list + else: + new_symbol_pattern = offset_1_shortest + new_symbol_list = offset_1_symbol_list + + header = IRRemoteControlDevice._build_head_field( 2, 3800, new_symbol_list ) + + print( 'pattern options:', offset_0_shortest, offset_1_shortest ) + print( 'symbol list options:', offset_0_symbol_list, offset_1_symbol_list ) + print( 'new pattern:', header, new_symbol_pattern ) + print( p_key_map ) + print( s_key_map ) + #print( symbol_pattern ) + return header, '001' + new_symbol_pattern + + @staticmethod + def _merge_similar_pulse_times( p_count, fudge ): + p_map = { } + mod = True + while mod: + mod = False + merge = None + for p_in in p_count: + pfudge = p_in * fudge + pmin = p_in - pfudge + pmax = p_in + pfudge + for p_check in p_count: + if p_in == p_check: + continue + if p_check >= pmin and p_check <= pmax: + merge = (p_in, p_check) + print( 'merging', merge ) + break + if merge: + break + if merge: + mod = True + a = merge[0] + b = merge[1] + new_count = p_count[a] + p_count[b] + #new_p = round(((p_count[a] * a) + (p_count[b] * b)) / new_count) + new_p = round((a + b) / 2) + del p_count[a] + del p_count[b] + p_count[new_p] = new_count + p_map[a] = new_p + p_map[b] = new_p + + print('final map', p_map) + return p_map + + @staticmethod + def pulses_to_space_encoded_head_key( pulses ): + results = [] + bits = 0 + data = 0 + byts = [] + start_pulse = 0 + start_space = 0 + last_space = 0 + + is_pulse = False + fail_if_again = False + for p_in in pulses: + if fail_if_again: + return None + is_pulse = not is_pulse + if is_pulse: + print( 'p', p_in ) + if p_in >= 7900 and p_in <= 10100: + # NEC protocol start pulse + if start_pulse > 0: + if not start_space: + return None + bits2 = (len(byts) * 8) + bits + if bits2 == 0 and start_space == 2250: + # repeat code + pass + elif bits2 < 8 or bits2 > 100: + return None + results.append( (start_pulse, start_space, byts, bits, data, last_space) ) + start_pulse = 9000 + bits = data = start_space = last_space = 0 + byts = [] + elif p_in >= 3400 and p_in <= 5600: + # Samsung procol start pulse + if start_pulse > 0: + if not start_space: + return None + bits2 = (len(byts) * 8) + bits + if bits2 == 0 and start_space == 2250: + # repeat code + pass + elif bits2 < 8 or bits2 > 100: + return None + results.append( (start_pulse, start_space, byts, bits, data, last_space) ) + start_pulse = 4500 + bits = data = start_space = last_space = 0 + byts = [] + elif p_in > 665 or p_in < 400: + # not NEC/Samsung + return None + elif start_space == 0: + # not NEC/Samsung + return None + else: # not is_pulse + print( 's', p_in ) + if start_space == 0: + if p_in >= 3400 and p_in <= 5600: + # normal start space + start_space = 4500 + elif p_in >= 1150 and p_in <= 3350: + # repeat code + start_space = 2250 + else: + # not NEC/Samsung + return None + else: + if p_in > 3350: + # gap between transmissions + if start_pulse > 0: + if not start_space: + return None + bits2 = (len(byts) * 8) + bits + #if bits2 < 8 or bits2 > 100: + # return None + results.append( (start_pulse, start_space, byts, bits, data, last_space) ) + start_pulse = 9000 + bits = data = start_space = last_space = 0 + byts = [] + elif p_in >= 400 and p_in <= 665: + # zero + bits += 1 + if bits == 8: + byts.append(data) + bits = data = 0 + elif p_in >= 1400 and p_in <= 1800: + # one + bits += 1 + data |= (1 << (8 - bits)) + if bits == 8: + byts.append(data) + bits = data = 0 + else: + fail_if_again = True + if start_pulse > 0: + if not start_space: + return None + bits2 = (len(byts) * 8) + bits + if bits2 == 0 and start_space == 2250: + # repeat code + pass + elif bits2 < 8 or bits2 > 100: + return None + results.append( (start_pulse, start_space, byts, bits, data, last_space) ) + + if not results: + return None + + count = -1 + for r in results: + count += 1 + if count == 0: + continue + # make sure start pulse is the same + if r[0] != results[0][0]: + return None + + result_string = '' + symbols = { results[0][0]: '%', 4500: '^', 2250: '&', results[0][5]: '*' } + for r in results: + if r[0] not in symbols or r[1] not in symbols: + return None + result_string += symbols[r[0]] + symbols[r[1]] + result_string += IRRemoteControlDevice._build_key_bitfield( r[3], r[4], r[2] ) + result_string += '@*' + + return result_string + + @staticmethod + def _build_key_bitfield( bits, bitdata, byts ): + numbits = bits + (len(byts) * 8) + result_string = '%02X%02X' % (0, numbits) + for b in byts: + result_string += '%02X' % b + if bits: + result_string += '%02X' % bitdata + print('bitfield:', result_string) + return result_string + + @staticmethod + def _build_head_field( typ, freq, symbol_list ): + max_symbol = 0 + for c in symbol_list: + i = IRRemoteControlDevice.KEY1_SYMBOL_LIST.index( c ) + if i >= max_symbol: + max_symbol = i + 1 + + time_base = (100000 / freq) + print(time_base) + + # 02 0ed8 0000 0000 0007 00100014001500380026009a013c + header = '%02x%04x%04x%04x%04x' % (typ, freq, 0, 0, max_symbol) + for i in range( max_symbol ): + k = IRRemoteControlDevice.KEY1_SYMBOL_LIST[i] + print( k, symbol_list[k], time_base, round(symbol_list[k] / time_base)) + header += '%04x' % round(symbol_list[k] / time_base) + + return header + @staticmethod def hex_to_pulses( code_hex ): raw_bytes = bytes.fromhex(code_hex) From 057a1fb989cc4d18f0a0f48a93e81ac7710c60b3 Mon Sep 17 00:00:00 2001 From: uzlonewolf Date: Fri, 2 Dec 2022 04:34:26 -0800 Subject: [PATCH 2/4] Add support for newer IR devices, and several IR format converters --- .../Contrib/IRRemoteControlDevice-example.py | 108 ++- tinytuya/Contrib/IRRemoteControlDevice.py | 793 +++++++++++------- tinytuya/core.py | 18 +- 3 files changed, 620 insertions(+), 299 deletions(-) diff --git a/examples/Contrib/IRRemoteControlDevice-example.py b/examples/Contrib/IRRemoteControlDevice-example.py index 2254f9cf..9185b0f0 100644 --- a/examples/Contrib/IRRemoteControlDevice-example.py +++ b/examples/Contrib/IRRemoteControlDevice-example.py @@ -5,6 +5,7 @@ Example script using the community-contributed Python module for Tuya WiFi smart universal remote control simulators Author: Alexey 'Cluster' Avdyukhin (https://github.com/clusterm) + Rewritten by: uzlonewolf (https://github.com/uzlonewolf) For more information see https://github.com/jasonacox/tinytuya """ @@ -13,35 +14,112 @@ from tinytuya import Contrib from time import sleep -# tinytuya.set_debug(toggle=True, color=True) +#tinytuya.set_debug(toggle=True, color=True) -# discrete on/off codes for Samsung + + + +# parsing and converting between data formats + + +# discrete on/off codes for Samsung in Pronto format pronto_samsung_on = '0000 006D 0000 0022 00AC 00AC 0015 0040 0015 0040 0015 0040 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0040 0015 0040 0015 0040 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0040 0015 0015 0015 0015 0015 0040 0015 0040 0015 0015 0015 0015 0015 0040 0015 0015 0015 0040 0015 0040 0015 0015 0015 0015 0015 0040 0015 0040 0015 0015 0015 0689' pronto_samsung_off = '0000 006D 0000 0022 00AC 00AC 0015 0040 0015 0040 0015 0040 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0040 0015 0040 0015 0040 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0015 0040 0015 0040 0015 0015 0015 0015 0015 0040 0015 0040 0015 0040 0015 0040 0015 0015 0015 0015 0015 0040 0015 0040 0015 0015 0015 0689' + +# convert the Pronto format into pulses pulses_samsung_on = Contrib.IRRemoteControlDevice.pronto_to_pulses( pronto_samsung_on ) pulses_samsung_off = Contrib.IRRemoteControlDevice.pronto_to_pulses( pronto_samsung_off ) -print( 'Samsung on code:', Contrib.IRRemoteControlDevice.pulses_to_samsung( pulses_samsung_on )[0] ) + +# decode the pulses as Samsung format (similar to NEC but with a half-width start burst) +# there may be more than one code in the data stream, so this returns a list of codes +samsung_on_code = Contrib.IRRemoteControlDevice.pulses_to_samsung( pulses_samsung_on ) +samsung_off_code = Contrib.IRRemoteControlDevice.pulses_to_samsung( pulses_samsung_off ) + +# print only the first code +print( 'Samsung on code:', samsung_on_code[0] ) # Samsung on code: {'type': 'samsung', 'uint32': 3772815718, 'address': 7, 'data': 153, 'hex': 'E0E09966'} -print( 'Samsung off code:', Contrib.IRRemoteControlDevice.pulses_to_samsung( pulses_samsung_off )[0] ) + +print( 'Samsung off code:', samsung_off_code[0] ) # Samsung off code: {'type': 'samsung', 'uint32': 3772783078, 'address': 7, 'data': 152, 'hex': 'E0E019E6'} + + + + + # discrete on/off codes for LG hex_lg_on = 0x20DF23DC hex_lg_off = 0x20DFA35C + +# convert the 32-bit integers into a stream of pulses pulses_lg_on = Contrib.IRRemoteControlDevice.nec_to_pulses( hex_lg_on ) pulses_lg_off = Contrib.IRRemoteControlDevice.nec_to_pulses( hex_lg_off ) -print( 'LG on code:', Contrib.IRRemoteControlDevice.pulses_to_nec( pulses_lg_on )[0] ) + +# decode the pulses to verify and print them like the above Samsung +lg_on_code = Contrib.IRRemoteControlDevice.pulses_to_nec( pulses_lg_on ) +print( 'LG on code:', lg_on_code[0] ) # LG on code: {'type': 'nec', 'uint32': 551494620, 'address': 4, 'data': 196, 'hex': '20DF23DC'} -print( 'LG off code:', Contrib.IRRemoteControlDevice.pulses_to_nec( pulses_lg_off )[0] ) + +lg_off_code = Contrib.IRRemoteControlDevice.pulses_to_nec( pulses_lg_off ) +print( 'LG off code:', lg_off_code[0] ) # LG off code: {'type': 'nec', 'uint32': 551527260, 'address': 4, 'data': 197, 'hex': '20DFA35C'} -ir = Contrib.IRRemoteControlDevice( 'abcdefghijklmnop123456', '172.28.321.475', '1234567890123abc' ) -# turn the Samsung tv on + + + +# both Pronto codes and pulses can also be turned into head/key format +# Pronto will have the correct frequency in the data +headkey = Contrib.IRRemoteControlDevice.pronto_to_head_key( pronto_samsung_on ) +if headkey: + head, key = headkey +# but the pulses frequency needs to be specified manually if it is not 38 kHz +headkey = Contrib.IRRemoteControlDevice.pulses_to_head_key( pulses_samsung_on, freq=38 ) +if headkey: + head, key = headkey + + + + +# learned codes can also be converted +pulses = Contrib.IRRemoteControlDevice.base64_to_pulses('IyOvEToCZQI5AkoCOgJNAjYCTwI4AlACNQJMAjkCTQI2ApsGSwKZBkkClwZMAp8GLALLBhgC0wYRAtMGEwLRBhMCbgIdAmkCGwLKBhsCagIaAsoGGgJzAhACbwIWAnICFAJvAh0CxgYdAmoCFwLMBhoCcAIUAtAGFALRBhQC0QYUAtAGFQKXnBgjCAkXAiDL') +# default frequency is 38 kHz +headkey = Contrib.IRRemoteControlDevice.pulses_to_head_key( pulses ) +if headkey: + head, key = headkey + + + + +# now onto talking to the device! + + +# create the device. this will connect to it to try and determine which DPS it uses +#ir = Contrib.IRRemoteControlDevice( 'abcdefghijklmnop123456', '172.28.321.475', '1234567890123abc', persist=True ) +ir = Contrib.IRRemoteControlDevice("8357021598cdac0aff74", address='172.20.10.150', local_key='3ba3a31d71dd9ee8', version=3.3, persist=True) + + +print( 'Turning the Samsung tv on with pulses' ) ir.send_button( ir.pulses_to_base64( pulses_samsung_on ) ) -# turn the LG tv on +sleep(0.5) +print( 'Turning the LG tv on with pulses' ) ir.send_button( ir.pulses_to_base64( pulses_lg_on ) ) +sleep(0.5) + + +print( 'Turning the Samsung tv off with head/key' ) +head, key = Contrib.IRRemoteControlDevice.pronto_to_head_key( pronto_samsung_off ) +ir.send_key( head, key ) +sleep(0.5) +print( 'Turning the LG tv off with head/key' ) +head, key = Contrib.IRRemoteControlDevice.pulses_to_head_key( pulses_lg_off ) +ir.send_key( head, key ) +sleep(0.5) + + + +# learn a new remote print("Press button on your remote control") button = ir.receive_button(timeout=15) if (button == None): @@ -50,10 +128,16 @@ print("Received button:", button) pulses = ir.base64_to_pulses(button) -print("Pulses and gaps (microseconds): " + - ' '.join([f'{"p" if i % 2 == 0 else "g"}{pulses[i]}' for i in range(len(pulses))])) +print( Contrib.IRRemoteControlDevice.print_pulses( pulses ) ) +headkey = Contrib.IRRemoteControlDevice.pulses_to_head_key( pulses ) +if headkey: + head, key = headkey + print( 'Head:', head ) + print( 'Key:', key ) for i in range(10): print("Simulating button press...") - ir.send_button(button) + # either works + #ir.send_button(button) + ir.send_key( head, key ) sleep(1) diff --git a/tinytuya/Contrib/IRRemoteControlDevice.py b/tinytuya/Contrib/IRRemoteControlDevice.py index 30d10003..5dab6354 100644 --- a/tinytuya/Contrib/IRRemoteControlDevice.py +++ b/tinytuya/Contrib/IRRemoteControlDevice.py @@ -6,6 +6,7 @@ This module attempts to provide everything needed so there is no need to import the base tinytuya module Module Author: Alexey 'Cluster' Avdyukhin (https://github.com/clusterm) + Rewritten by uzlonewolf (https://github.com/uzlonewolf) for new devices and IR format conversion Local Control Classes IRRemoteControlDevice(..., version=3.3) @@ -13,14 +14,42 @@ See OutletDevice() for the other constructor arguments Functions: - ir = IRRemoteControlDevice(...) + ir = IRRemoteControlDevice(..., control_type=None) + -> will immediately connect to the device to try and detect the control type if control_type is not provided + + ir.detect_control_type() + -> polls device status to try and detect the control type + + ir.send_command( mode, data={} ) + -> sends a command to the device + when mode is 'send', data is parsed for the data to send + data = { "base64_code": "..." } or + data = { "head": "...", "key": "..." } + all other commands are sent though as-is without data + + ir.study_start() + ir.study_end() + -> start or end a study session ir.receive_button( timeout ) -> call this method and press button on real remote control to read its code in Base64 format timeout - maximum time to wait for button press ir.send_button( base64_code ) - -> simulate button press + -> simulate a learned (raw base64-encoded) button press + + ir.send_key( head, key ) + -> send a head/key pair + + ir.build_head( freq=38, bit_time=0, zero_time=0, one_time=0, bit_time_type=1, timings=[], convert_time=True ) + -> build a 'head' section + 'freq' is in kHz + if bit_time, zero_time, or one_time evaluate to False, timings are taken from timings[] as needed + if convert_time is True, timings are in microseconds and converted as needed. + when False, timings are sent as-is + + IRRemoteControlDevice.print_pulses ( pulses ) + -> pretty-print a sequence of pulses and gaps length IRRemoteControlDevice.base64_to_pulses ( code_base_64 ) -> convert Base64-encoded button code to sequence of pulses and gaps length @@ -30,11 +59,12 @@ IRRemoteControlDevice.head_key_to_pulses ( head='...', key='...' ) -> convert head/key pair to sequence of pulses and gaps - 'head' can be omitted if key starts with '1' - 'key' must begin with '000' through '1FF' + 'head' can be None when the key is raw bytes in base64 + 'key' must begin with '00' through 'FF' when it is not raw bytes in base64 - IRRemoteControlDevice.pulses_to_head_key ( pulses ) - -> attempt to pack a sequence of pulses and gaps into a head/key pair + IRRemoteControlDevice.pulses_to_head_key ( pulses, fudge=0.1, freq=38 ) + -> attempts to pack a sequence of pulses and gaps into a head/key pair + pulses/gaps within 10% (fudge=0.1) are assumed to be the same and are merged together IRRemoteControlDevice.hex_to_pulses ( code_hex ) -> convert HEX-encoded button code to sequence of pulses and gaps length @@ -82,13 +112,28 @@ import json import logging import struct +import time from math import ceil, floor from ..core import Device, log, CONTROL class IRRemoteControlDevice(Device): - DP_SEND_IR = "201" # ir_send, send and report (read-write) - DP_LEARNED_ID = "202" # ir_study_code, report only (read-only) + CMD_SEND_KEY_CODE = "send_ir" # Command to start sending a key + DP_SEND_IR = "201" # ir_send, send and report (read-write) + DP_LEARNED_ID = "202" # ir_study_code, report only (read-only) + DP_MODE = "1" + DP_LEARNED_REPORT = "2" + DP_HEAD = "3" + DP_KEY_CODE = "4" + DP_KEY_CODE2 = "5" + DP_KEY_CODE3 = "6" + DP_KEY_CODE4 = "11" + DP_KEY_STUDY = "7" + DP_KEY_STUDY2 = "8" + DP_KEY_STUDY3 = "9" + DP_KEY_STUDY4 = "12" + DP_SEND_DELAY = "10" + DP_CODE_TYPE = "13" NSDP_CONTROL = "control" # The control commands NSDP_STUDY_CODE = "study_code" # Report learned IR codes NSDP_IR_CODE = "ir_code" # IR signal decoding2 @@ -105,105 +150,199 @@ class IRRemoteControlDevice(Device): NSDP_DELAY = "delay" # Actually used but not documented NSDP_HEAD = "head" # Actually used but not documented NSDP_KEY1 = "key1" # Actually used but not documented - KEY1_SYMBOL_LIST = "@#$%^&*()" # Timing symbols used in key1 + KEY1_SYMBOL_LIST = "@#$%^&*()QWRLTXKVNM{}[]JUP<>|=HS~" # Timing symbols used in key1 def __init__(self, *args, **kwargs): # set the default version to 3.3 as there are no 3.1 devices if 'version' not in kwargs or not kwargs['version']: kwargs['version'] = 3.3 + + control_type = 0 + if 'control_type' in kwargs: + control_type = kwargs['control_type'] + del kwargs['control_type'] + super(IRRemoteControlDevice, self).__init__(*args, **kwargs) - def receive_button( self, timeout ): + self.control_type = control_type + if not self.control_type: + self.detect_control_type() + + def detect_control_type( self ): + status = self.status() + if status and 'dps' in status: + # original devices using DPS 201/202 + if self.DP_SEND_IR in status['dps']: + self.control_type = 1 + # newer devices using DPS 1-13 + elif self.DP_MODE in status['dps']: + self.control_type = 2 + + def send_command( self, mode, data={} ): + if mode == 'send': + if self.control_type == 1: + command = { + IRRemoteControlDevice.NSDP_CONTROL: "send_ir", + IRRemoteControlDevice.NSDP_TYPE: 0, + } + + if 'base64_code' in data: + command[IRRemoteControlDevice.NSDP_HEAD] = '' + command[IRRemoteControlDevice.NSDP_KEY1] = '1' + data['base64_code'] + elif 'head' in data and 'key' in data: + command[IRRemoteControlDevice.NSDP_HEAD] = data['head'] + command[IRRemoteControlDevice.NSDP_KEY1] = '0' + data['key'] + self.set_value( IRRemoteControlDevice.DP_SEND_IR, json.dumps(command), nowait=True ) + elif self.control_type == 2: + mode = 'study_key' if 'base64_code' in data else 'send_ir' + command = { + IRRemoteControlDevice.DP_MODE: mode, + IRRemoteControlDevice.DP_CODE_TYPE: 0, + } + if 'base64_code' in data: + command[IRRemoteControlDevice.DP_KEY_STUDY] = data['base64_code'] + elif 'head' in data and 'key' in data: + command[IRRemoteControlDevice.DP_HEAD] = data['head'] + command[IRRemoteControlDevice.DP_KEY_CODE] = data['key'] + self.set_multiple_values( command, nowait=True ) + elif self.control_type == 1: + command = { IRRemoteControlDevice.NSDP_CONTROL: mode } + self.set_value( IRRemoteControlDevice.DP_SEND_IR, json.dumps(command), nowait=True ) + elif self.control_type == 2: + self.set_value( IRRemoteControlDevice.DP_MODE, mode, nowait=True ) + + def study_start( self ): + self.send_command( 'study' ) + + def study_end( self ): + self.send_command( 'study_exit' ) + + def receive_button( self, timeout=30 ): log.debug("Receiving button") # Exit study mode in case it's enabled - command = { - IRRemoteControlDevice.NSDP_CONTROL: "study_exit", - } - payload = self.generate_payload(CONTROL, {IRRemoteControlDevice.DP_SEND_IR: json.dumps(command)}) - self.send(payload) - # Enable study mode - command = { - IRRemoteControlDevice.NSDP_CONTROL: "study", - } - payload = self.generate_payload(CONTROL, {IRRemoteControlDevice.DP_SEND_IR: json.dumps(command)}) - self.send(payload) + self.study_end() + # Enable study mode + self.study_start() # Receiving button code - button = None + response = None + response_code = None + found = False # Remember old timeout and set new timeout old_timeout = self.connection_timeout - self.set_socketTimeout(timeout) + end_at_time = time.time() + timeout try: - log.debug("Waiting for button...") - button = self._send_receive(None) - if button == None: - # Nothing received - log.debug("Timeout") - base64_code = None - elif type(button) != dict or "dps" not in button or IRRemoteControlDevice.DP_LEARNED_ID not in button["dps"]: - # Some unexpected result - log.debug(f"Unexpected response: {button}") - base64_code = button # Some error message? Pass it. - else: - # Button code received, extracting it as Base64 string - base64_code = button["dps"][IRRemoteControlDevice.DP_LEARNED_ID] - # Some debug info - if log.getEffectiveLevel() <= logging.DEBUG: - pulses = self.base64_to_pulses(base64_code) - log.debug("Pulses and gaps (microseconds): " + - ' '.join([f'{"p" if i % 2 == 0 else "g"}{pulses[i]}' for i in range(len(pulses))])) + while end_at_time > time.time(): + timeo = round(time.time() - end_at_time) + if timeo < 1: timeo = 1 + self.set_socketTimeout(timeo) + + log.debug("Waiting for button...") + response = self._send_receive(None) + if response == None: + # Nothing received + log.debug("Timeout") + elif type(response) != dict or "dps" not in response: + # Some unexpected result + log.debug("Unexpected response: %r", response) + response_code = response # Some error message? Pass it. + break + elif self.DP_LEARNED_ID in response["dps"]: + # Button code received, extracting it as Base64 string + response_code = response["dps"][self.DP_LEARNED_ID] + found = True + break + elif self.DP_LEARNED_REPORT in response["dps"]: + response_code = response["dps"][self.DP_LEARNED_REPORT] + found = True + break + else: + # Unknown DPS + log.debug("Unknown DPS in response: %r", response) + response_code = response # Pass it if we do not get a response we like + # try again finally: # Revert timeout self.set_socketTimeout(old_timeout) + if found: + self.print_pulses( response_code ) + # Exit study mode - command = { - IRRemoteControlDevice.NSDP_CONTROL: "study_exit", - } - payload = self.generate_payload(CONTROL, {IRRemoteControlDevice.DP_SEND_IR: json.dumps(command)}) - self.send(payload) + self.study_end() - return base64_code + return response_code def send_button( self, base64_code ): - if len(base64_code) % 4 == 0: base64_code = '1' + base64_code; # code need to be padded with "1" (wtf?) - log.debug("Sending IR Button: " + base64_code) - # Some debug info + log.debug( 'Sending Learned Button: ' + base64_code) + self.print_pulses( base64_code ) + return self.send_command( 'send', {'base64_code': base64_code} ) + + def send_key( self, head, key ): + log.debug( 'Sending Key: %r / %r', head, key ) + return self.send_command( 'send', { 'head': head, 'key': key } ) + + @staticmethod + def build_head( freq=38, bit_time=0, zero_time=0, one_time=0, bit_time_type=1, timings=[], convert_time=True ): + timings = list(timings) + freq = round( freq * 100) + if not bit_time and len(timings) > 0: + bit_time = timings[0] + timings = timings[1:] + if not zero_time and len(timings) > 0: + zero_time = timings[0] + timings = timings[1:] + if not one_time and len(timings) > 0: + one_time = timings[0] + timings = timings[1:] + + if convert_time: + time_base = 100000.0 / freq + bit_time = round( bit_time / time_base ) + zero_time = round( zero_time / time_base ) + one_time = round( one_time / time_base ) + for i in range(len(timings)): + timings[i] = round( timings[i] / time_base ) + + head = '%02X%04X0000000000' % (bit_time_type, freq) + head += '%02X%04X%04X%04X' % (len(timings) + 3, bit_time, zero_time, one_time) + for i in timings: + head += '%04X' % i + + return head + + @staticmethod + def print_pulses( base64_code, use_log=None ): + if not use_log: use_log = log + if type(base64_code) == list: + pulses = base64_code + else: + pulses = IRRemoteControlDevice.base64_to_pulses(base64_code) + message = "Pulses and gaps (microseconds): " + ' '.join([f'{"p" if i % 2 == 0 else "g"}{pulses[i]}' for i in range(len(pulses))]) if log.getEffectiveLevel() <= logging.DEBUG: - pulses = self.base64_to_pulses(base64_code) - log.debug("Pulses and gaps (microseconds): " + - ' '.join([f'{"p" if i % 2 == 0 else "g"}{pulses[i]}' for i in range(len(pulses))])) - command = { - IRRemoteControlDevice.NSDP_CONTROL: "send_ir", - IRRemoteControlDevice.NSDP_KEY1: base64_code, - IRRemoteControlDevice.NSDP_TYPE: 0, - } - payload = self.generate_payload(CONTROL, {IRRemoteControlDevice.DP_SEND_IR: json.dumps(command)}) - return self.send(payload) + log.debug( message ) + return message @staticmethod def base64_to_pulses( code_base_64 ): if len(code_base_64) % 4 == 1 and code_base_64.startswith("1"): - # code can be padded with "1" (wtf?) + # code can be padded with "1" code_base_64 = code_base_64[1:] raw_bytes = base64.b64decode(code_base_64) - return [int.from_bytes(raw_bytes[i:i+2], byteorder="little") for i in range(0, len(raw_bytes), 2)] + fmt = '<%dH' % (len(raw_bytes) >> 1) + return list(struct.unpack(fmt, raw_bytes)) @staticmethod def pulses_to_base64( pulses ): - raw_bytes = [x.to_bytes(2, byteorder="little") for x in pulses] - raw_bytes = [x for xs in raw_bytes for x in xs] # flatten - return base64.b64encode(bytes(raw_bytes)).decode("ascii") + fmt = '<' + str(len(pulses)) + 'H' + return base64.b64encode( struct.pack(fmt, *pulses) ).decode("ascii") @staticmethod def head_key_to_pulses( head, key ): if len(key) < 4: raise ValueError( '"key" must be at least 4 characters' ) - keytype = int(key[0], 16) - if keytype < 0 or keytype > 1: - raise ValueError( 'First digit of "key" must be "0" or "1" (got: %r)' % keytype ) - elif keytype == 1: + if not head: return IRRemoteControlDevice.base64_to_pulses( key ) if len(head) < 18: @@ -213,12 +352,12 @@ def head_key_to_pulses( head, key ): headtype, timescale, unused1, unused2, num_timings = struct.unpack( '>BHHHH', head[:9] ) headlen = num_timings * 2 # times are 16-bit timebase = 100000.0 / timescale - symbols = '@#$%^&*()'[:num_timings] + symbols = IRRemoteControlDevice.KEY1_SYMBOL_LIST[:num_timings] try: - repeat = int( key[1:3], 16 ) + repeat = int( key[:2], 16 ) except: - raise ValueError( 'First digit of "key" must be "0" or "1" and the next 2 digits must be a hexidecimal byte' ) - key = key[3:] + raise ValueError( 'First 2 digit of "key" must be a hexidecimal byte' ) + key = key[2:] # 'head' type 1 uses '@' for 0 and '#' for 1 # 'head' type 2 uses '#' for 0 and '$' for 1 @@ -329,82 +468,104 @@ def head_key_to_pulses( head, key ): @staticmethod - def pulses_to_head_key( pulses, fudge=0.1 ): - # see if it will decode with NEC/Samsung - res = None #IRRemoteControlDevice.pulses_to_space_encoded_head_key( pulses ) - print( 'res:', res ) - #return - #if not res: - # # next try SIRC - # res = IRRemoteControlDevice.pulses_to_pulse_encoded_head_key( pulses ) + def pulses_to_head_key( pulses, fudge=0.1, freq=38 ): + mylog = log.getChild( 'pulses_to_head_key' ) + + if len(pulses) < 2: + return None + + if len(pulses) % 2 == 1: + pulses = list(pulses) + pulses.append(pulses[0]) ps_count = { } - for p_in in pulses: - if p_in not in ps_count: - ps_count[p_in] = 1 + for current_ps_time in pulses: + if current_ps_time not in ps_count: + ps_count[current_ps_time] = 1 else: - ps_count[p_in] += 1 - + ps_count[current_ps_time] += 1 ps_map = IRRemoteControlDevice._merge_similar_pulse_times( ps_count, fudge ) - p_count = { } - s_count = { } - is_pulse = False - for p_in in pulses: - is_pulse = not is_pulse - - if p_in in ps_map: - p_in = ps_map[p_in] - - if is_pulse: - if p_in not in p_count: - p_count[p_in] = 1 - else: - p_count[p_in] += 1 - else: - if p_in not in s_count: - s_count[p_in] = 1 + # should we process the pulses and spaces separately? + # combining them seems to give good results + if False: + p_count = { } + s_count = { } + is_pulse = False + for current_ps_time in pulses: + is_pulse = not is_pulse + + if current_ps_time in ps_map: + current_ps_time = ps_map[current_ps_time] + + if is_pulse: + if current_ps_time not in p_count: + p_count[current_ps_time] = 1 + else: + p_count[current_ps_time] += 1 else: - s_count[p_in] += 1 - - print( 'p_count:', p_count, 's_count:', s_count ) - #p_map = IRRemoteControlDevice._merge_similar_pulse_times( p_count, fudge ) - #s_map = IRRemoteControlDevice._merge_similar_pulse_times( s_count, fudge ) - p_map = s_map = ps_map + if current_ps_time not in s_count: + s_count[current_ps_time] = 1 + else: + s_count[current_ps_time] += 1 + mylog.debug( 'p_count: %r, s_count: %r', p_count, s_count ) + p_map = IRRemoteControlDevice._merge_similar_pulse_times( p_count, fudge ) + s_map = IRRemoteControlDevice._merge_similar_pulse_times( s_count, fudge ) + mylog.debug('merged pulse map: %r', p_map) + mylog.debug('merged space map: %r', s_map) + else: + p_map = s_map = ps_map + mylog.debug('merged pulse+space map: %r', ps_map) + + # convert the list of pulse and space lengths into a string to + # make it easier to group and count unique sequences. + # the first unique pulse will get the symbol 'A' while + # the first space becomes 'a'. The next is 'B' and 'b'. + # all pulses/spaces of the same length get the same letter + # I.e. [ 4523 4523 552 1683 552 1683 552 552 552 552 ] becomes + # A a B b B b B c B c -> AaBbBbBcBc + # we can then substring count 'Bb' and 'Bc' symbol_pattern = '' symbol_list = { } p_key_map = { } s_key_map = { } is_pulse = False - for p_in in pulses: + for current_ps_time in pulses: is_pulse = not is_pulse if is_pulse: - k = p_map[p_in] if p_in in p_map else p_in + #if this length was combined, use the new (averaged) value + k = p_map[current_ps_time] if current_ps_time in p_map else current_ps_time + + # if this length has not been seen yet, assign it a letter if k not in p_key_map: - mk = chr(len(p_key_map) + 0x41) # A-z - #print('adding pulse', k, mk) - p_key_map[k] = { 'count': 1, 'char': mk } - if mk not in symbol_list: - symbol_list[mk] = [k, False] + next_letter = chr(len(p_key_map) + 0x41) # A-Z + #mylog.debug('adding pulse %r %r', k, next_letter) + p_key_map[k] = { 'count': 1, 'char': next_letter } + if next_letter not in symbol_list: + symbol_list[next_letter] = [k, False] else: p_key_map[k]['count'] += 1 symbol_pattern += p_key_map[k]['char'] else: - k = s_map[p_in] if p_in in s_map else p_in + #if this length was combined, use the new (averaged) value + k = s_map[current_ps_time] if current_ps_time in s_map else current_ps_time + + # if this length has not been seen yet, assign it a letter if k not in s_key_map: - mk = chr(len(s_key_map) + 0x61) # a-z - #print('adding SPACE', k, mk) - s_key_map[k] = { 'count': 1, 'char': mk } - if mk not in symbol_list: - symbol_list[mk] = [k, False] + next_letter = chr(len(s_key_map) + 0x61) # a-z + #mylog.debug('adding SPACE %r %r', k, next_letter) + s_key_map[k] = { 'count': 1, 'char': next_letter } + if next_letter not in symbol_list: + symbol_list[next_letter] = [k, False] else: s_key_map[k]['count'] += 1 symbol_pattern += s_key_map[k]['char'] - print( 'symbol_pattern:', symbol_pattern ) - print( 'symbol_list:', symbol_list ) + mylog.debug( 'symbol pattern: %r', symbol_pattern ) + mylog.debug( 'symbol list: %r', symbol_list ) + # find the most-commonly-ocurring pulse and space lengths pmax = { 'count': 0, 'time': 0 } smax = { 'count': 0, 'time': 0 } for k in p_key_map: @@ -415,35 +576,29 @@ def pulses_to_head_key( pulses, fudge=0.1 ): if s_key_map[k]['count'] > smax['count']: smax['count'] = s_key_map[k]['count'] smax['time'] = k - print( 'smax, pmax:', smax, pmax ) - - #if False and smax['count'] > pmax['count']: - # # probably pulse-width encoded - # k = smax['time'] - # pat = s_key_map[k]['char'] - #else: - # # probably space-width encoded - # k = pmax['time'] - # pat = p_key_map[k]['char'] - #del pmax - #del smax - #print( 'pat:', pat ) + k = smax['time'] - s_pat = s_key_map[k]['char'] + space_letter = s_key_map[k]['char'] k = pmax['time'] - p_pat = p_key_map[k]['char'] - - print( 'p_pat s_pat:', p_pat, s_pat ) - - offset_0_shortest = offset_1_shortest = None - for offset in range( 2 ): - print( 'Trying offset', offset ) - - pat = p_pat if offset == 0 else s_pat + pulse_letter = p_key_map[k]['char'] + + mylog.debug( 'most common space: %r %r', space_letter, smax ) + mylog.debug( 'most common pulse: %r %r', pulse_letter, pmax ) + + encoding_type_shortest = [None, None] + encoding_type_symbol_list = [{}, {}] + bit_time_types = [0, 0] + # calculate the head and key for both space-width and pulse-width encoding + # we will use the shorter of the 2 as the final head/key + for encoding_type in range( 2 ): + mylog.debug( '' ) + current_letter = pulse_letter if encoding_type == 0 else space_letter + encoding_type_name = 'pulse' if not encoding_type else 'space' + mylog.debug( 'Trying encoding_type %r - character %r', encoding_type_name, current_letter ) pat_counts = {} - for i in range( offset, len(symbol_pattern), 2 ): - if symbol_pattern[i] == pat: + for i in range( encoding_type, len(symbol_pattern), 2 ): + if symbol_pattern[i] == current_letter: k = symbol_pattern[i:i+2] if len(k) == 2: if k not in pat_counts: @@ -451,9 +606,9 @@ def pulses_to_head_key( pulses, fudge=0.1 ): else: pat_counts[k] += 1 - print( 'pat_counts:', pat_counts ) + mylog.debug( 'pat_counts: %r', pat_counts ) - # find the most-common pattern pair, and the next-most-common pattern pair + # find the most-common and next-most-common pattern pair pat_max = [0, ''] pat_next_max = [0, ''] for k in pat_counts: @@ -465,69 +620,101 @@ def pulses_to_head_key( pulses, fudge=0.1 ): pat_next_max[0] = pat_counts[k] pat_next_max[1] = k - print( 'pat_max', pat_max, 'pat_next_max', pat_next_max) + mylog.debug( 'pat_max: %r, pat_next_max: %r', pat_max, pat_next_max) - # reset from the previous 'offset' loop + # reset from the previous 'encoding_type' loop for k in symbol_list: symbol_list[k][1] = False try_bitfield = True - new_symbol_pattern = '' + bit_symbol_pattern = '' full_symbol_pattern = symbol_pattern - a = pat_max[1][0] - b = pat_next_max[1][0] - if symbol_list[a][0] == symbol_list[b][0]: - # pulses are the same, it might be space-width encoded - symbol_list[a][1] = symbol_list[b][1] = '@' - a = pat_max[1][1] - b = pat_next_max[1][1] - if symbol_list[a][0] < symbol_list[b][0]: - symbol_list[a][1] = '#' - symbol_list[b][1] = '$' - zero_symbol = pat_max[1] - one_symbol = pat_next_max[1] - else: - symbol_list[a][1] = '$' - symbol_list[b][1] = '#' - zero_symbol = pat_next_max[1] - one_symbol = pat_max[1] - else: - # pulses are not the same - if symbol_list[a][0] < symbol_list[b][0]: - symbol_list[a][1] = '#' - symbol_list[b][1] = '$' - zero_symbol = pat_max[1] - one_symbol = pat_next_max[1] - else: - symbol_list[a][1] = '$' - symbol_list[b][1] = '#' - zero_symbol = pat_next_max[1] - one_symbol = pat_max[1] - - a = pat_max[1][1] - b = pat_next_max[1][1] + if pat_max[0] and not pat_next_max[0]: + a = pat_max[1][0] + symbol_list[a][1] = '@' + a2 = pat_max[1][1] + symbol_list[a2][1] = '#' + zero_symbol = pat_max[1] + one_symbol = 'DEADBEEF' + # assign timing symbols to the most-common and next-most-common lengths + elif pat_max[0] and pat_next_max[0]: + a = pat_max[1][0] + b = pat_next_max[1][0] if symbol_list[a][0] == symbol_list[b][0]: - # but all spaces are the same, probably pulse-width encoded + # pulses are the same, it might be space-width encoded symbol_list[a][1] = symbol_list[b][1] = '@' - new_symbol_pattern = symbol_pattern[0] - full_symbol_pattern = symbol_pattern[1:] + a2 = pat_max[1][1] + b2 = pat_next_max[1][1] + if symbol_list[a2][0] < symbol_list[b2][0]: + symbol_list[a2][1] = '#' + symbol_list[b2][1] = '$' + zero_symbol = pat_max[1] + one_symbol = pat_next_max[1] + else: + symbol_list[a2][1] = '$' + symbol_list[b2][1] = '#' + zero_symbol = pat_next_max[1] + one_symbol = pat_max[1] else: - symbol_list[a][1] = '@' - try_bitfield = False + # pulses are not the same + if symbol_list[a][0] < symbol_list[b][0]: + symbol_list[a][1] = '#' + symbol_list[b][1] = '$' + zero_symbol = pat_max[1] + one_symbol = pat_next_max[1] + else: + symbol_list[a][1] = '$' + symbol_list[b][1] = '#' + zero_symbol = pat_next_max[1] + one_symbol = pat_max[1] + + a = pat_max[1][1] + b = pat_next_max[1][1] + if symbol_list[a][0] == symbol_list[b][0]: + # but all spaces are the same, probably pulse-width encoded + symbol_list[a][1] = symbol_list[b][1] = '@' + bit_symbol_pattern = symbol_pattern[0] + full_symbol_pattern = symbol_pattern[1:] + else: + symbol_list[a][1] = '@' + try_bitfield = False + + mylog.debug('initial symbol list: %r', symbol_list) + # if the common length and the zero length are the same, combine them as 'head type 1' + # first find the symbols for '@' and '#' + bit_start_symbol = bit_zero_symbol = None + for k in symbol_list: + if symbol_list[k][1] == '@': + bit_start_symbol = k + elif symbol_list[k][1] == '#': + bit_zero_symbol = k + + # they are the same, combine them into 'head type 1' + if bit_start_symbol and bit_zero_symbol and symbol_list[bit_start_symbol][0] == symbol_list[bit_zero_symbol][0]: + bit_time_types[encoding_type] = 1 + symbols_available = list(IRRemoteControlDevice.KEY1_SYMBOL_LIST[2:]) + symbol_list[bit_zero_symbol][1] = '@' + for k in symbol_list: + if symbol_list[k][1] == '$': + symbol_list[k][1] = '#' + # they are different, use 'head type 2' + else: + bit_time_types[encoding_type] = 2 + symbols_available = list(IRRemoteControlDevice.KEY1_SYMBOL_LIST[2:]) + + # start assigning times to symbols + # the common/0/1 symbols were already set above time_symbols = { } for k in symbol_list: if symbol_list[k][1]: c = symbol_list[k][0] time_symbols[c] = symbol_list[k][1] - symbol_set = '%^&*()' - symbols_available = [] + # assign symbols to the remaining pulse/space times need_abort = False - for c in symbol_set: - symbols_available.append(c) - print('sls1:', symbol_list ) + mylog.debug('symbol list before assignment: %r', symbol_list ) for k in symbol_list: if not symbol_list[k][1]: t = symbol_list[k][0] @@ -536,38 +723,49 @@ def pulses_to_head_key( pulses, fudge=0.1 ): continue if not symbols_available: #raise ValueError( 'Cannot convert pulses to head/key, too many unique pulse/space values' ) - print( 'Cannot convert pulses to head/key, too many unique pulse/space values' ) + mylog.debug( 'Cannot convert pulses to head/key, too many unique pulse/space values!' ) #return None need_abort = True break s = symbols_available.pop( 0 ) symbol_list[k][1] = s time_symbols[t] = s - print('sls2:', symbol_list ) + mylog.debug('symbol list after assignment: %r', symbol_list ) + mylog.debug('unique time symbols: %r', time_symbols) if need_abort: + mylog.debug('!! need_abort !!') continue - print( 'zero:', zero_symbol, 'one:', one_symbol ) + mylog.debug( 'zero sequence: %r, one sequence: %r', zero_symbol, one_symbol ) raw_symbol_pattern = '' for c in symbol_pattern: raw_symbol_pattern += symbol_list[c][1] + mylog.debug( 'raw symbol pattern: %r', raw_symbol_pattern ) + # see if we can condense bitfields into len+data if try_bitfield: - if offset: + if encoding_type: c = full_symbol_pattern[0] - new_symbol_pattern += symbol_list[c][1] + bit_symbol_pattern += symbol_list[c][1] bits = data = 0 byts = [] - for i in range( offset, len(full_symbol_pattern), 2 ): + removed = '' + # the len+2 is to make sure we catch any trailing bits + for i in range( encoding_type, len(full_symbol_pattern)+2, 2 ): k = full_symbol_pattern[i:i+2] + k_symbol_pattern = '' + for c in k: + k_symbol_pattern += symbol_list[c][1] if k == zero_symbol: + removed += k_symbol_pattern bits += 1 if bits == 8: byts.append( data ) bits = data = 0 elif k == one_symbol: + removed += k_symbol_pattern bits += 1 data |= 1 << (8 - bits) if bits == 8: @@ -575,56 +773,72 @@ def pulses_to_head_key( pulses, fudge=0.1 ): bits = data = 0 else: if bits or byts: - new_symbol_pattern += IRRemoteControlDevice._build_key_bitfield( bits, data, byts ) + new_bitfield = IRRemoteControlDevice._build_key_bitfield( bits, data, byts ) + # if the new bitfield is longer than the original timing symbols, don't use it + if len(new_bitfield) < len(removed): + bit_symbol_pattern += new_bitfield + else: + bit_symbol_pattern += removed bits = data = 0 byts = [] - - for c in k: - new_symbol_pattern += symbol_list[c][1] + removed = '' + bit_symbol_pattern += k_symbol_pattern + mylog.debug( 'bit symbol pattern: %r', bit_symbol_pattern ) + + # this should not be needed due to the check in the loop above, but make sure anyway + if len(bit_symbol_pattern) > len(raw_symbol_pattern): + mylog.debug( 'Bitfield pattern is longer than pulse/space symbol lists, using shorter symbol list' ) + bit_symbol_pattern = raw_symbol_pattern else: - print( 'Not attempting bitfield' ) - new_symbol_pattern = raw_symbol_pattern - - if len(new_symbol_pattern) > len(raw_symbol_pattern): - print( 'Bitfield pattern is longer than pulse/space symbol lists, using shorter symbol list' ) - new_symbol_pattern = raw_symbol_pattern + mylog.debug( 'Not attempting bitfield' ) + bit_symbol_pattern = raw_symbol_pattern - if offset: - offset_1_shortest = new_symbol_pattern - - offset_1_symbol_list = { } - for k in symbol_list: - j = symbol_list[k][1] - offset_1_symbol_list[j] = symbol_list[k][0] - else: - offset_0_shortest = new_symbol_pattern - offset_0_symbol_list = { } - for k in symbol_list: - j = symbol_list[k][1] - offset_0_symbol_list[j] = symbol_list[k][0] - - if not offset_0_shortest: - new_symbol_pattern = offset_1_shortest - new_symbol_list = offset_1_symbol_list - elif not offset_1_shortest: - new_symbol_pattern = offset_0_shortest - new_symbol_list = offset_0_symbol_list - elif len(offset_0_shortest) < len(offset_1_shortest): - new_symbol_pattern = offset_0_shortest - new_symbol_list = offset_0_symbol_list + # save the results to see which one is better + encoding_type_shortest[encoding_type] = bit_symbol_pattern + encoding_type_symbol_list[encoding_type] = { } + for k in symbol_list: + j = symbol_list[k][1] + encoding_type_symbol_list[encoding_type][j] = symbol_list[k][0] + + mylog.debug( '' ) + + if not encoding_type_shortest[0]: + key1 = encoding_type_shortest[1] + new_symbol_list = encoding_type_symbol_list[1] + bit_time_type = bit_time_types[1] + elif not encoding_type_shortest[1]: + key1 = encoding_type_shortest[0] + new_symbol_list = encoding_type_symbol_list[0] + bit_time_type = bit_time_types[0] + elif len(encoding_type_shortest[0]) <= len(encoding_type_shortest[1]): + key1 = encoding_type_shortest[0] + new_symbol_list = encoding_type_symbol_list[0] + bit_time_type = bit_time_types[0] else: - new_symbol_pattern = offset_1_shortest - new_symbol_list = offset_1_symbol_list + key1 = encoding_type_shortest[1] + new_symbol_list = encoding_type_symbol_list[1] + bit_time_type = bit_time_types[1] + #mylog.debug(new_symbol_list) + + # copy over the symbol times, making sure they're in the correct order + time_symbols = [] + for c in IRRemoteControlDevice.KEY1_SYMBOL_LIST: + if c in new_symbol_list: + time_symbols.append( new_symbol_list[c] ) + elif len(time_symbols) < 3: + time_symbols.append( 100 ) + else: + break - header = IRRemoteControlDevice._build_head_field( 2, 3800, new_symbol_list ) + header = IRRemoteControlDevice.build_head( freq=freq, bit_time_type=bit_time_type, timings=time_symbols ) - print( 'pattern options:', offset_0_shortest, offset_1_shortest ) - print( 'symbol list options:', offset_0_symbol_list, offset_1_symbol_list ) - print( 'new pattern:', header, new_symbol_pattern ) - print( p_key_map ) - print( s_key_map ) - #print( symbol_pattern ) - return header, '001' + new_symbol_pattern + mylog.debug( 'Space-Width Encoded: %r Timings: %r', encoding_type_shortest[0], encoding_type_symbol_list[0] ) + mylog.debug( 'Pulse-Width Encoded: %r Timings: %r', encoding_type_shortest[1], encoding_type_symbol_list[1] ) + mylog.debug( 'new pattern: %r / %r', header, key1 ) + mylog.debug( p_key_map ) + mylog.debug( s_key_map ) + #mylog.debug( symbol_pattern ) + return header, '01' + key1 @staticmethod def _merge_similar_pulse_times( p_count, fudge ): @@ -633,17 +847,19 @@ def _merge_similar_pulse_times( p_count, fudge ): while mod: mod = False merge = None - for p_in in p_count: - pfudge = p_in * fudge - pmin = p_in - pfudge - pmax = p_in + pfudge + for current_ps_time in p_count: + pfudge = current_ps_time * fudge + pmin = current_ps_time - pfudge + pmax = current_ps_time + pfudge for p_check in p_count: - if p_in == p_check: + if current_ps_time == p_check: continue if p_check >= pmin and p_check <= pmax: - merge = (p_in, p_check) - print( 'merging', merge ) + merge = (current_ps_time, p_check) + #print( 'merging', merge ) break + #else: + # print('not merging, pmin < p_check < pmax', pmin, p_check, pmax) if merge: break if merge: @@ -658,8 +874,10 @@ def _merge_similar_pulse_times( p_count, fudge ): p_count[new_p] = new_count p_map[a] = new_p p_map[b] = new_p + for i in p_map: + if p_map[i] == a or p_map[i] == b: + p_map[i] = new_p - print('final map', p_map) return p_map @staticmethod @@ -674,13 +892,13 @@ def pulses_to_space_encoded_head_key( pulses ): is_pulse = False fail_if_again = False - for p_in in pulses: + for current_ps_time in pulses: if fail_if_again: return None is_pulse = not is_pulse if is_pulse: - print( 'p', p_in ) - if p_in >= 7900 and p_in <= 10100: + print( 'p', current_ps_time ) + if current_ps_time >= 7900 and current_ps_time <= 10100: # NEC protocol start pulse if start_pulse > 0: if not start_space: @@ -695,7 +913,7 @@ def pulses_to_space_encoded_head_key( pulses ): start_pulse = 9000 bits = data = start_space = last_space = 0 byts = [] - elif p_in >= 3400 and p_in <= 5600: + elif current_ps_time >= 3400 and current_ps_time <= 5600: # Samsung procol start pulse if start_pulse > 0: if not start_space: @@ -710,26 +928,26 @@ def pulses_to_space_encoded_head_key( pulses ): start_pulse = 4500 bits = data = start_space = last_space = 0 byts = [] - elif p_in > 665 or p_in < 400: + elif current_ps_time > 665 or current_ps_time < 400: # not NEC/Samsung return None elif start_space == 0: # not NEC/Samsung return None else: # not is_pulse - print( 's', p_in ) + print( 's', current_ps_time ) if start_space == 0: - if p_in >= 3400 and p_in <= 5600: + if current_ps_time >= 3400 and current_ps_time <= 5600: # normal start space start_space = 4500 - elif p_in >= 1150 and p_in <= 3350: + elif current_ps_time >= 1150 and current_ps_time <= 3350: # repeat code start_space = 2250 else: # not NEC/Samsung return None else: - if p_in > 3350: + if current_ps_time > 3350: # gap between transmissions if start_pulse > 0: if not start_space: @@ -741,13 +959,13 @@ def pulses_to_space_encoded_head_key( pulses ): start_pulse = 9000 bits = data = start_space = last_space = 0 byts = [] - elif p_in >= 400 and p_in <= 665: + elif current_ps_time >= 400 and current_ps_time <= 665: # zero bits += 1 if bits == 8: byts.append(data) bits = data = 0 - elif p_in >= 1400 and p_in <= 1800: + elif current_ps_time >= 1400 and current_ps_time <= 1800: # one bits += 1 data |= (1 << (8 - bits)) @@ -798,29 +1016,9 @@ def _build_key_bitfield( bits, bitdata, byts ): result_string += '%02X' % b if bits: result_string += '%02X' % bitdata - print('bitfield:', result_string) + #print('bitfield:', result_string) return result_string - @staticmethod - def _build_head_field( typ, freq, symbol_list ): - max_symbol = 0 - for c in symbol_list: - i = IRRemoteControlDevice.KEY1_SYMBOL_LIST.index( c ) - if i >= max_symbol: - max_symbol = i + 1 - - time_base = (100000 / freq) - print(time_base) - - # 02 0ed8 0000 0000 0007 00100014001500380026009a013c - header = '%02x%04x%04x%04x%04x' % (typ, freq, 0, 0, max_symbol) - for i in range( max_symbol ): - k = IRRemoteControlDevice.KEY1_SYMBOL_LIST[i] - print( k, symbol_list[k], time_base, round(symbol_list[k] / time_base)) - header += '%04x' % round(symbol_list[k] / time_base) - - return header - @staticmethod def hex_to_pulses( code_hex ): raw_bytes = bytes.fromhex(code_hex) @@ -1009,3 +1207,28 @@ def pulses_to_pronto( pulses ): for i in pulses: ret += ' %04X' % round(i/scale) return ret + + @staticmethod + def pronto_to_head_key( pronto ): + ret = [ ] + pronto = [int(x, 16) for x in pronto.split(' ')] + ptype = pronto[0] + timebase = pronto[1] + pair1_len = pronto[2] + pair2_len = pronto[3] + if ptype != 0: + # only raw (learned) codes are handled + return None + + # 4,145,152 is 32,768 * 506 / 4 + freq = round(4145152.0 / timebase / 100) / 10 + + pronto = pronto[4:] + timebase *= 0.241246 + for i in range(0, pair1_len*2, 2): + ret += [round(pronto[i] * timebase), round(pronto[i+1] * timebase)] + pronto = pronto[pair1_len*2:] + for i in range(0, pair2_len*2, 2): + ret += [round(pronto[i] * timebase), round(pronto[i+1] * timebase)] + + return IRRemoteControlDevice.pulses_to_head_key( ret, freq=freq ) diff --git a/tinytuya/core.py b/tinytuya/core.py index f278ef0d..3f448562 100644 --- a/tinytuya/core.py +++ b/tinytuya/core.py @@ -1391,13 +1391,27 @@ def set_value(self, index, value, nowait=False): return data + def set_multiple_values(self, data, nowait=False): + """ + Set multiple indexes at the same time + + Args: + data(dict): array of index/value pairs to set + nowait(bool): True to send without waiting for response. + """ + out = {} + for i in data: + out[str(i)] = data[i] + payload = self.generate_payload(CONTROL, out) + return self._send_receive(payload, getresponse=(not nowait)) + def turn_on(self, switch=1, nowait=False): """Turn the device on""" - self.set_status(True, switch, nowait) + return self.set_status(True, switch, nowait) def turn_off(self, switch=1, nowait=False): """Turn the device off""" - self.set_status(False, switch, nowait) + return self.set_status(False, switch, nowait) def set_timer(self, num_secs, dps_id=0, nowait=False): """ From b97dd53a4d89830dc7f49731720e29516cec3f4d Mon Sep 17 00:00:00 2001 From: uzlonewolf Date: Fri, 2 Dec 2022 04:39:33 -0800 Subject: [PATCH 3/4] IRRemoteControlDevice documentation update --- tinytuya/Contrib/IRRemoteControlDevice.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tinytuya/Contrib/IRRemoteControlDevice.py b/tinytuya/Contrib/IRRemoteControlDevice.py index 5dab6354..29594a01 100644 --- a/tinytuya/Contrib/IRRemoteControlDevice.py +++ b/tinytuya/Contrib/IRRemoteControlDevice.py @@ -16,6 +16,8 @@ Functions: ir = IRRemoteControlDevice(..., control_type=None) -> will immediately connect to the device to try and detect the control type if control_type is not provided + control_type=1 for older devices using DPS 201/202 + control_type=2 for newer devices using DPS 1-13 ir.detect_control_type() -> polls device status to try and detect the control type From 72320c0465d3d03d227e8e40ec48bda2781f1e8c Mon Sep 17 00:00:00 2001 From: uzlonewolf Date: Fri, 2 Dec 2022 11:52:01 -0800 Subject: [PATCH 4/4] Fix control type detection after device is rebooted --- .../Contrib/IRRemoteControlDevice-example.py | 3 +- tinytuya/Contrib/IRRemoteControlDevice.py | 40 +++++++++++++++---- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/examples/Contrib/IRRemoteControlDevice-example.py b/examples/Contrib/IRRemoteControlDevice-example.py index 9185b0f0..d71ed8ba 100644 --- a/examples/Contrib/IRRemoteControlDevice-example.py +++ b/examples/Contrib/IRRemoteControlDevice-example.py @@ -95,8 +95,7 @@ # create the device. this will connect to it to try and determine which DPS it uses -#ir = Contrib.IRRemoteControlDevice( 'abcdefghijklmnop123456', '172.28.321.475', '1234567890123abc', persist=True ) -ir = Contrib.IRRemoteControlDevice("8357021598cdac0aff74", address='172.20.10.150', local_key='3ba3a31d71dd9ee8', version=3.3, persist=True) +ir = Contrib.IRRemoteControlDevice( 'abcdefghijklmnop123456', '172.28.321.475', '1234567890123abc', persist=True ) print( 'Turning the Samsung tv on with pulses' ) diff --git a/tinytuya/Contrib/IRRemoteControlDevice.py b/tinytuya/Contrib/IRRemoteControlDevice.py index 29594a01..2a0d2735 100644 --- a/tinytuya/Contrib/IRRemoteControlDevice.py +++ b/tinytuya/Contrib/IRRemoteControlDevice.py @@ -166,19 +166,45 @@ def __init__(self, *args, **kwargs): super(IRRemoteControlDevice, self).__init__(*args, **kwargs) + self.disabledetect = True self.control_type = control_type if not self.control_type: self.detect_control_type() def detect_control_type( self ): + # This is more difficult than it seems. Neither device responds to status() after + # a reboot until after a command is sent. 201 devices do not respond to study_end + # if they are already in that mode. + old_timeout = self.connection_timeout + old_persist = self.socketPersistent + self.set_socketTimeout( 1 ) + self.set_socketPersistent( True ) + self.control_type = 1 + self.study_end() + self.control_type = 2 + self.study_end() + self.control_type = 0 status = self.status() - if status and 'dps' in status: - # original devices using DPS 201/202 - if self.DP_SEND_IR in status['dps']: - self.control_type = 1 - # newer devices using DPS 1-13 - elif self.DP_MODE in status['dps']: - self.control_type = 2 + while status: + if status and 'dps' in status: + # original devices using DPS 201/202 + if self.DP_SEND_IR in status['dps']: + log.debug( 'Detected control type 1' ) + self.control_type = 1 + break + # newer devices using DPS 1-13 + elif self.DP_MODE in status['dps']: + log.debug( 'Detected control type 2' ) + self.control_type = 2 + break + status = self._send_receive(None) + if not self.control_type: + log.warning( 'Detect control type failed! control_type= must be set manually' ) + elif status: + # try and make sure no data is waiting to be read + status = self._send_receive(None) + self.set_socketTimeout( old_timeout ) + self.set_socketPersistent( old_persist ) def send_command( self, mode, data={} ): if mode == 'send':