From d9b83d16123bf4f25746998581c3e06d93cad03d Mon Sep 17 00:00:00 2001 From: Stefan Hellander Date: Wed, 15 May 2024 21:19:49 +0200 Subject: [PATCH 1/3] Sets client status to online in database when client connects. Also clean up status of previously connected clients on startup. --- fedn/network/combiner/combiner.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index f9491b90e..32c88acbc 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -64,8 +64,7 @@ def __init__(self, config): # Client queues self.clients = {} - - self.modelservice = ModelService() + # Validate combiner name match = re.search(VALID_NAME_REGEX, config["name"]) @@ -122,6 +121,19 @@ def __init__(self, config): self.repository = Repository(announce_config["storage"]["storage_config"]) self.statestore = MongoStateStore(announce_config["statestore"]["network_id"], announce_config["statestore"]["mongo_config"]) + + # Fetch all clients previously connected to the combiner + # If a client and a combiner goes down at the same time, + # the client will be stuck listed as "online" in the statestore. + # Set the status to offline for previous clients. + previous_clients = self.statestore.clients.find({"combiner": config["name"]}) + for client in previous_clients: + self.statestore.set_client({"name": client["name"], "status": "offline"}) + + print("Previous clients: {}".format(self.clients)) + + self.modelservice = ModelService() + # Create gRPC server self.server = Server(self, self.modelservice, grpc_config) @@ -316,10 +328,13 @@ def _list_active_clients(self, channel): "update_active_clients": [], "update_offline_clients": [], } + print(self._list_subscribed_clients(channel)) for client in self._list_subscribed_clients(channel): status = self.clients[client]["status"] + print("Client: {}, status={}".format(client, status)) now = datetime.now() then = self.clients[client]["lastseen"] + print("Time since seen: {}".format(now - then)) if (now - then) < timedelta(seconds=10): clients["active_clients"].append(client) # If client has changed status, update statestore @@ -331,13 +346,15 @@ def _list_active_clients(self, channel): clients["update_offline_clients"].append(client) # Update statestore with client status if len(clients["update_active_clients"]) > 0: + print("Updating active clients: {}".format(clients["update_active_clients"])) self.statestore.update_client_status(clients["update_active_clients"], "online") if len(clients["update_offline_clients"]) > 0: + print("Updating offline clients: {}".format(clients["update_offline_clients"])) self.statestore.update_client_status(clients["update_offline_clients"], "offline") return clients["active_clients"] - def _deamon_thread_client_status(self, timeout=5): + def _deamon_thread_client_status(self, timeout=10): """Deamon thread that checks for inactive clients and updates statestore.""" while True: time.sleep(timeout) @@ -600,6 +617,10 @@ def TaskStream(self, response, context): self._send_status(status) + # Set client status to online + self.clients[client.name]["status"] = "online" + self.statestore.set_client({"name": client.name, "status": "online"}) + # Keep track of the time context has been active start_time = time.time() while context.is_active(): From a256c4491964ac76dc4d77f5c564207e11341f0a Mon Sep 17 00:00:00 2001 From: Stefan Hellander Date: Wed, 15 May 2024 21:26:03 +0200 Subject: [PATCH 2/3] Cleanup --- fedn/network/combiner/combiner.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 32c88acbc..abc34c994 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -130,8 +130,6 @@ def __init__(self, config): for client in previous_clients: self.statestore.set_client({"name": client["name"], "status": "offline"}) - print("Previous clients: {}".format(self.clients)) - self.modelservice = ModelService() # Create gRPC server @@ -328,13 +326,10 @@ def _list_active_clients(self, channel): "update_active_clients": [], "update_offline_clients": [], } - print(self._list_subscribed_clients(channel)) for client in self._list_subscribed_clients(channel): status = self.clients[client]["status"] - print("Client: {}, status={}".format(client, status)) now = datetime.now() then = self.clients[client]["lastseen"] - print("Time since seen: {}".format(now - then)) if (now - then) < timedelta(seconds=10): clients["active_clients"].append(client) # If client has changed status, update statestore @@ -346,15 +341,13 @@ def _list_active_clients(self, channel): clients["update_offline_clients"].append(client) # Update statestore with client status if len(clients["update_active_clients"]) > 0: - print("Updating active clients: {}".format(clients["update_active_clients"])) self.statestore.update_client_status(clients["update_active_clients"], "online") if len(clients["update_offline_clients"]) > 0: - print("Updating offline clients: {}".format(clients["update_offline_clients"])) self.statestore.update_client_status(clients["update_offline_clients"], "offline") return clients["active_clients"] - def _deamon_thread_client_status(self, timeout=10): + def _deamon_thread_client_status(self, timeout=5): """Deamon thread that checks for inactive clients and updates statestore.""" while True: time.sleep(timeout) From 9eb0568607ec168a8a0d688087eebbbb54d05eea Mon Sep 17 00:00:00 2001 From: Stefan Hellander Date: Thu, 16 May 2024 10:05:57 +0200 Subject: [PATCH 3/3] Linting --- fedn/network/combiner/combiner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index abc34c994..f66e2af8f 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -64,7 +64,7 @@ def __init__(self, config): # Client queues self.clients = {} - + # Validate combiner name match = re.search(VALID_NAME_REGEX, config["name"]) @@ -121,7 +121,7 @@ def __init__(self, config): self.repository = Repository(announce_config["storage"]["storage_config"]) self.statestore = MongoStateStore(announce_config["statestore"]["network_id"], announce_config["statestore"]["mongo_config"]) - + # Fetch all clients previously connected to the combiner # If a client and a combiner goes down at the same time, # the client will be stuck listed as "online" in the statestore. @@ -131,7 +131,7 @@ def __init__(self, config): self.statestore.set_client({"name": client["name"], "status": "offline"}) self.modelservice = ModelService() - + # Create gRPC server self.server = Server(self, self.modelservice, grpc_config)