Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/async simulation #541

Merged
merged 63 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
b9c33dc
work in progress
Feb 5, 2024
599fb9e
Cleanup, structuring
Mar 2, 2024
cb48448
fixes
Mar 5, 2024
094a368
Merge branch 'master' into feature/async-simulation
Mar 5, 2024
756bf6c
added run_experiment
Mar 5, 2024
e4e1c43
robust untaring of package
Mar 6, 2024
505b66d
robust untaring of package
Mar 6, 2024
3f73d51
latest
Mar 11, 2024
c52512d
Siplify client, merge attach/connect , detach/disconnect
Mar 11, 2024
0278e6f
Handle disconnected clients in send_status
Mar 11, 2024
16cea36
Rename client_config to combiner_config
Mar 11, 2024
a1defef
More error handling
Mar 13, 2024
e8ef743
bugfix, session status not initialized
Mar 13, 2024
dd36ba4
cleaned up _check_nr_round_clients
Mar 13, 2024
cdb523e
get_latest_model and get_model_trail updated to use /api/v1
niklastheman Mar 15, 2024
739039d
list_clients and get_active_clients uses /api/v1
niklastheman Mar 15, 2024
c7c098c
list_combiners and get_combiner uses /api/v1
niklastheman Mar 15, 2024
fc919a8
list_rounds and get_round uses /api/v1
niklastheman Mar 15, 2024
b615688
list_sessions, get_session, session_is_finished uses /api/v1, get_ses…
niklastheman Mar 15, 2024
20aabfc
get_package & list_compute_packages uses /api/v1
niklastheman Mar 15, 2024
9974403
get_events & list_validations uses /api/v1
niklastheman Mar 15, 2024
9f1bcd9
events => statuses, renamed list => get
niklastheman Mar 15, 2024
665d6d4
get_controller_config checksum fix
niklastheman Mar 15, 2024
19f431f
Merge branch 'master' into feature/SK-726
niklastheman Mar 15, 2024
6170d1e
get_model_parameters & download_model added to api client
niklastheman Mar 15, 2024
d88e074
some docstring updates
niklastheman Mar 15, 2024
1a3f332
updated docstrings
niklastheman Mar 16, 2024
9f07f85
latest
Mar 16, 2024
5853752
Merge branch 'master' into feature/async-simulation
Mar 16, 2024
0e73af4
Resolved conflict
Mar 16, 2024
4244bb7
start session bug fix
niklastheman Mar 18, 2024
25af203
latest
Mar 18, 2024
86d342e
Merge remote-tracking branch 'origin/feature/SK-726' into feature/asy…
Mar 18, 2024
2f12d46
get_models_count added
niklastheman Mar 18, 2024
de08e3c
Merge remote-tracking branch 'origin/feature/SK-726' into feature/asy…
Mar 18, 2024
4237525
include_self query param option and X-Reverse header option added to …
niklastheman Mar 18, 2024
bc7962f
Make use of X-Reverse and ?include_self (/api/v1/ancestors) in get_mo…
niklastheman Mar 18, 2024
0195bdc
get_*_count added to all entities (api client)
niklastheman Mar 18, 2024
73a0c6d
isort fix
niklastheman Mar 18, 2024
973269b
linter fix
niklastheman Mar 18, 2024
7e83be2
updated ci tests
niklastheman Mar 18, 2024
007e9f1
Merge remote-tracking branch 'origin/feature/SK-726' into feature/asy…
Mar 18, 2024
4d7beca
set_package_active => set_active_package
niklastheman Mar 18, 2024
fccb687
Merge remote-tracking branch 'origin/feature/SK-726' into feature/asy…
Mar 18, 2024
ebc494b
/models/<id>/parameters - returns array of arrays
niklastheman Mar 18, 2024
81782af
Merge remote-tracking branch 'origin/feature/SK-726' into feature/asy…
Mar 18, 2024
c6233b5
latest
Mar 18, 2024
ea572b1
Merge conflict
Mar 18, 2024
e30ab2a
Updated example notebook
Mar 19, 2024
696ee56
latest
Mar 19, 2024
13fddb9
Merge conflicts
Mar 19, 2024
832fc0f
wip
Mar 20, 2024
12c5fb0
updated notebook
Mar 20, 2024
f80d863
renamed folder
Mar 20, 2024
4b90e3d
wip
Mar 21, 2024
e04d789
clean up unused dependencies
Mar 21, 2024
d9e5543
Add back plotly
Mar 22, 2024
f3d32d5
Merge branch 'feature/clean-up-dependencies' into feature/async-simul…
Mar 22, 2024
b58ad52
Change default learning rate for fedadam, results complete
Mar 23, 2024
012fede
Change default learning rate for fedadam, results complete
Mar 23, 2024
c66280c
Results complete, notbook updated
Mar 23, 2024
45ef69f
Added illustration
Mar 23, 2024
7dcbb8a
Updated example README
Mar 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ data
*.npz
*.tgz
*.tar.gz
*.log
.async-simulation
client.yaml
387 changes: 387 additions & 0 deletions examples/async-clients/Experiment.ipynb

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions examples/async-clients/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# ASYNC CLIENTS
This example shows how to experiment with intermittent and asynchronous client workflows.

