From 07629ff8d17184ead95da62e26e9998a09ab6b8f Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Tue, 20 Jun 2023 15:06:10 +0200 Subject: [PATCH] Bug/SK-507 | Add functionality to inspect combiner queue state (debug mode) (#474) * Combiner Report now returns the current size of the aggregator model update queue * Resolve SK-507 * Resolve SK-507 * add doc string --------- Co-authored-by: Andreas Hellander Co-authored-by: Fredrik Wrede --- .../network/combiner/aggregators/aggregatorbase.py | 8 ++++++++ fedn/fedn/network/combiner/round.py | 8 +++----- fedn/fedn/network/combiner/server.py | 10 +++++++++- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py index 1a6da8d1a..e075f7142 100644 --- a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py +++ b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py @@ -108,6 +108,14 @@ def next_model_update(self, helper): return model_next, data, model_id + def get_state(self): + """ Get the state of the aggregator's queue, including the number of model updates.""" + state = { + 'queue_len': self.model_updates.qsize() + + } + return state + def get_aggregator(aggregator_module_name, storage, server, modelservice, control): """ Return an instance of the helper class. diff --git a/fedn/fedn/network/combiner/round.py b/fedn/fedn/network/combiner/round.py index 266873608..4a2436fe9 100644 --- a/fedn/fedn/network/combiner/round.py +++ b/fedn/fedn/network/combiner/round.py @@ -331,13 +331,11 @@ def execute_training_round(self, config): model_id = str(uuid.uuid4()) self.modelservice.set_model(a, model_id) a.close() + data['model_id'] = model_id - data['model_id'] = model_id + self.server.report_status( + "ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']), flush=True) - print("------------------------------------------") - self.server.report_status( - "ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']), flush=True) - print("\n") return data def run(self, polling_interval=1.0): diff --git a/fedn/fedn/network/combiner/server.py b/fedn/fedn/network/combiner/server.py index 05df34a43..ff0134d6b 100644 --- a/fedn/fedn/network/combiner/server.py +++ b/fedn/fedn/network/combiner/server.py @@ -442,7 +442,15 @@ def Report(self, control: fedn.ControlRequest, context): """ response = fedn.ControlResponse() - print("\n RECIEVED **REPORT** from Controller\n", flush=True) + self.report_status("\n RECIEVED **REPORT** from Controller\n", + log_level=fedn.Status.INFO) + + control_state = self.control.aggregator.get_state() + self.report_status("Aggregator state: {}".format(control_state), log_level=fedn.Status.INFO) + p = response.parameter.add() + for key, value in control_state.items(): + p.key = str(key) + p.value = str(value) active_trainers = self.get_active_trainers() p = response.parameter.add()