Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update develop after release #481

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/code-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@ jobs:
--skip .venv
--skip .mnist-keras
--skip .mnist-pytorch
--skip fedn_pb2.py
--skip fedn_pb2_grpc.py

- name: check Python formatting
run: >
.venv/bin/autopep8 --recursive --diff
--exclude .venv
--exclude .mnist-keras
--exclude .mnist-pytorch
--exclude fedn_pb2.py
--exclude fedn_pb2_grpc.py
.

- name: run Python linter
run: >
.venv/bin/flake8 .
--exclude ".venv,.mnist-keras,.mnist-pytorch,fedn_pb2.py"
--exclude ".venv,.mnist-keras,.mnist-pytorch,fedn_pb2.py,fedn_pb2_grpc.py"

- name: check for floating imports
run: >
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ To connect a client that uses the data partition 'data/clients/1/mnist.pt':
-v $PWD/data/clients/1:/var/data \
-e ENTRYPOINT_OPTS=--data_path=/var/data/mnist.pt \
--network=fedn_default \
ghcr.io/scaleoutsystems/fedn/fedn:develop-mnist-pytorch run client -in client.yaml --name client1
ghcr.io/scaleoutsystems/fedn/fedn:master-mnist-pytorch run client -in client.yaml --name client1

You are now ready to start training the model at http://localhost:8090/control.

Expand Down
6 changes: 3 additions & 3 deletions fedn/fedn/common/net/grpc/fedn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package grpc;

message Response {
Client sender = 1;
//string client = 1;
string response = 2;
}

Expand All @@ -19,7 +18,6 @@ enum StatusType {

message Status {
Client sender = 1;
//string client = 1;
string status = 2;

enum LogLevel {
Expand Down Expand Up @@ -95,6 +93,7 @@ enum ModelStatus {
IN_PROGRESS_OK = 2;
FAILED = 3;
}

message ModelRequest {
Client sender = 1;
Client receiver = 2;
Expand Down Expand Up @@ -204,7 +203,8 @@ message ReportResponse {
service Control {
rpc Start(ControlRequest) returns (ControlResponse);
rpc Stop(ControlRequest) returns (ControlResponse);
rpc Configure(ControlRequest) returns (ReportResponse);
rpc Configure(ControlRequest) returns (ReportResponse);
rpc FlushAggregationQueue(ControlRequest) returns (ControlResponse);
rpc Report(ControlRequest) returns (ReportResponse);
}

Expand Down
337 changes: 169 additions & 168 deletions fedn/fedn/common/net/grpc/fedn_pb2.py

Large diffs are not rendered by default.

1,029 changes: 528 additions & 501 deletions fedn/fedn/common/net/grpc/fedn_pb2_grpc.py

Large diffs are not rendered by default.

19 changes: 18 additions & 1 deletion fedn/fedn/network/combiner/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def get_channel(self):


class CombinerInterface:
""" Interface for the Combiner (server).
""" Interface for the Combiner (aggregation server).
Abstraction on top of the gRPC server servicer.

"""
Expand Down Expand Up @@ -220,6 +220,23 @@ def configure(self, config=None):
else:
raise

def flush_model_update_queue(self):
""" Reset the model update queue on the combiner. """

channel = Channel(self.address, self.port,
self.certificate).get_channel()
control = rpc.ControlStub(channel)

request = fedn.ControlRequest()

try:
control.FlushAggregationQueue(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
raise CombinerUnavailableError
else:
raise

def submit(self, config):
""" Submit a compute plan to the combiner.

Expand Down
10 changes: 7 additions & 3 deletions fedn/fedn/network/combiner/round.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def waitforit(self, config, buffer_size=100, polling_interval=0.1):
"""

time_window = float(config['round_timeout'])
# buffer_size = int(config['buffer_size'])

tt = 0.0
while tt < time_window:
Expand Down Expand Up @@ -150,16 +149,21 @@ def _training_round(self, config, clients):
# Request model updates from all active clients.
self.server.request_model_update(config, clients=clients)

# If buffer_size is -1 (default), the round terminates when/if all clients have completed.
if int(config['buffer_size']) == -1:
buffer_size = len(clients)
else:
buffer_size = int(config['buffer_size'])

# Wait / block until the round termination policy has been met.
self.waitforit(config, buffer_size=len(clients))
self.waitforit(config, buffer_size=buffer_size)

tic = time.time()
model = None
data = None

try:
helper = get_helper(config['helper_type'])
# print config delete_models_storage
print("ROUNDCONTROL: Config delete_models_storage: {}".format(config['delete_models_storage']), flush=True)
if config['delete_models_storage'] == 'True':
delete_models = True
Expand Down
58 changes: 39 additions & 19 deletions fedn/fedn/network/combiner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,19 @@ def __register_heartbeat(self, client):
self.__join_client(client)
self.clients[client.name]["lastseen"] = datetime.now()

def flush_model_update_queue(self):
"""Clear the model update queue (aggregator). """

q = self.control.aggregator.model_updates
try:
with q.mutex:
q.queue.clear()
q.all_tasks_done.notify_all()
q.unfinished_tasks = 0
return True
except Exception:
return False

#####################################################################################################################

# Control Service
Expand Down Expand Up @@ -400,6 +413,9 @@ def Start(self, control: fedn.ControlRequest, context):

return response

# RPCs related to remote configuration of the server, round controller,
# aggregator and their states.

def Configure(self, control: fedn.ControlRequest, context):
""" Configure the Combiner.

Expand All @@ -416,6 +432,29 @@ def Configure(self, control: fedn.ControlRequest, context):
response = fedn.ControlResponse()
return response

def FlushAggregationQueue(self, control: fedn.ControlRequest, context):
""" Flush the queue.

:param control: the control request
:type control: :class:`fedn.common.net.grpc.fedn_pb2.ControlRequest`
:param context: the context (unused)
:type context: :class:`grpc._server._Context`
:return: the control response
:rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse`
"""

status = self.flush_model_update_queue()

response = fedn.ControlResponse()
if status:
response.message = 'Success'
else:
response.message = 'Failed'

return response

##############################################################################

def Stop(self, control: fedn.ControlRequest, context):
""" TODO: Not yet implemented.

Expand Down Expand Up @@ -494,25 +533,6 @@ def Report(self, control: fedn.ControlRequest, context):

#####################################################################################################################

def AllianceStatusStream(self, response, context):
""" A server stream RPC endpoint that emits status messages.

:param response: the response
:type response: :class:`fedn.common.net.grpc.fedn_pb2.Response`
:param context: the context (unused)
:type context: :class:`grpc._server._Context`"""
status = fedn.Status(
status="Client {} connecting to AllianceStatusStream.".format(response.sender))
status.log_level = fedn.Status.INFO
status.sender.name = self.id
status.sender.role = role_to_proto_role(self.role)
self._subscribe_client_to_queue(response.sender, fedn.Channel.STATUS)
q = self.__get_queue(response.sender, fedn.Channel.STATUS)
self._send_status(status)

while True:
yield q.get()

def SendStatus(self, status: fedn.Status, context):
""" A client stream RPC endpoint that accepts status messages.

Expand Down
Empty file.
13 changes: 9 additions & 4 deletions fedn/fedn/network/controller/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,23 @@ def session(self, config):
print("Controller already in INSTRUCTING state. A session is in progress.", flush=True)
return

if not self.get_latest_model():
print("No model in model chain, please provide a seed model!")
return

self._state = ReducerState.instructing

# Must be called to set info in the db
self.new_session(config)

if not self.get_latest_model():
print("No model in model chain, please provide a seed model!")

self._state = ReducerState.monitoring

last_round = int(self.get_latest_round_id())

# Clear potential stragglers/old model updates at combiners
for combiner in self.network.get_combiners():
combiner.flush_model_update_queue()

# Execute the rounds in this session
for round in range(1, int(config['rounds'] + 1)):
# Increment the round number
Expand Down Expand Up @@ -179,7 +184,7 @@ def round(self, session_config, round_id):
else:
# Print every 10 seconds based on value of wait
if wait % 10 == 0:
print("CONTROL: Round not found! Waiting...", flush=True)
print("CONTROL: Waiting for round to complete...", flush=True)
if wait >= session_config['round_timeout']:
print("CONTROL: Round timeout! Exiting round...", flush=True)
break
Expand Down
5 changes: 3 additions & 2 deletions fedn/fedn/network/dashboard/restservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def control():
if request.method == 'POST':
# Get session configuration
round_timeout = float(request.form.get('timeout', 180))
buffer_size = int(request.form.get('buffer_size', -1))
rounds = int(request.form.get('rounds', 1))
delete_models = request.form.get('delete_models', True)
task = (request.form.get('task', ''))
Expand Down Expand Up @@ -601,8 +602,8 @@ def control():

latest_model_id = self.control.get_latest_model()

config = {'round_timeout': round_timeout, 'model_id': latest_model_id,
'rounds': rounds, 'delete_models_storage': delete_models,
config = {'round_timeout': round_timeout, 'buffer_size': buffer_size,
'model_id': latest_model_id, 'rounds': rounds, 'delete_models_storage': delete_models,
'clients_required': clients_required,
'clients_requested': clients_requested, 'task': task,
'validate': validate, 'helper_type': helper_type}
Expand Down
10 changes: 10 additions & 0 deletions fedn/fedn/network/dashboard/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,16 @@ <h5 class="card-title">New session</h5>
<input class="form-control" type="text" id="timeout"
name="timeout" value="180">
</div>
</div>

<div class="mb-3 row">
<label class="col-form-label col-sm-3 text-sm-right"
for="buffer_size">Buffer size:
</label>
<div class="col-sm-2" style="min-width:300px;">
<input class="form-control" type="text" id="buffer_size"
name="buffer_size" value="-1">
</div>

</div>

Expand Down
2 changes: 1 addition & 1 deletion fedn/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='fedn',
version='0.5.0-dev',
version='0.5.0',
description="""Scaleout Federated Learning""",
long_description=open('README.md').read(),
long_description_content_type="text/markdown",
Expand Down
Loading