## Prerequisites
- [Python 3.8, 3.9 or 3.10](https://www.python.org/downloads)
- [Docker](https://docs.docker.com/get-docker)
- [Docker Compose](https://docs.docker.com/compose/install)

## Running the example (pseudo-distributed, single host)

First, make sure that FEDn is installed (we recommend using a virtual environment)

Clone FEDn
```sh
git clone https://github.com/scaleoutsystems/fedn.git
```

Install FEDn and dependencies

``
pip install fedn
```

Or from source, standing in the folder 'fedn/fedn'

```
pip install .
```

### Prepare the example environment, the compute package and seed model

Standing in the folder fedn/examples/async-clients
```
pip install -r requirements.txt
```

Create the compute package and seed model:
```
tar -czvf package.tgz client
```

```
python client/entrypoint init_seed
```

You will now have a file 'seed.npz' in the directory.

### Running a simulation

Deploy FEDn on localhost. Standing in the the FEDn root directory:

```
docker-compose up
```

Initialize FEDn with the compute package and seed model

```
python init_fedn.py
```

Start simulating clients
```
python run_clients.py
```

Start the experiment / training sessions:

```
python run_experiment.py
```

Once global models start being produced, you can start analyzing results using API Client, refer to the notebook "Experiment.ipynb" for instructions.




142 changes: 142 additions & 0 deletions examples/async-clients/client/entrypoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# /bin/python
import fire
import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier

from fedn.utils.helpers.helpers import get_helper, save_metadata, save_metrics

HELPER_MODULE = 'numpyhelper'
ARRAY_SIZE = 10000


def compile_model(max_iter=1):
clf = MLPClassifier(max_iter=max_iter)
# This is needed to initialize some state variables needed to make predictions
# We will overwrite weights and biases during FL training
X_train, y_train, _, _ = make_data()
clf.fit(X_train, y_train)
return clf


def save_parameters(model, out_path):
""" Save model to disk.

:param model: The model to save.
:type model: torch.nn.Module
:param out_path: The path to save to.
:type out_path: str
"""
helper = get_helper(HELPER_MODULE)
parameters = model.coefs_ + model.intercepts_

helper.save(parameters, out_path)


def load_parameters(model_path):
""" Load model from disk.

param model_path: The path to load from.
:type model_path: str
:return: The loaded model.
:rtype: torch.nn.Module
"""
helper = get_helper(HELPER_MODULE)
parameters = helper.load(model_path)

return parameters


def init_seed(out_path='seed.npz'):
""" Initialize seed model.

:param out_path: The path to save the seed model to.
:type out_path: str
"""
# Init and save
model = compile_model()
save_parameters(model, out_path)


def make_data(n_min=50, n_max=100):
""" Generate / simulate a random number n data points.

n will fall in the interval (n_min, n_max)

"""
n_samples = 100000
X, y = make_classification(n_samples=n_samples, n_features=4, n_informative=4, n_redundant=0, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

n = np.random.randint(n_min, n_max, 1)[0]
ind = np.random.choice(len(X_train), n)
X_train = X_train[ind, :]
y_train = y_train[ind]
return X_train, y_train, X_test, y_test


def train(in_model_path, out_model_path):
""" Train model.

"""

# Load model
parameters = load_parameters(in_model_path)
model = compile_model()
n = len(parameters)//2
model.coefs_ = parameters[:n]
model.intercepts_ = parameters[n:]

# Train
X_train, y_train, _, _ = make_data()
epochs = 10
for i in range(epochs):
model.partial_fit(X_train, y_train)

# Metadata needed for aggregation server side
metadata = {
'num_examples': len(X_train),
}

# Save JSON metadata file
save_metadata(metadata, out_model_path)

# Save model update
save_parameters(model, out_model_path)


def validate(in_model_path, out_json_path):
""" Validate model.

:param in_model_path: The path to the input model.
:type in_model_path: str
:param out_json_path: The path to save the output JSON to.
:type out_json_path: str
:param data_path: The path to the data file.
:type data_path: str
"""
parameters = load_parameters(in_model_path)
model = compile_model()
n = len(parameters)//2
model.coefs_ = parameters[:n]
model.intercepts_ = parameters[n:]

X_train, y_train, X_test, y_test = make_data()

# JSON schema
report = {
"accuracy_score": accuracy_score(y_test, model.predict(X_test)),
}

# Save JSON
save_metrics(report, out_json_path)


if __name__ == '__main__':
fire.Fire({
'init_seed': init_seed,
'train': train,
'validate': validate
})
5 changes: 5 additions & 0 deletions examples/async-clients/client/fedn.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
entry_points:
train:
command: python entrypoint.py train $ENTRYPOINT_OPTS
validate:
command: python entrypoint.py validate $ENTRYPOINT_OPTS
Binary file added examples/async-clients/img/async-clients.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
DISCOVER_PORT = 8092

client = APIClient(DISCOVER_HOST, DISCOVER_PORT)
client.set_package('package.tgz', 'numpyhelper')
client.set_initial_model('seed.npz')
client.set_active_package('package.tgz', 'numpyhelper')
client.set_active_model('seed.npz')
3 changes: 3 additions & 0 deletions examples/async-clients/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fire==0.3.1
numpy
scikit-learn
77 changes: 77 additions & 0 deletions examples/async-clients/run_clients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""This scripts starts N_CLIENTS using the SDK.





If you are running with a local deploy of FEDn
using docker compose, you need to make sure that clients
are able to resolve the name "combiner" to 127.0.0.1

One way to accomplish this is to edit your /etc/host,
adding the line:

combiner 127.0.0.1

(this requires root previliges)
"""

import copy
import time
from multiprocessing import Process

import numpy as np

from fedn.network.clients.client import Client

settings = {
'DISCOVER_HOST': '127.0.0.1',
'DISCOVER_PORT': 8092,
'N_CLIENTS': 10,
'N_CYCLES': 100,
'CLIENTS_MAX_DELAY': 10,
'CLIENTS_ONLINE_FOR_SECONDS': 120
}

client_config = {'discover_host': settings['DISCOVER_HOST'], 'discover_port': settings['DISCOVER_PORT'], 'token': None, 'name': 'testclient',
'client_id': 1, 'remote_compute_context': True, 'force_ssl': False, 'dry_run': False, 'secure': False,
'preshared_cert': False, 'verify': False, 'preferred_combiner': False,
'validator': True, 'trainer': True, 'init': None, 'logfile': 'test.log', 'heartbeat_interval': 2,
'reconnect_after_missed_heartbeat': 30}


def run_client(online_for=120, name='client'):
""" Simulates a client that starts and stops
at random intervals.

The client will start after a radom time 'mean_delay',
stay online for 'online_for' seconds (deterministic),
then disconnect.

This is repeated for N_CYCLES.

"""

conf = copy.deepcopy(client_config)
conf['name'] = name

for i in range(settings['N_CYCLES']):
# Sample a delay until the client starts
t_start = np.random.randint(0, settings['CLIENTS_MAX_DELAY'])
time.sleep(t_start)
fl_client = Client(conf)
time.sleep(online_for)
fl_client.disconnect()


if __name__ == '__main__':

# We start N_CLIENTS independent client processes
processes = []
for i in range(settings['N_CLIENTS']):
p = Process(target=run_client, args=(settings['CLIENTS_ONLINE_FOR_SECONDS'], 'client{}'.format(i),))
processes.append(p)
p.start()

for p in processes:
p.join()
34 changes: 34 additions & 0 deletions examples/async-clients/run_experiment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import time
import uuid

from fedn import APIClient

DISCOVER_HOST = '127.0.0.1'
DISCOVER_PORT = 8092
client = APIClient(DISCOVER_HOST, DISCOVER_PORT)

if __name__ == '__main__':

# Run six sessions, each with 100 rounds.
num_sessions = 6
for s in range(num_sessions):

session_config = {
"helper": "numpyhelper",
"id": str(uuid.uuid4()),
"aggregator": "fedopt",
"round_timeout": 20,
"rounds": 100,
"validate": False,
}

session = client.start_session(**session_config)
if session['success'] is False:
print(session['message'])
exit(0)

print("Started session: {}".format(session))

# Wait for session to finish
while not client.session_is_finished(session_config['id']):
time.sleep(2)
Loading
Loading