From 5888067c899f9d60e024008e6ac716fa863d9c65 Mon Sep 17 00:00:00 2001 From: ShixiongQi Date: Mon, 3 Jun 2024 15:04:55 -0700 Subject: [PATCH] Optimizing eager aggregation - reduce deepcopy ops (#588) --- docs/prerequisites.md | 4 ++-- .../mode/horizontal/eager_syncfl/middle_aggregator.py | 11 ++++++----- .../mode/horizontal/eager_syncfl/top_aggregator.py | 7 ++++--- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/docs/prerequisites.md b/docs/prerequisites.md index dc430c25e..d01fcdea1 100644 --- a/docs/prerequisites.md +++ b/docs/prerequisites.md @@ -86,7 +86,7 @@ Python 3.9.6 The target runtime environment is Linux. Development has been mainly conducted under macOS environment. This section describes how to set up a development environment in macOS (Intel chip) and Ubuntu. The following tools and packages are needed as minimum: -- go 1.18+ +- go 1.22+ - golangci-lint After installing above packages, you could try a development setup called `fiab`, an acronym for flame-in-a-box, which is found [here](system/fiab.md). @@ -112,7 +112,7 @@ sudo apt update Install golang and and golangci-lint. ```bash -golang_file=go1.18.6.linux-amd64.tar.gz +golang_file=go1.22.3.linux-amd64.tar.gz curl -LO https://go.dev/dl/$golang_file && tar -C $HOME -xzf $golang_file echo "PATH=\"\$HOME/go/bin:\$PATH\"" >> $HOME/.bashrc source $HOME/.bashrc diff --git a/lib/python/flame/mode/horizontal/eager_syncfl/middle_aggregator.py b/lib/python/flame/mode/horizontal/eager_syncfl/middle_aggregator.py index 13b99783d..2c464bfa9 100644 --- a/lib/python/flame/mode/horizontal/eager_syncfl/middle_aggregator.py +++ b/lib/python/flame/mode/horizontal/eager_syncfl/middle_aggregator.py @@ -37,6 +37,7 @@ def _aggregate_weights(self, tag: str) -> None: return total = 0 + base_weights = deepcopy(self.weights) for msg, metadata in channel.recv_fifo(channel.ends()): end, _ = metadata @@ -60,18 +61,18 @@ def _aggregate_weights(self, tag: str) -> None: # optimizer conducts optimization (in this case, aggregation) global_weights = self.optimizer.do( - deepcopy(self.weights), self.cache, total=total + base_weights, self.cache, total=total ) if global_weights is None: logger.debug("failed model aggregation") time.sleep(1) return - # save global weights before updating it - self.prev_weights = self.weights + # save global weights before updating it + self.prev_weights = self.weights - # set global weights - self.weights = global_weights + # set global weights + self.weights = global_weights logger.debug(f"received {len(self.cache)} trainer updates in cache") diff --git a/lib/python/flame/mode/horizontal/eager_syncfl/top_aggregator.py b/lib/python/flame/mode/horizontal/eager_syncfl/top_aggregator.py index 6fae0b87d..8ae95ccf6 100644 --- a/lib/python/flame/mode/horizontal/eager_syncfl/top_aggregator.py +++ b/lib/python/flame/mode/horizontal/eager_syncfl/top_aggregator.py @@ -39,6 +39,7 @@ def _aggregate_weights(self, tag: str) -> None: return total = 0 + base_weights = deepcopy(self.weights) for msg, metadata in channel.recv_fifo(channel.ends()): end, timestamp = metadata @@ -72,7 +73,7 @@ def _aggregate_weights(self, tag: str) -> None: # optimizer conducts optimization (in this case, aggregation) global_weights = self.optimizer.do( - deepcopy(self.weights), + base_weights, self.cache, total=total, num_trainers=len(channel.ends()), @@ -82,8 +83,8 @@ def _aggregate_weights(self, tag: str) -> None: time.sleep(1) return - # set global weights - self.weights = global_weights + # set global weights + self.weights = global_weights # update model with global weights self._update_model()