From 408cb43f0c9c2618f3bbb33cb5ae34343c451766 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 12 Jun 2024 14:28:16 +0200 Subject: [PATCH 1/5] name => id --- fedn/network/combiner/combiner.py | 8 ++++---- fedn/network/grpc/fedn.proto | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 8eacd917e..6ba9e40dd 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -265,9 +265,9 @@ def __join_client(self, client): :param client: the client to add :type client: :class:`fedn.network.grpc.fedn_pb2.Client` """ - if client.name not in self.clients.keys(): + if client.id not in self.clients.keys(): # The status is set to offline by default, and will be updated once _list_active_clients is called. - self.clients[client.name] = {"lastseen": datetime.now(), "status": "offline"} + self.clients[client.id] = {"lastseen": datetime.now(), "status": "offline"} def _subscribe_client_to_queue(self, client, queue_name): """Subscribe a client to the queue. @@ -278,8 +278,8 @@ def _subscribe_client_to_queue(self, client, queue_name): :type queue_name: str """ self.__join_client(client) - if queue_name not in self.clients[client.name].keys(): - self.clients[client.name][queue_name] = queue.Queue() + if queue_name not in self.clients[client.id].keys(): + self.clients[client.id][queue_name] = queue.Queue() def __get_queue(self, client, queue_name): """Get the queue for a client. diff --git a/fedn/network/grpc/fedn.proto b/fedn/network/grpc/fedn.proto index fd2f1d5c5..a7761a10f 100644 --- a/fedn/network/grpc/fedn.proto +++ b/fedn/network/grpc/fedn.proto @@ -149,6 +149,7 @@ enum Role { message Client { Role role = 1; string name = 2; + string id = 3; } message ReassignRequest { From eeaf18fc03672ec8189c93a1e4439ded64e4e99d Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 18 Jun 2024 13:58:39 +0200 Subject: [PATCH 2/5] training and validating based on client_id:s --- fedn/genprot.sh | 2 +- fedn/network/clients/client.py | 5 +- fedn/network/combiner/combiner.py | 18 +-- fedn/network/grpc/fedn.proto | 2 +- fedn/network/grpc/fedn_pb2.py | 138 +++++++++---------- fedn/network/grpc/fedn_pb2_grpc.py | 206 ++++++++++++++--------------- 6 files changed, 187 insertions(+), 184 deletions(-) diff --git a/fedn/genprot.sh b/fedn/genprot.sh index bb08ada35..def170de1 100755 --- a/fedn/genprot.sh +++ b/fedn/genprot.sh @@ -1,4 +1,4 @@ #!/bin/bash echo "Generating protocol" -python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. fedn/network/grpc/*.proto +python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. network/grpc/*.proto echo "DONE" diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index df20a8956..d257a4bab 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -62,6 +62,8 @@ def __init__(self, config): set_log_level_from_string(config.get("verbosity", "INFO")) set_log_stream(config.get("logfile", None)) + self.id = config["client_id"] or str(uuid.uuid4()) + self.connector = ConnectorClient( host=config["discover_host"], port=config["discover_port"], @@ -71,7 +73,7 @@ def __init__(self, config): force_ssl=config["force_ssl"], verify=config["verify"], combiner=config["preferred_combiner"], - id=config["client_id"], + id=self.id, ) # Validate client name @@ -419,6 +421,7 @@ def _listen_to_task_stream(self): r = fedn.ClientAvailableMessage() r.sender.name = self.name r.sender.role = fedn.WORKER + r.sender.client_id = self.id # Add client to metadata self._add_grpc_metadata("client", self.name) diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 6ba9e40dd..41a0cf84a 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -186,7 +186,7 @@ def request_model_update(self, config, clients=[]): clients = self.get_active_trainers() for client in clients: - request.receiver.name = client + request.receiver.client_id = client request.receiver.role = fedn.WORKER self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE) @@ -222,7 +222,7 @@ def request_model_validation(self, model_id, config, clients=[]): clients = self.get_active_validators() for client in clients: - request.receiver.name = client + request.receiver.client_id = client request.receiver.role = fedn.WORKER self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE) @@ -265,9 +265,9 @@ def __join_client(self, client): :param client: the client to add :type client: :class:`fedn.network.grpc.fedn_pb2.Client` """ - if client.id not in self.clients.keys(): + if client.client_id not in self.clients.keys(): # The status is set to offline by default, and will be updated once _list_active_clients is called. - self.clients[client.id] = {"lastseen": datetime.now(), "status": "offline"} + self.clients[client.client_id] = {"lastseen": datetime.now(), "status": "offline"} def _subscribe_client_to_queue(self, client, queue_name): """Subscribe a client to the queue. @@ -278,8 +278,8 @@ def _subscribe_client_to_queue(self, client, queue_name): :type queue_name: str """ self.__join_client(client) - if queue_name not in self.clients[client.id].keys(): - self.clients[client.id][queue_name] = queue.Queue() + if queue_name not in self.clients[client.client_id].keys(): + self.clients[client.client_id][queue_name] = queue.Queue() def __get_queue(self, client, queue_name): """Get the queue for a client. @@ -294,7 +294,7 @@ def __get_queue(self, client, queue_name): :raises KeyError: if the queue does not exist """ try: - return self.clients[client.name][queue_name] + return self.clients[client.client_id][queue_name] except KeyError: raise @@ -611,7 +611,7 @@ def TaskStream(self, response, context): self._send_status(status) # Set client status to online - self.clients[client.name]["status"] = "online" + self.clients[client.client_id]["status"] = "online" self.statestore.set_client({"name": client.name, "status": "online"}) # Keep track of the time context has been active @@ -619,7 +619,7 @@ def TaskStream(self, response, context): while context.is_active(): # Check if the context has been active for more than 10 seconds if time.time() - start_time > 10: - self.clients[client.name]["lastseen"] = datetime.now() + self.clients[client.client_id]["lastseen"] = datetime.now() # Reset the start time start_time = time.time() try: diff --git a/fedn/network/grpc/fedn.proto b/fedn/network/grpc/fedn.proto index a7761a10f..04a1e5175 100644 --- a/fedn/network/grpc/fedn.proto +++ b/fedn/network/grpc/fedn.proto @@ -149,7 +149,7 @@ enum Role { message Client { Role role = 1; string name = 2; - string id = 3; + string client_id = 3; } message ReassignRequest { diff --git a/fedn/network/grpc/fedn_pb2.py b/fedn/network/grpc/fedn_pb2.py index 714bef0e5..d47f08792 100644 --- a/fedn/network/grpc/fedn_pb2.py +++ b/fedn/network/grpc/fedn_pb2.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: fedn/network/grpc/fedn.proto +# source: network/grpc/fedn.proto """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -14,78 +14,78 @@ from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66\x65\x64n/network/grpc/fedn.proto\x12\x04\x66\x65\x64n\x1a\x1fgoogle/protobuf/timestamp.proto\":\n\x08Response\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08response\x18\x02 \x01(\t\"\xbc\x02\n\x06Status\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x0e\n\x06status\x18\x02 \x01(\t\x12(\n\tlog_level\x18\x03 \x01(\x0e\x32\x15.fedn.Status.LogLevel\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12-\n\ttimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x1e\n\x04type\x18\x07 \x01(\x0e\x32\x10.fedn.StatusType\x12\r\n\x05\x65xtra\x18\x08 \x01(\t\x12\x12\n\nsession_id\x18\t \x01(\t\"B\n\x08LogLevel\x12\x08\n\x04INFO\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x0b\n\x07WARNING\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\t\n\x05\x41UDIT\x10\x04\"\xd8\x01\n\x0bTaskRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\t\x12\x1e\n\x04type\x18\t \x01(\x0e\x32\x10.fedn.StatusType\"\xbf\x01\n\x0bModelUpdate\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x17\n\x0fmodel_update_id\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x0e\n\x06\x63onfig\x18\x08 \x01(\t\"\xd8\x01\n\x0fModelValidation\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12-\n\ttimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\t\"\x89\x01\n\x0cModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12\n\n\x02id\x18\x04 \x01(\t\x12!\n\x06status\x18\x05 \x01(\x0e\x32\x11.fedn.ModelStatus\"]\n\rModelResponse\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\n\n\x02id\x18\x02 \x01(\t\x12!\n\x06status\x18\x03 \x01(\x0e\x32\x11.fedn.ModelStatus\x12\x0f\n\x07message\x18\x04 \x01(\t\"U\n\x15GetGlobalModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\"h\n\x16GetGlobalModelResponse\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\")\n\tHeartbeat\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\"W\n\x16\x43lientAvailableMessage\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"P\n\x12ListClientsRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1c\n\x07\x63hannel\x18\x02 \x01(\x0e\x32\x0b.fedn.Queue\"*\n\nClientList\x12\x1c\n\x06\x63lient\x18\x01 \x03(\x0b\x32\x0c.fedn.Client\"0\n\x06\x43lient\x12\x18\n\x04role\x18\x01 \x01(\x0e\x32\n.fedn.Role\x12\x0c\n\x04name\x18\x02 \x01(\t\"m\n\x0fReassignRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x0e\n\x06server\x18\x03 \x01(\t\x12\x0c\n\x04port\x18\x04 \x01(\r\"c\n\x10ReconnectRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x11\n\treconnect\x18\x03 \x01(\r\"\'\n\tParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"T\n\x0e\x43ontrolRequest\x12\x1e\n\x07\x63ommand\x18\x01 \x01(\x0e\x32\r.fedn.Command\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.fedn.Parameter\"F\n\x0f\x43ontrolResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.fedn.Parameter\"\x13\n\x11\x43onnectionRequest\"<\n\x12\x43onnectionResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.fedn.ConnectionStatus*\x84\x01\n\nStatusType\x12\x07\n\x03LOG\x10\x00\x12\x18\n\x14MODEL_UPDATE_REQUEST\x10\x01\x12\x10\n\x0cMODEL_UPDATE\x10\x02\x12\x1c\n\x18MODEL_VALIDATION_REQUEST\x10\x03\x12\x14\n\x10MODEL_VALIDATION\x10\x04\x12\r\n\tINFERENCE\x10\x05*$\n\x05Queue\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x0e\n\nTASK_QUEUE\x10\x01*S\n\x0bModelStatus\x12\x06\n\x02OK\x10\x00\x12\x0f\n\x0bIN_PROGRESS\x10\x01\x12\x12\n\x0eIN_PROGRESS_OK\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07UNKNOWN\x10\x04*8\n\x04Role\x12\n\n\x06WORKER\x10\x00\x12\x0c\n\x08\x43OMBINER\x10\x01\x12\x0b\n\x07REDUCER\x10\x02\x12\t\n\x05OTHER\x10\x03*J\n\x07\x43ommand\x12\x08\n\x04IDLE\x10\x00\x12\t\n\x05START\x10\x01\x12\t\n\x05PAUSE\x10\x02\x12\x08\n\x04STOP\x10\x03\x12\t\n\x05RESET\x10\x04\x12\n\n\x06REPORT\x10\x05*I\n\x10\x43onnectionStatus\x12\x11\n\rNOT_ACCEPTING\x10\x00\x12\r\n\tACCEPTING\x10\x01\x12\x13\n\x0fTRY_AGAIN_LATER\x10\x02\x32z\n\x0cModelService\x12\x33\n\x06Upload\x12\x12.fedn.ModelRequest\x1a\x13.fedn.ModelResponse(\x01\x12\x35\n\x08\x44ownload\x12\x12.fedn.ModelRequest\x1a\x13.fedn.ModelResponse0\x01\x32\xf8\x01\n\x07\x43ontrol\x12\x34\n\x05Start\x12\x14.fedn.ControlRequest\x1a\x15.fedn.ControlResponse\x12\x33\n\x04Stop\x12\x14.fedn.ControlRequest\x1a\x15.fedn.ControlResponse\x12\x44\n\x15\x46lushAggregationQueue\x12\x14.fedn.ControlRequest\x1a\x15.fedn.ControlResponse\x12<\n\rSetAggregator\x12\x14.fedn.ControlRequest\x1a\x15.fedn.ControlResponse2V\n\x07Reducer\x12K\n\x0eGetGlobalModel\x12\x1b.fedn.GetGlobalModelRequest\x1a\x1c.fedn.GetGlobalModelResponse2\xab\x03\n\tConnector\x12\x44\n\x14\x41llianceStatusStream\x12\x1c.fedn.ClientAvailableMessage\x1a\x0c.fedn.Status0\x01\x12*\n\nSendStatus\x12\x0c.fedn.Status\x1a\x0e.fedn.Response\x12?\n\x11ListActiveClients\x12\x18.fedn.ListClientsRequest\x1a\x10.fedn.ClientList\x12\x45\n\x10\x41\x63\x63\x65ptingClients\x12\x17.fedn.ConnectionRequest\x1a\x18.fedn.ConnectionResponse\x12\x30\n\rSendHeartbeat\x12\x0f.fedn.Heartbeat\x1a\x0e.fedn.Response\x12\x37\n\x0eReassignClient\x12\x15.fedn.ReassignRequest\x1a\x0e.fedn.Response\x12\x39\n\x0fReconnectClient\x12\x16.fedn.ReconnectRequest\x1a\x0e.fedn.Response2\xbf\x01\n\x08\x43ombiner\x12?\n\nTaskStream\x12\x1c.fedn.ClientAvailableMessage\x1a\x11.fedn.TaskRequest0\x01\x12\x34\n\x0fSendModelUpdate\x12\x11.fedn.ModelUpdate\x1a\x0e.fedn.Response\x12<\n\x13SendModelValidation\x12\x15.fedn.ModelValidation\x1a\x0e.fedn.Responseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17network/grpc/fedn.proto\x12\x04\x66\x65\x64n\x1a\x1fgoogle/protobuf/timestamp.proto\":\n\x08Response\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08response\x18\x02 \x01(\t\"\xbc\x02\n\x06Status\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x0e\n\x06status\x18\x02 \x01(\t\x12(\n\tlog_level\x18\x03 \x01(\x0e\x32\x15.fedn.Status.LogLevel\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12-\n\ttimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x1e\n\x04type\x18\x07 \x01(\x0e\x32\x10.fedn.StatusType\x12\r\n\x05\x65xtra\x18\x08 \x01(\t\x12\x12\n\nsession_id\x18\t \x01(\t\"B\n\x08LogLevel\x12\x08\n\x04INFO\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x0b\n\x07WARNING\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\t\n\x05\x41UDIT\x10\x04\"\xd8\x01\n\x0bTaskRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\t\x12\x1e\n\x04type\x18\t \x01(\x0e\x32\x10.fedn.StatusType\"\xbf\x01\n\x0bModelUpdate\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x17\n\x0fmodel_update_id\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x0e\n\x06\x63onfig\x18\x08 \x01(\t\"\xd8\x01\n\x0fModelValidation\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12-\n\ttimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\t\"\x89\x01\n\x0cModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12\n\n\x02id\x18\x04 \x01(\t\x12!\n\x06status\x18\x05 \x01(\x0e\x32\x11.fedn.ModelStatus\"]\n\rModelResponse\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\n\n\x02id\x18\x02 \x01(\t\x12!\n\x06status\x18\x03 \x01(\x0e\x32\x11.fedn.ModelStatus\x12\x0f\n\x07message\x18\x04 \x01(\t\"U\n\x15GetGlobalModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\"h\n\x16GetGlobalModelResponse\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\")\n\tHeartbeat\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\"W\n\x16\x43lientAvailableMessage\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"P\n\x12ListClientsRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1c\n\x07\x63hannel\x18\x02 \x01(\x0e\x32\x0b.fedn.Queue\"*\n\nClientList\x12\x1c\n\x06\x63lient\x18\x01 \x03(\x0b\x32\x0c.fedn.Client\"C\n\x06\x43lient\x12\x18\n\x04role\x18\x01 \x01(\x0e\x32\n.fedn.Role\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x11\n\tclient_id\x18\x03 \x01(\t\"m\n\x0fReassignRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x0e\n\x06server\x18\x03 \x01(\t\x12\x0c\n\x04port\x18\x04 \x01(\r\"c\n\x10ReconnectRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.fedn.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.fedn.Client\x12\x11\n\treconnect\x18\x03 \x01(\r\"\'\n\tParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"T\n\x0e\x43ontrolRequest\x12\x1e\n\x07\x63ommand\x18\x01 \x01(\x0e\x32\r.fedn.Command\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.fedn.Parameter\"F\n\x0f\x43ontrolResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.fedn.Parameter\"\x13\n\x11\x43onnectionRequest\"<\n\x12\x43onnectionResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.fedn.ConnectionStatus*\x84\x01\n\nStatusType\x12\x07\n\x03LOG\x10\x00\x12\x18\n\x14MODEL_UPDATE_REQUEST\x10\x01\x12\x10\n\x0cMODEL_UPDATE\x10\x02\x12\x1c\n\x18MODEL_VALIDATION_REQUEST\x10\x03\x12\x14\n\x10MODEL_VALIDATION\x10\x04\x12\r\n\tINFERENCE\x10\x05*$\n\x05Queue\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x0e\n\nTASK_QUEUE\x10\x01*S\n\x0bModelStatus\x12\x06\n\x02OK\x10\x00\x12\x0f\n\x0bIN_PROGRESS\x10\x01\x12\x12\n\x0eIN_PROGRESS_OK\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07UNKNOWN\x10\x04*8\n\x04Role\x12\n\n\x06WORKER\x10\x00\x12\x0c\n\x08\x43OMBINER\x10\x01\x12\x0b\n\x07REDUCER\x10\x02\x12\t\n\x05OTHER\x10\x03*J\n\x07\x43ommand\x12\x08\n\x04IDLE\x10\x00\x12\t\n\x05START\x10\x01\x12\t\n\x05PAUSE\x10\x02\x12\x08\n\x04STOP\x10\x03\x12\t\n\x05RESET\x10\x04\x12\n\n\x06REPORT\x10\x05*I\n\x10\x43onnectionStatus\x12\x11\n\rNOT_ACCEPTING\x10\x00\x12\r\n\tACCEPTING\x10\x01\x12\x13\n\x0fTRY_AGAIN_LATER\x10\x02\x32z\n\x0cModelService\x12\x33\n\x06Upload\x12\x12.fedn.ModelRequest\x1a\x13.fedn.ModelResponse(\x01\x12\x35\n\x08\x44ownload\x12\x12.fedn.ModelRequest\x1a\x13.fedn.ModelResponse0\x01\x32\xf8\x01\n\x07\x43ontrol\x12\x34\n\x05Start\x12\x14.fedn.ControlRequest\x1a\x15.fedn.ControlResponse\x12\x33\n\x04Stop\x12\x14.fedn.ControlRequest\x1a\x15.fedn.ControlResponse\x12\x44\n\x15\x46lushAggregationQueue\x12\x14.fedn.ControlRequest\x1a\x15.fedn.ControlResponse\x12<\n\rSetAggregator\x12\x14.fedn.ControlRequest\x1a\x15.fedn.ControlResponse2V\n\x07Reducer\x12K\n\x0eGetGlobalModel\x12\x1b.fedn.GetGlobalModelRequest\x1a\x1c.fedn.GetGlobalModelResponse2\xab\x03\n\tConnector\x12\x44\n\x14\x41llianceStatusStream\x12\x1c.fedn.ClientAvailableMessage\x1a\x0c.fedn.Status0\x01\x12*\n\nSendStatus\x12\x0c.fedn.Status\x1a\x0e.fedn.Response\x12?\n\x11ListActiveClients\x12\x18.fedn.ListClientsRequest\x1a\x10.fedn.ClientList\x12\x45\n\x10\x41\x63\x63\x65ptingClients\x12\x17.fedn.ConnectionRequest\x1a\x18.fedn.ConnectionResponse\x12\x30\n\rSendHeartbeat\x12\x0f.fedn.Heartbeat\x1a\x0e.fedn.Response\x12\x37\n\x0eReassignClient\x12\x15.fedn.ReassignRequest\x1a\x0e.fedn.Response\x12\x39\n\x0fReconnectClient\x12\x16.fedn.ReconnectRequest\x1a\x0e.fedn.Response2\xbf\x01\n\x08\x43ombiner\x12?\n\nTaskStream\x12\x1c.fedn.ClientAvailableMessage\x1a\x11.fedn.TaskRequest0\x01\x12\x34\n\x0fSendModelUpdate\x12\x11.fedn.ModelUpdate\x1a\x0e.fedn.Response\x12<\n\x13SendModelValidation\x12\x15.fedn.ModelValidation\x1a\x0e.fedn.Responseb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'fedn.network.grpc.fedn_pb2', _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'network.grpc.fedn_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals['_STATUSTYPE']._serialized_start=2313 - _globals['_STATUSTYPE']._serialized_end=2445 - _globals['_QUEUE']._serialized_start=2447 - _globals['_QUEUE']._serialized_end=2483 - _globals['_MODELSTATUS']._serialized_start=2485 - _globals['_MODELSTATUS']._serialized_end=2568 - _globals['_ROLE']._serialized_start=2570 - _globals['_ROLE']._serialized_end=2626 - _globals['_COMMAND']._serialized_start=2628 - _globals['_COMMAND']._serialized_end=2702 - _globals['_CONNECTIONSTATUS']._serialized_start=2704 - _globals['_CONNECTIONSTATUS']._serialized_end=2777 - _globals['_RESPONSE']._serialized_start=71 - _globals['_RESPONSE']._serialized_end=129 - _globals['_STATUS']._serialized_start=132 - _globals['_STATUS']._serialized_end=448 - _globals['_STATUS_LOGLEVEL']._serialized_start=382 - _globals['_STATUS_LOGLEVEL']._serialized_end=448 - _globals['_TASKREQUEST']._serialized_start=451 - _globals['_TASKREQUEST']._serialized_end=667 - _globals['_MODELUPDATE']._serialized_start=670 - _globals['_MODELUPDATE']._serialized_end=861 - _globals['_MODELVALIDATION']._serialized_start=864 - _globals['_MODELVALIDATION']._serialized_end=1080 - _globals['_MODELREQUEST']._serialized_start=1083 - _globals['_MODELREQUEST']._serialized_end=1220 - _globals['_MODELRESPONSE']._serialized_start=1222 - _globals['_MODELRESPONSE']._serialized_end=1315 - _globals['_GETGLOBALMODELREQUEST']._serialized_start=1317 - _globals['_GETGLOBALMODELREQUEST']._serialized_end=1402 - _globals['_GETGLOBALMODELRESPONSE']._serialized_start=1404 - _globals['_GETGLOBALMODELRESPONSE']._serialized_end=1508 - _globals['_HEARTBEAT']._serialized_start=1510 - _globals['_HEARTBEAT']._serialized_end=1551 - _globals['_CLIENTAVAILABLEMESSAGE']._serialized_start=1553 - _globals['_CLIENTAVAILABLEMESSAGE']._serialized_end=1640 - _globals['_LISTCLIENTSREQUEST']._serialized_start=1642 - _globals['_LISTCLIENTSREQUEST']._serialized_end=1722 - _globals['_CLIENTLIST']._serialized_start=1724 - _globals['_CLIENTLIST']._serialized_end=1766 - _globals['_CLIENT']._serialized_start=1768 - _globals['_CLIENT']._serialized_end=1816 - _globals['_REASSIGNREQUEST']._serialized_start=1818 - _globals['_REASSIGNREQUEST']._serialized_end=1927 - _globals['_RECONNECTREQUEST']._serialized_start=1929 - _globals['_RECONNECTREQUEST']._serialized_end=2028 - _globals['_PARAMETER']._serialized_start=2030 - _globals['_PARAMETER']._serialized_end=2069 - _globals['_CONTROLREQUEST']._serialized_start=2071 - _globals['_CONTROLREQUEST']._serialized_end=2155 - _globals['_CONTROLRESPONSE']._serialized_start=2157 - _globals['_CONTROLRESPONSE']._serialized_end=2227 - _globals['_CONNECTIONREQUEST']._serialized_start=2229 - _globals['_CONNECTIONREQUEST']._serialized_end=2248 - _globals['_CONNECTIONRESPONSE']._serialized_start=2250 - _globals['_CONNECTIONRESPONSE']._serialized_end=2310 - _globals['_MODELSERVICE']._serialized_start=2779 - _globals['_MODELSERVICE']._serialized_end=2901 - _globals['_CONTROL']._serialized_start=2904 - _globals['_CONTROL']._serialized_end=3152 - _globals['_REDUCER']._serialized_start=3154 - _globals['_REDUCER']._serialized_end=3240 - _globals['_CONNECTOR']._serialized_start=3243 - _globals['_CONNECTOR']._serialized_end=3670 - _globals['_COMBINER']._serialized_start=3673 - _globals['_COMBINER']._serialized_end=3864 + _globals['_STATUSTYPE']._serialized_start=2327 + _globals['_STATUSTYPE']._serialized_end=2459 + _globals['_QUEUE']._serialized_start=2461 + _globals['_QUEUE']._serialized_end=2497 + _globals['_MODELSTATUS']._serialized_start=2499 + _globals['_MODELSTATUS']._serialized_end=2582 + _globals['_ROLE']._serialized_start=2584 + _globals['_ROLE']._serialized_end=2640 + _globals['_COMMAND']._serialized_start=2642 + _globals['_COMMAND']._serialized_end=2716 + _globals['_CONNECTIONSTATUS']._serialized_start=2718 + _globals['_CONNECTIONSTATUS']._serialized_end=2791 + _globals['_RESPONSE']._serialized_start=66 + _globals['_RESPONSE']._serialized_end=124 + _globals['_STATUS']._serialized_start=127 + _globals['_STATUS']._serialized_end=443 + _globals['_STATUS_LOGLEVEL']._serialized_start=377 + _globals['_STATUS_LOGLEVEL']._serialized_end=443 + _globals['_TASKREQUEST']._serialized_start=446 + _globals['_TASKREQUEST']._serialized_end=662 + _globals['_MODELUPDATE']._serialized_start=665 + _globals['_MODELUPDATE']._serialized_end=856 + _globals['_MODELVALIDATION']._serialized_start=859 + _globals['_MODELVALIDATION']._serialized_end=1075 + _globals['_MODELREQUEST']._serialized_start=1078 + _globals['_MODELREQUEST']._serialized_end=1215 + _globals['_MODELRESPONSE']._serialized_start=1217 + _globals['_MODELRESPONSE']._serialized_end=1310 + _globals['_GETGLOBALMODELREQUEST']._serialized_start=1312 + _globals['_GETGLOBALMODELREQUEST']._serialized_end=1397 + _globals['_GETGLOBALMODELRESPONSE']._serialized_start=1399 + _globals['_GETGLOBALMODELRESPONSE']._serialized_end=1503 + _globals['_HEARTBEAT']._serialized_start=1505 + _globals['_HEARTBEAT']._serialized_end=1546 + _globals['_CLIENTAVAILABLEMESSAGE']._serialized_start=1548 + _globals['_CLIENTAVAILABLEMESSAGE']._serialized_end=1635 + _globals['_LISTCLIENTSREQUEST']._serialized_start=1637 + _globals['_LISTCLIENTSREQUEST']._serialized_end=1717 + _globals['_CLIENTLIST']._serialized_start=1719 + _globals['_CLIENTLIST']._serialized_end=1761 + _globals['_CLIENT']._serialized_start=1763 + _globals['_CLIENT']._serialized_end=1830 + _globals['_REASSIGNREQUEST']._serialized_start=1832 + _globals['_REASSIGNREQUEST']._serialized_end=1941 + _globals['_RECONNECTREQUEST']._serialized_start=1943 + _globals['_RECONNECTREQUEST']._serialized_end=2042 + _globals['_PARAMETER']._serialized_start=2044 + _globals['_PARAMETER']._serialized_end=2083 + _globals['_CONTROLREQUEST']._serialized_start=2085 + _globals['_CONTROLREQUEST']._serialized_end=2169 + _globals['_CONTROLRESPONSE']._serialized_start=2171 + _globals['_CONTROLRESPONSE']._serialized_end=2241 + _globals['_CONNECTIONREQUEST']._serialized_start=2243 + _globals['_CONNECTIONREQUEST']._serialized_end=2262 + _globals['_CONNECTIONRESPONSE']._serialized_start=2264 + _globals['_CONNECTIONRESPONSE']._serialized_end=2324 + _globals['_MODELSERVICE']._serialized_start=2793 + _globals['_MODELSERVICE']._serialized_end=2915 + _globals['_CONTROL']._serialized_start=2918 + _globals['_CONTROL']._serialized_end=3166 + _globals['_REDUCER']._serialized_start=3168 + _globals['_REDUCER']._serialized_end=3254 + _globals['_CONNECTOR']._serialized_start=3257 + _globals['_CONNECTOR']._serialized_end=3684 + _globals['_COMBINER']._serialized_start=3687 + _globals['_COMBINER']._serialized_end=3878 # @@protoc_insertion_point(module_scope) diff --git a/fedn/network/grpc/fedn_pb2_grpc.py b/fedn/network/grpc/fedn_pb2_grpc.py index 3c2db9a2c..86ebea055 100644 --- a/fedn/network/grpc/fedn_pb2_grpc.py +++ b/fedn/network/grpc/fedn_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from fedn.network.grpc import fedn_pb2 as fedn_dot_network_dot_grpc_dot_fedn__pb2 +from ..grpc import fedn_pb2 as network_dot_grpc_dot_fedn__pb2 class ModelServiceStub(object): @@ -16,13 +16,13 @@ def __init__(self, channel): """ self.Upload = channel.stream_unary( '/fedn.ModelService/Upload', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, ) self.Download = channel.unary_stream( '/fedn.ModelService/Download', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, ) @@ -46,13 +46,13 @@ def add_ModelServiceServicer_to_server(servicer, server): rpc_method_handlers = { 'Upload': grpc.stream_unary_rpc_method_handler( servicer.Upload, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelResponse.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ModelRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.ModelResponse.SerializeToString, ), 'Download': grpc.unary_stream_rpc_method_handler( servicer.Download, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelResponse.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ModelRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.ModelResponse.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -76,8 +76,8 @@ def Upload(request_iterator, timeout=None, metadata=None): return grpc.experimental.stream_unary(request_iterator, target, '/fedn.ModelService/Upload', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, + network_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -93,8 +93,8 @@ def Download(request, timeout=None, metadata=None): return grpc.experimental.unary_stream(request, target, '/fedn.ModelService/Download', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, + network_dot_grpc_dot_fedn__pb2.ModelRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.ModelResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -110,23 +110,23 @@ def __init__(self, channel): """ self.Start = channel.unary_unary( '/fedn.Control/Start', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, ) self.Stop = channel.unary_unary( '/fedn.Control/Stop', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, ) self.FlushAggregationQueue = channel.unary_unary( '/fedn.Control/FlushAggregationQueue', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, ) self.SetAggregator = channel.unary_unary( '/fedn.Control/SetAggregator', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, ) @@ -162,23 +162,23 @@ def add_ControlServicer_to_server(servicer, server): rpc_method_handlers = { 'Start': grpc.unary_unary_rpc_method_handler( servicer.Start, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, ), 'Stop': grpc.unary_unary_rpc_method_handler( servicer.Stop, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, ), 'FlushAggregationQueue': grpc.unary_unary_rpc_method_handler( servicer.FlushAggregationQueue, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, ), 'SetAggregator': grpc.unary_unary_rpc_method_handler( servicer.SetAggregator, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -202,8 +202,8 @@ def Start(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Control/Start', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, + network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -219,8 +219,8 @@ def Stop(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Control/Stop', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, + network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -236,8 +236,8 @@ def FlushAggregationQueue(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Control/FlushAggregationQueue', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, + network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -253,8 +253,8 @@ def SetAggregator(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Control/SetAggregator', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, + network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -270,8 +270,8 @@ def __init__(self, channel): """ self.GetGlobalModel = channel.unary_unary( '/fedn.Reducer/GetGlobalModel', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.FromString, ) @@ -289,8 +289,8 @@ def add_ReducerServicer_to_server(servicer, server): rpc_method_handlers = { 'GetGlobalModel': grpc.unary_unary_rpc_method_handler( servicer.GetGlobalModel, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -314,8 +314,8 @@ def GetGlobalModel(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Reducer/GetGlobalModel', - fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.FromString, + network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -331,38 +331,38 @@ def __init__(self, channel): """ self.AllianceStatusStream = channel.unary_stream( '/fedn.Connector/AllianceStatusStream', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.Status.FromString, ) self.SendStatus = channel.unary_unary( '/fedn.Connector/SendStatus', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.Status.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.Response.FromString, ) self.ListActiveClients = channel.unary_unary( '/fedn.Connector/ListActiveClients', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ListClientsRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientList.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ListClientsRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.ClientList.FromString, ) self.AcceptingClients = channel.unary_unary( '/fedn.Connector/AcceptingClients', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionResponse.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ConnectionRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.ConnectionResponse.FromString, ) self.SendHeartbeat = channel.unary_unary( '/fedn.Connector/SendHeartbeat', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Heartbeat.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.Heartbeat.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.Response.FromString, ) self.ReassignClient = channel.unary_unary( '/fedn.Connector/ReassignClient', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ReassignRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ReassignRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.Response.FromString, ) self.ReconnectClient = channel.unary_unary( '/fedn.Connector/ReconnectClient', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ReconnectRequest.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ReconnectRequest.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.Response.FromString, ) @@ -421,38 +421,38 @@ def add_ConnectorServicer_to_server(servicer, server): rpc_method_handlers = { 'AllianceStatusStream': grpc.unary_stream_rpc_method_handler( servicer.AllianceStatusStream, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.Status.SerializeToString, ), 'SendStatus': grpc.unary_unary_rpc_method_handler( servicer.SendStatus, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.Status.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, ), 'ListActiveClients': grpc.unary_unary_rpc_method_handler( servicer.ListActiveClients, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ListClientsRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientList.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ListClientsRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.ClientList.SerializeToString, ), 'AcceptingClients': grpc.unary_unary_rpc_method_handler( servicer.AcceptingClients, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionResponse.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ConnectionRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.ConnectionResponse.SerializeToString, ), 'SendHeartbeat': grpc.unary_unary_rpc_method_handler( servicer.SendHeartbeat, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Heartbeat.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.Heartbeat.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, ), 'ReassignClient': grpc.unary_unary_rpc_method_handler( servicer.ReassignClient, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ReassignRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ReassignRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, ), 'ReconnectClient': grpc.unary_unary_rpc_method_handler( servicer.ReconnectClient, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ReconnectRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ReconnectRequest.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -476,8 +476,8 @@ def AllianceStatusStream(request, timeout=None, metadata=None): return grpc.experimental.unary_stream(request, target, '/fedn.Connector/AllianceStatusStream', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.FromString, + network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + network_dot_grpc_dot_fedn__pb2.Status.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -493,8 +493,8 @@ def SendStatus(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Connector/SendStatus', - fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + network_dot_grpc_dot_fedn__pb2.Status.SerializeToString, + network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -510,8 +510,8 @@ def ListActiveClients(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Connector/ListActiveClients', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ListClientsRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientList.FromString, + network_dot_grpc_dot_fedn__pb2.ListClientsRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.ClientList.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -527,8 +527,8 @@ def AcceptingClients(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Connector/AcceptingClients', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionResponse.FromString, + network_dot_grpc_dot_fedn__pb2.ConnectionRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.ConnectionResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -544,8 +544,8 @@ def SendHeartbeat(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Connector/SendHeartbeat', - fedn_dot_network_dot_grpc_dot_fedn__pb2.Heartbeat.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + network_dot_grpc_dot_fedn__pb2.Heartbeat.SerializeToString, + network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -561,8 +561,8 @@ def ReassignClient(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Connector/ReassignClient', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ReassignRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + network_dot_grpc_dot_fedn__pb2.ReassignRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -578,8 +578,8 @@ def ReconnectClient(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Connector/ReconnectClient', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ReconnectRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + network_dot_grpc_dot_fedn__pb2.ReconnectRequest.SerializeToString, + network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -595,18 +595,18 @@ def __init__(self, channel): """ self.TaskStream = channel.unary_stream( '/fedn.Combiner/TaskStream', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.TaskRequest.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.TaskRequest.FromString, ) self.SendModelUpdate = channel.unary_unary( '/fedn.Combiner/SendModelUpdate', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.Response.FromString, ) self.SendModelValidation = channel.unary_unary( '/fedn.Combiner/SendModelValidation', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + request_serializer=network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, + response_deserializer=network_dot_grpc_dot_fedn__pb2.Response.FromString, ) @@ -637,18 +637,18 @@ def add_CombinerServicer_to_server(servicer, server): rpc_method_handlers = { 'TaskStream': grpc.unary_stream_rpc_method_handler( servicer.TaskStream, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.TaskRequest.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.TaskRequest.SerializeToString, ), 'SendModelUpdate': grpc.unary_unary_rpc_method_handler( servicer.SendModelUpdate, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, ), 'SendModelValidation': grpc.unary_unary_rpc_method_handler( servicer.SendModelValidation, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, + request_deserializer=network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, + response_serializer=network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -672,8 +672,8 @@ def TaskStream(request, timeout=None, metadata=None): return grpc.experimental.unary_stream(request, target, '/fedn.Combiner/TaskStream', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.TaskRequest.FromString, + network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + network_dot_grpc_dot_fedn__pb2.TaskRequest.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -689,8 +689,8 @@ def SendModelUpdate(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Combiner/SendModelUpdate', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, + network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -706,7 +706,7 @@ def SendModelValidation(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/fedn.Combiner/SendModelValidation', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, + network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) From 7ad8c57d2ec410b71763df1d91cacc65e97d9037 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 19 Jun 2024 12:15:28 +0200 Subject: [PATCH 3/5] client should pass id in heart beat and combiner should store that id in db --- fedn/network/clients/client.py | 2 +- fedn/network/combiner/combiner.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index d257a4bab..c7b8d0329 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -697,7 +697,7 @@ def _send_heartbeat(self, update_frequency=2.0): :rtype: None """ while True: - heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.WORKER)) + heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.WORKER, client_id=self.id)) try: self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata) self._missed_heartbeat = 0 diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 41a0cf84a..6681da603 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -128,7 +128,7 @@ def __init__(self, config): # Set the status to offline for previous clients. previous_clients = self.statestore.clients.find({"combiner": config["name"]}) for client in previous_clients: - self.statestore.set_client({"name": client["name"], "status": "offline"}) + self.statestore.set_client({"name": client["name"], "status": "offline", "client_id": client["client_id"]}) self.modelservice = ModelService() @@ -575,7 +575,7 @@ def SendHeartbeat(self, heartbeat: fedn.Heartbeat, context): # Update the clients dict with the last seen timestamp. client = heartbeat.sender self.__join_client(client) - self.clients[client.name]["lastseen"] = datetime.now() + self.clients[client.client_id]["lastseen"] = datetime.now() response = fedn.Response() response.sender.name = heartbeat.sender.name @@ -612,7 +612,7 @@ def TaskStream(self, response, context): # Set client status to online self.clients[client.client_id]["status"] = "online" - self.statestore.set_client({"name": client.name, "status": "online"}) + self.statestore.set_client({"name": client.name, "status": "online", "client_id": client.client_id}) # Keep track of the time context has been active start_time = time.time() From 7f7124c8dcc5dbe2694cbc296a5f05d53f0485a4 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 19 Jun 2024 14:48:10 +0200 Subject: [PATCH 4/5] fixed offline & Invalid date issue --- fedn/network/combiner/combiner.py | 10 +++++----- fedn/network/storage/statestore/mongostatestore.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 6681da603..afd9e4d67 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -267,7 +267,7 @@ def __join_client(self, client): """ if client.client_id not in self.clients.keys(): # The status is set to offline by default, and will be updated once _list_active_clients is called. - self.clients[client.client_id] = {"lastseen": datetime.now(), "status": "offline"} + self.clients[client.client_id] = {"last_seen": datetime.now(), "status": "offline"} def _subscribe_client_to_queue(self, client, queue_name): """Subscribe a client to the queue. @@ -329,7 +329,7 @@ def _list_active_clients(self, channel): for client in self._list_subscribed_clients(channel): status = self.clients[client]["status"] now = datetime.now() - then = self.clients[client]["lastseen"] + then = self.clients[client]["last_seen"] if (now - then) < timedelta(seconds=10): clients["active_clients"].append(client) # If client has changed status, update statestore @@ -575,7 +575,7 @@ def SendHeartbeat(self, heartbeat: fedn.Heartbeat, context): # Update the clients dict with the last seen timestamp. client = heartbeat.sender self.__join_client(client) - self.clients[client.client_id]["lastseen"] = datetime.now() + self.clients[client.client_id]["last_seen"] = datetime.now() response = fedn.Response() response.sender.name = heartbeat.sender.name @@ -612,14 +612,14 @@ def TaskStream(self, response, context): # Set client status to online self.clients[client.client_id]["status"] = "online" - self.statestore.set_client({"name": client.name, "status": "online", "client_id": client.client_id}) + self.statestore.set_client({"name": client.name, "status": "online", "client_id": client.client_id, "last_seen": datetime.now()}) # Keep track of the time context has been active start_time = time.time() while context.is_active(): # Check if the context has been active for more than 10 seconds if time.time() - start_time > 10: - self.clients[client.client_id]["lastseen"] = datetime.now() + self.clients[client.client_id]["last_seen"] = datetime.now() # Reset the start time start_time = time.time() try: diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index 6bf3be4ff..71738d565 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -924,7 +924,7 @@ def update_client_status(self, clients, status): :return: None """ datetime_now = datetime.now() - filter_query = {"name": {"$in": clients}} + filter_query = {"client_id": {"$in": clients}} update_query = {"$set": {"last_seen": datetime_now, "status": status}} self.clients.update_many(filter_query, update_query) From 55a22312160ee539b922bfb386383295fb3f22f1 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 25 Jun 2024 11:40:32 +0200 Subject: [PATCH 5/5] added index for client_id & made sure client_id is used for /add_client as well --- fedn/network/api/interface.py | 5 +++-- fedn/network/api/network.py | 4 ++-- fedn/network/clients/connect.py | 2 +- fedn/network/storage/statestore/mongostatestore.py | 13 +++++++------ 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index c50adf8e2..8a2abe92e 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -554,7 +554,7 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por return jsonify(payload) - def add_client(self, client_id, preferred_combiner, remote_addr): + def add_client(self, client_id, preferred_combiner, remote_addr, name): """Add a client to the network. :param client_id: The client id to add. @@ -600,7 +600,8 @@ def add_client(self, client_id, preferred_combiner, remote_addr): ) client_config = { - "name": client_id, + "client_id": client_id, + "name": name, "combiner_preferred": preferred_combiner, "combiner": combiner.name, "ip": remote_addr, diff --git a/fedn/network/api/network.py b/fedn/network/api/network.py index 045f8aa34..5e2f2ef91 100644 --- a/fedn/network/api/network.py +++ b/fedn/network/api/network.py @@ -113,10 +113,10 @@ def add_client(self, client): :type client: dict :return: None """ - if self.get_client(client["name"]): + if self.get_client(client["client_id"]): return - logger.info("adding client {}".format(client["name"])) + logger.info("adding client {}".format(client["client_id"])) self.statestore.set_client(client) def get_client(self, name): diff --git a/fedn/network/clients/connect.py b/fedn/network/clients/connect.py index 09450c5ab..59aaead35 100644 --- a/fedn/network/clients/connect.py +++ b/fedn/network/clients/connect.py @@ -74,7 +74,7 @@ def assign(self): """ try: retval = None - payload = {"client_id": self.name, "preferred_combiner": self.preferred_combiner} + payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner} retval = requests.post( self.connect_string + FEDN_CUSTOM_URL_PREFIX + "/add_client", json=payload, diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index fe01dac39..128bcecc4 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -81,6 +81,7 @@ def connect(self): def init_index(self): self.package.create_index([("id", pymongo.DESCENDING)]) + self.clients.create_index([("client_id", pymongo.DESCENDING)]) def is_inited(self): """Check if the statestore is intialized. @@ -726,18 +727,18 @@ def set_client(self, client_data): :return: """ client_data["updated_at"] = str(datetime.now()) - self.clients.update_one({"name": client_data["name"]}, {"$set": client_data}, True) + self.clients.update_one({"client_id": client_data["client_id"]}, {"$set": client_data}, True) - def get_client(self, name): - """Get client by name. + def get_client(self, client_id): + """Get client by client_id. - :param name: name of client to get. - :type name: str + :param client_id: client_id of client to get. + :type client_id: str :return: The client. None if not found. :rtype: ObjectId """ try: - ret = self.clients.find({"key": name}) + ret = self.clients.find({"key": client_id}) if list(ret) == []: return None else: