From 565e57819aee56dc6f6c90c22e185437bcb11fc7 Mon Sep 17 00:00:00 2001 From: Jason Cox Date: Sat, 19 Oct 2024 22:31:25 -0700 Subject: [PATCH] Refactor threading.py to add error handling and heartbeat keepalive --- examples/threading.py | 90 ++++++++++++++++++++++++++++++++----------- 1 file changed, 68 insertions(+), 22 deletions(-) diff --git a/examples/threading.py b/examples/threading.py index a919d3e..e3ac712 100644 --- a/examples/threading.py +++ b/examples/threading.py @@ -50,6 +50,10 @@ ] } + +# Settings +TTL_HEARTBEAT = 12 # Time in seconds between heartbeats + # Create array, devices, that is an array of tinytuya.Device objects devices = [] for i in config["TuyaDevices"]: @@ -62,33 +66,75 @@ def getDeviceStatuses(): global statuses def listen_for_status_updates(device, index): + """ + Thread function to continuously listen for status updates and send heartbeats. + """ # Enable persistent connection to the device - device.set_socketPersistent(True) - - # Call status() once to establish connection and get initial status - initial_status = device.status() - print(f"INITIAL status from {device.id}: {initial_status}") - statuses[index] = {"id": device.id, "status": initial_status["dps"]} - + def reconnect(): + time.sleep(5) # Cool-down before reconnecting + try: + print(f"Reconnecting to {device.id}...") + device.set_socketPersistent(True) + initial_status = device.status() + print(f"Reconnected and got status from {device.id}: {initial_status}") + statuses[index] = {"id": device.id, "status": initial_status["dps"]} + return True + except Exception as e: + print(f"Failed to reconnect to {device.id}: {e}") + return False + + try: + # Call status() once to establish connection and get initial status + device.set_socketPersistent(True) + initial_status = device.status() + print(f"INITIAL status from {device.id}: {initial_status}") + statuses[index] = {"id": device.id, "status": initial_status["dps"]} + except Exception as e: + print(f"Error getting initial status from {device.id}: {e}") + statuses[index] = {"id": device.id, "status": "Disconnected"} + return + + # Variables to track the last heartbeat + last_heartbeat_time = time.time() + # Infinite loop to listen for status updates while True: - # Listen for updates from the device (TinyTuya handles receiving updates) - updated_status = device.receive() - - if updated_status: - print(f"UPDATE status from {device.id}: {updated_status}") - # We may only get one DPS, so just update that one item - if "dps" in updated_status: - for key in updated_status["dps"]: - statuses[index]["status"][key] = updated_status["dps"][key] - print(f" - Updated status for {device.id} DPS {key} to {updated_status['dps'][key]}") - # This would be a good place to push a socket.io update to a web client - - # Add a small delay to avoid overwhelming the system - time.sleep(0.1) # 100ms + try: + # Send a heartbeat every 5 seconds + if time.time() - last_heartbeat_time >= TTL_HEARTBEAT: + try: + device.heartbeat() + print(f"Heartbeat sent to {device.id}") + last_heartbeat_time = time.time() + except Exception as hb_error: + print(f"Failed to send heartbeat to {device.id}: {hb_error}") + # Try to reconnect if the heartbeat fails + if not reconnect(): + statuses[index]["status"] = "Disconnected" + break # Exit the loop if reconnection fails + + # Listen for updates from the device + updated_status = device.receive() + + if updated_status: + print(f"UPDATE status from {device.id}: {updated_status}") + # We may only get one DPS, so just update that one item + if "dps" in updated_status: + for key in updated_status["dps"]: + statuses[index]["status"][key] = updated_status["dps"][key] + print(f" - Updated status for {device.id} DPS {key} to {updated_status['dps'][key]}") + + # Small delay to avoid tight loops + time.sleep(0.1) + + except Exception as e: + print(f"Error receiving status from {device.id}: {e}") + statuses[index]["status"] = "Disconnected" + if not reconnect(): + break # Exit the loop if reconnection fails threads = [] - + # Create and start a thread for each device for index, device in enumerate(devices): print(f"Starting thread for device {device.id}")