From 0af7a7c9e1e94772c69fb7f1b0152d649525ace5 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Thu, 16 May 2024 15:42:40 +0000 Subject: [PATCH 1/3] add binary helper --- fedn/utils/helpers/helperbase.py | 9 --- fedn/utils/helpers/plugins/binaryhelper.py | 16 +++++ fedn/utils/helpers/plugins/numpyhelper.py | 77 +++++++++++++++++----- 3 files changed, 76 insertions(+), 26 deletions(-) create mode 100644 fedn/utils/helpers/plugins/binaryhelper.py diff --git a/fedn/utils/helpers/helperbase.py b/fedn/utils/helpers/helperbase.py index 109ab2a47..20b686bca 100644 --- a/fedn/utils/helpers/helperbase.py +++ b/fedn/utils/helpers/helperbase.py @@ -40,12 +40,3 @@ def load(self, fh): :return: Weights in array-like format. """ pass - - def get_tmp_path(self): - """Return a temporary output path compatible with save_model, load_model. - - :return: Path to file. - """ - fd, path = tempfile.mkstemp(suffix=".npz") - os.close(fd) - return path diff --git a/fedn/utils/helpers/plugins/binaryhelper.py b/fedn/utils/helpers/plugins/binaryhelper.py new file mode 100644 index 000000000..4e7ae8fbb --- /dev/null +++ b/fedn/utils/helpers/plugins/binaryhelper.py @@ -0,0 +1,16 @@ +from fedn.utils.helpers.plugins.numpyhelper import Helper + + +class Helper(Helper): + """FEDn helper class for models weights/parameters that can be transformed to numpy ndarrays.""" + + def __init__(self): + """Initialize helper.""" + super().__init__() + self.name = "binaryhelper" + + def load(self, path, file_type="raw_binary"): + return super().load(path, file_type) + + def save(self, model, path=None, file_type="raw_binary"): + return super().save(model, path, file_type) diff --git a/fedn/utils/helpers/plugins/numpyhelper.py b/fedn/utils/helpers/plugins/numpyhelper.py index 822ce929e..59418df29 100644 --- a/fedn/utils/helpers/plugins/numpyhelper.py +++ b/fedn/utils/helpers/plugins/numpyhelper.py @@ -1,3 +1,7 @@ +import os +import tempfile +from io import BytesIO + import numpy as np from fedn.utils.helpers.helperbase import HelperBase @@ -137,33 +141,72 @@ def ones(self, m1, a): res.append(np.ones(np.shape(x)) * a) return res - def save(self, weights, path=None): + def save(self, weights, path=None, file_type="npz"): """Serialize weights to file. The serialized model must be a single binary object. :param weights: List of weights in numpy format. :param path: Path to file. + :param file_type: File type to save to. Can be 'npz' or 'raw_binary'. Default is 'npz'. :return: Path to file. """ - if not path: - path = self.get_tmp_path() + self.check_supported_file_type(file_type) + + if file_type == "npz": + if not path: + path = self.get_tmp_path() + + weights_dict = {} + for i, w in enumerate(weights): + weights_dict[str(i)] = w + + np.savez_compressed(path, **weights_dict) + return path + else: + if not path: + path = self.get_tmp_path(suffix=".bin") + weights = np.concatenate(weights) + weights.tofile(path) + return path + + def load(self, path, file_type="npz"): + """Load weights from file or filelike. - weights_dict = {} - for i, w in enumerate(weights): - weights_dict[str(i)] = w + :param path: file path, filehandle, filelike. + :return: List of weights in numpy format. + """ + self.check_supported_file_type(file_type) + weights = [] + if file_type == "npz": + a = np.load(path) + for i in range(len(a.files)): + weights.append(a[str(i)]) + else: + if isinstance(path, BytesIO): + a = np.frombuffer(path.read(), dtype=np.float64) + else: + a = np.fromfile(path, dtype=np.float64) + weights.append(a) + return weights - np.savez_compressed(path, **weights_dict) + def get_tmp_path(self, suffix=".npz"): + """Return a temporary output path compatible with save_model, load_model. + :param suffix: File suffix. + :return: Path to file. + """ + fd, path = tempfile.mkstemp(suffix=suffix) + os.close(fd) return path - def load(self, fh): - """Load weights from file or filelike. + def check_supported_file_type(self, file_type): + """Check if the file type is supported. - :param fh: file path, filehandle, filelike. - :return: List of weights in numpy format. + :param file_type: File type to check. + :type file_type: str + :return: True if supported, False otherwise. + :rtype: bool """ - a = np.load(fh) - - weights = [] - for i in range(len(a.files)): - weights.append(a[str(i)]) - return weights + supported_file_types = ["npz", "raw_binary"] + if file_type not in supported_file_types: + raise ValueError("File type not supported. Supported types are: {}".format(supported_file_types)) + return True From 4e7690243814d7810838363d30a0b03adfbad366 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 17 May 2024 06:34:06 +0000 Subject: [PATCH 2/3] add traceback --- fedn/network/combiner/aggregators/fedavg.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/fedn/network/combiner/aggregators/fedavg.py b/fedn/network/combiner/aggregators/fedavg.py index 9ed0adf3c..0d965cfa0 100644 --- a/fedn/network/combiner/aggregators/fedavg.py +++ b/fedn/network/combiner/aggregators/fedavg.py @@ -1,9 +1,11 @@ +import traceback + from fedn.common.log_config import logger from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase class Aggregator(AggregatorBase): - """ Local SGD / Federated Averaging (FedAvg) aggregator. Computes a weighted mean + """Local SGD / Federated Averaging (FedAvg) aggregator. Computes a weighted mean of parameter updates. :param id: A reference to id of :class: `fedn.network.combiner.Combiner` @@ -48,8 +50,7 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): nr_aggregated_models = 0 total_examples = 0 - logger.info( - "AGGREGATOR({}): Aggregating model updates... ".format(self.name)) + logger.info("AGGREGATOR({}): Aggregating model updates... ".format(self.name)) while not self.model_updates.empty(): try: @@ -61,8 +62,7 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): logger.info("AGGREGATOR({}): Loading model metadata {}.".format(self.name, model_update.model_update_id)) model_next, metadata = self.load_model_update(model_update, helper) - logger.info( - "AGGREGATOR({}): Processing model update {}, metadata: {} ".format(self.name, model_update.model_update_id, metadata)) + logger.info("AGGREGATOR({}): Processing model update {}, metadata: {} ".format(self.name, model_update.model_update_id, metadata)) # Increment total number of examples total_examples += metadata["num_examples"] @@ -70,19 +70,18 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): if nr_aggregated_models == 0: model = model_next else: - model = helper.increment_average( - model, model_next, metadata["num_examples"], total_examples) + model = helper.increment_average(model, model_next, metadata["num_examples"], total_examples) nr_aggregated_models += 1 # Delete model from storage if delete_models: self.modelservice.temp_model_storage.delete(model_update.model_update_id) - logger.info( - "AGGREGATOR({}): Deleted model update {} from storage.".format(self.name, model_update.model_update_id)) + logger.info("AGGREGATOR({}): Deleted model update {} from storage.".format(self.name, model_update.model_update_id)) self.model_updates.task_done() except Exception as e: - logger.error( - "AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e)) + tb = traceback.format_exc() + logger.error(f"AGGREGATOR({self.name}): Error encoutered while processing model update: {e}") + logger.error(tb) self.model_updates.task_done() data["nr_aggregated_models"] = nr_aggregated_models From 6f7452f6d6f90457d8a881e7125afb0211a64d1c Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 17 May 2024 08:57:37 +0200 Subject: [PATCH 3/3] fix --- fedn/utils/helpers/helperbase.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fedn/utils/helpers/helperbase.py b/fedn/utils/helpers/helperbase.py index 20b686bca..256e3a367 100644 --- a/fedn/utils/helpers/helperbase.py +++ b/fedn/utils/helpers/helperbase.py @@ -1,5 +1,3 @@ -import os -import tempfile from abc import ABC, abstractmethod