Skip to content

Commit

Permalink
Bug/SK-507 | Add functionality to inspect combiner queue state (debug…
Browse files Browse the repository at this point in the history
… mode) (#474)

* Combiner Report now returns the current size of the aggregator model update queue

* Resolve SK-507

* Resolve SK-507

* Add docstring

---------

Co-authored-by: Andreas Hellander <[email protected]>
Co-authored-by: Fredrik Wrede <[email protected]>
  • Loading branch information
3 people authored Jun 20, 2023
1 parent dd416c1 commit 07629ff
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
8 changes: 8 additions & 0 deletions fedn/fedn/network/combiner/aggregators/aggregatorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ def next_model_update(self, helper):

return model_next, data, model_id

def get_state(self):
    """ Report the current state of the aggregator's model update queue.

    :return: A dict with the single key 'queue_len', holding the value
        returned by ``self.model_updates.qsize()``.
    :rtype: dict
    """
    pending_updates = self.model_updates.qsize()
    return {'queue_len': pending_updates}


def get_aggregator(aggregator_module_name, storage, server, modelservice, control):
""" Return an instance of the helper class.
Expand Down
8 changes: 3 additions & 5 deletions fedn/fedn/network/combiner/round.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,13 +331,11 @@ def execute_training_round(self, config):
model_id = str(uuid.uuid4())
self.modelservice.set_model(a, model_id)
a.close()
data['model_id'] = model_id

data['model_id'] = model_id
self.server.report_status(
"ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']), flush=True)

print("------------------------------------------")
self.server.report_status(
"ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']), flush=True)
print("\n")
return data

def run(self, polling_interval=1.0):
Expand Down
10 changes: 9 additions & 1 deletion fedn/fedn/network/combiner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,15 @@ def Report(self, control: fedn.ControlRequest, context):
"""

response = fedn.ControlResponse()
print("\n RECIEVED **REPORT** from Controller\n", flush=True)
self.report_status("\n RECIEVED **REPORT** from Controller\n",
log_level=fedn.Status.INFO)

control_state = self.control.aggregator.get_state()
self.report_status("Aggregator state: {}".format(control_state), log_level=fedn.Status.INFO)
p = response.parameter.add()
for key, value in control_state.items():
p.key = str(key)
p.value = str(value)

active_trainers = self.get_active_trainers()
p = response.parameter.add()
Expand Down

0 comments on commit 07629ff

Please sign in to comment.