From 68cdeda7190a200ef71da985abc9866176ad00bd Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 1 Dec 2023 16:03:36 +0000 Subject: [PATCH] progress --- fedn/fedn/network/api/interface.py | 11 ++++++++++- .../network/combiner/aggregators/aggregatorbase.py | 11 ++++------- fedn/fedn/network/combiner/aggregators/fedavg.py | 8 +++----- fedn/fedn/network/combiner/server.py | 7 +++++-- fedn/fedn/network/controller/controlbase.py | 13 ++++++------- 5 files changed, 28 insertions(+), 22 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 9cb07a950..3cea06668 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -794,11 +794,20 @@ def start_session( {"success": False, "message": "A session is already running."} ) + # Check that initial (seed) model is set + if not self.statestore.get_initial_model(): + return jsonify( + { + "success": False, + "message": "No initial model set. Set initial model before starting session.", + } + ) + # Check available clients per combiner clients_available = 0 for combiner in self.control.network.get_combiners(): try: - nr_active_clients = len(combiner.get_active_clients()) + nr_active_clients = len(combiner.list_active_clients()) clients_available = clients_available + int(nr_active_clients) except CombinerUnavailableError as e: # TODO: Handle unavailable combiner, stop session or continue? diff --git a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py index 7e5665f44..b74390c72 100644 --- a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py +++ b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py @@ -3,7 +3,6 @@ import queue from abc import ABC, abstractmethod -import fedn.common.net.grpc.fedn_pb2 as fedn from fedn.common.log_config import logger AGGREGATOR_PLUGIN_PATH = "fedn.network.combiner.aggregators.{}" @@ -61,8 +60,7 @@ def on_model_update(self, model_update): :type model_id: str """ try: - logger.info("AGGREGATOR({}): callback received model update {}".format(self.name, model_update.model_update_id), - log_level=fedn.Status.INFO) + logger.info("AGGREGATOR({}): callback received model update {}".format(self.name, model_update.model_update_id)) # Validate the update and metadata valid_update = self._validate_model_update(model_update) @@ -70,10 +68,9 @@ def on_model_update(self, model_update): # Push the model update to the processing queue self.model_updates.put(model_update) else: - logger.info("AGGREGATOR({}): Invalid model update, skipping.".format(self.name)) + logger.warning("AGGREGATOR({}): Invalid model update, skipping.".format(self.name)) except Exception as e: - logger.info("AGGREGATOR({}): Failed to receive model update! {}".format(self.name, e), - log_level=fedn.Status.WARNING) + logger.error("AGGREGATOR({}): Failed to receive model update! {}".format(self.name, e)) pass def _validate_model_update(self, model_update): @@ -87,7 +84,7 @@ def _validate_model_update(self, model_update): # TODO: Validate the metadata to check that it contains all variables assumed by the aggregator. data = json.loads(model_update.meta)['training_metadata'] if 'num_examples' not in data.keys(): - logger.info("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name)) + logger.error("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name)) return False return True diff --git a/fedn/fedn/network/combiner/aggregators/fedavg.py b/fedn/fedn/network/combiner/aggregators/fedavg.py index ddc4f1bfd..ae479676d 100644 --- a/fedn/fedn/network/combiner/aggregators/fedavg.py +++ b/fedn/fedn/network/combiner/aggregators/fedavg.py @@ -1,6 +1,5 @@ -import fedn.common.net.grpc.fedn_pb2 as fedn -from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase from fedn.common.log_config import logger +from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase class Aggregator(AggregatorBase): @@ -78,12 +77,11 @@ def combine_models(self, helper=None, time_window=180, max_nr_models=100, delete "AGGREGATOR({}): Deleted model update {} from storage.".format(self.name, model_id)) self.model_updates.task_done() except Exception as e: - logger.info( + logger.error( "AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e)) self.model_updates.task_done() data['nr_aggregated_models'] = nr_aggregated_models - logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models), - log_level=fedn.Status.INFO) + logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models)) return model, data diff --git a/fedn/fedn/network/combiner/server.py b/fedn/fedn/network/combiner/server.py index ac418d574..e90c998fd 100644 --- a/fedn/fedn/network/combiner/server.py +++ b/fedn/fedn/network/combiner/server.py @@ -166,7 +166,7 @@ def request_model_update(self, config, clients=[]): # The request to be added to the client queue request = fedn.ModelUpdateRequest() request.model_id = config['model_id'] - request.correlation_id = str(uuid.uuid4()) # Obesolete? + request.correlation_id = str(uuid.uuid4()) request.timestamp = str(datetime.now()) request.data = json.dumps(config) @@ -174,7 +174,7 @@ def request_model_update(self, config, clients=[]): clients = self.get_active_trainers() for client in clients: - request.receiver.name = client.name + request.receiver.name = client request.receiver.role = fedn.WORKER self._put_request_to_client_queue(request, fedn.Channel.MODEL_UPDATE_REQUESTS) @@ -592,6 +592,9 @@ def ModelUpdateRequestStream(self, response, context): yield q.get(timeout=1.0) except queue.Empty: pass + except Exception as e: + logger.error("Error in ModelUpdateRequestStream: {}".format(e)) + break self.tracer.update_client_status(client.name, "offline") diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index 71f660e24..83fffed28 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -305,14 +305,13 @@ def get_participating_combiners(self, combiner_round_config): nr_active_clients = len(combiner.list_active_clients()) except CombinerUnavailableError: self._handle_unavailable_combiner(combiner) - combiner_state = None + continue - if combiner_state is not None: - is_participating = self.evaluate_round_participation_policy( - combiner_round_config, nr_active_clients - ) - if is_participating: - combiners.append((combiner, combiner_round_config)) + is_participating = self.evaluate_round_participation_policy( + combiner_round_config, nr_active_clients + ) + if is_participating: + combiners.append((combiner, combiner_round_config)) return combiners def evaluate_round_participation_policy(