Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/SK-634 | Add logging to controller #506

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
## Examples
The examples distributed here in this folder are regularly maintained by Scaleout and are part of the continuous integration tests.
The examples distributed here in this folder are maintained by Scaleout.

### External examples
Below we maintain a list of examples provided both by the Scaleout core team and users. They may or may not be tested with the latest release of FEDn, please refer to the README of each specific project/example for detail. If you have a project that you want to include in this list, talk to a core developer in [Discord](https://discord.gg/CCRgjpMsVA).
Below we maintain a list of examples provided both by the Scaleout core team and users. They may or may not be tested with the latest release of FEDn; please refer to the README of each specific project/example for details.
If you have a project that you want to include in this list, talk to a core developer in [Discord](https://discord.gg/CCRgjpMsVA).

- [C++ version of the MNIST example](https://github.com/scaleoutsystems/examples) Also executes in Intel SGX TEE via Gramine.
- [Human activity recognition use case from the FEDn paper](https://github.com/scaleoutsystems/examples) IoT/cross-device.
Expand Down
4 changes: 2 additions & 2 deletions examples/async-simulation/Experiment.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@
},
{
"cell_type": "code",
"execution_count": 70,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

??

"execution_count": 74,
"id": "f0380d35",
"metadata": {},
"outputs": [],
"source": [
"session_config_fedavg = {\n",
" \"helper\": \"numpyhelper\",\n",
" \"session_id\": \"experiment_fedavg4\",\n",
" \"session_id\": \"experiment_fedavg6\",\n",
" \"aggregator\": \"fedavg\",\n",
" \"model_id\": seed_model['model_id'],\n",
" \"rounds\": 1,\n",
Expand Down
15 changes: 15 additions & 0 deletions examples/async-simulation/docker-compose.override.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Compose schema version
version: '3.3'

# Overriding requirements
# NOTE(review): the nesting of the keys below is not visible in this view
# (indentation was lost) - confirm the structure against the checked-in file.
services:
client:
build:
args:
# Requirements file for the async-simulation example, passed as the
# REQUIREMENTS build argument.
REQUIREMENTS: examples/async-simulation/requirements.txt
deploy:
# Number of client replicas requested.
replicas: 2
volumes:
# HOST_REPO_DIR falls back to "." (the current directory) when it is unset or empty.
- ${HOST_REPO_DIR:-.}/fedn:/app/fedn
- ${HOST_REPO_DIR:-.}/examples/async-simulation/data:/var/data
# Maps the host's Docker socket to the same path inside the container.
- /var/run/docker.sock:/var/run/docker.sock
32 changes: 12 additions & 20 deletions fedn/fedn/network/combiner/roundhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ def push_round_config(self, round_config):
round_config['_job_id'] = str(uuid.uuid4())
self.round_configs.put(round_config)
except Exception:
logger.warning(
"ROUNDCONTROL: Failed to push round config.")
logger.error("Failed to push round config.")
raise
return round_config['_job_id']

Expand Down Expand Up @@ -100,9 +99,7 @@ def load_model_update_str(self, model_id, retry=3):
while tries < retry:
tries += 1
if not model_str or sys.getsizeof(model_str) == 80:
logger.warning(
"ROUNDCONTROL: Model download failed. retrying")

logger.warning("Model download failed. retrying")
time.sleep(1)
model_str = self.modelservice.get_model(model_id)

Expand Down Expand Up @@ -170,7 +167,7 @@ def _training_round(self, config, clients):

try:
helper = get_helper(config['helper_type'])
logger.info("ROUNDCONTROL: Config delete_models_storage: {}".format(config['delete_models_storage']))
logger.info("Config delete_models_storage: {}".format(config['delete_models_storage']))
if config['delete_models_storage'] == 'True':
delete_models = True
else:
Expand Down Expand Up @@ -209,9 +206,9 @@ def stage_model(self, model_id, timeout_retry=3, retry=2):

# If the model is already in memory at the server we do not need to do anything.
if self.modelservice.temp_model_storage.exist(model_id):
logger.info("ROUNDCONTROL: Model already exists in memory, skipping model staging.")
logger.info("Model already exists in memory, skipping model staging.")
return
logger.info("ROUNDCONTROL: Model Staging, fetching model from storage...")
logger.info("Model Staging, fetching model from storage...")
# If not, download it and stage it in memory at the combiner.
tries = 0
while True:
Expand All @@ -220,12 +217,11 @@ def stage_model(self, model_id, timeout_retry=3, retry=2):
if model:
break
except Exception:
logger.info("ROUNDCONTROL: Could not fetch model from storage backend, retrying.")
logger.warning("Could not fetch model from storage backend, retrying.")
time.sleep(timeout_retry)
tries += 1
if tries > retry:
logger.info(
"ROUNDCONTROL: Failed to stage model {} from storage backend!".format(model_id))
logger.error("Failed to stage model {} from storage backend!".format(model_id))
raise

self.modelservice.set_model(model, model_id)
Expand All @@ -246,8 +242,7 @@ def _assign_round_clients(self, n, type="trainers"):
elif type == "trainers":
clients = self.server.get_active_trainers()
else:
logger.info(
"ROUNDCONTROL(ERROR): {} is not a supported type of client".format(type))
logger.error("(ERROR): {} is not a supported type of client".format(type))
raise

# If the number of requested trainers exceeds the number of available, use all available.
Expand Down Expand Up @@ -315,8 +310,7 @@ def execute_training_round(self, config):
:rtype: dict
"""

logger.info(
"ROUNDCONTROL: Processing training round, job_id {}".format(config['_job_id']))
logger.info("Processing training round, job_id {}".format(config['_job_id']))

data = {}
data['config'] = config
Expand All @@ -341,7 +335,7 @@ def execute_training_round(self, config):
data['model_id'] = model_id

logger.info(
"ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']))
"TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']))

# Delete temp model
self.modelservice.temp_model_storage.delete(config['model_id'])
Expand Down Expand Up @@ -374,14 +368,12 @@ def run(self, polling_interval=1.0):
elif round_config['task'] == 'validation' or round_config['task'] == 'inference':
self.execute_validation_round(round_config)
else:
logger.warning(
"ROUNDCONTROL: Round config contains unkown task type.")
logger.warning("config contains unkown task type.")
else:
round_meta = {}
round_meta['status'] = "Failed"
round_meta['reason'] = "Failed to meet client allocation requirements for this round config."
logger.warning(
"ROUNDCONTROL: {0}".format(round_meta['reason']))
logger.warning("{0}".format(round_meta['reason']))

self.round_configs.task_done()
except queue.Empty:
Expand Down
Loading
Loading