Skip to content

Commit

Permalink
Feature/SK-505 | Flush model update queues at new session + Buffer si…
Browse files Browse the repository at this point in the history
…ze config (#476)

* Model update queues are now flushed at the beginning of a new session

* Changed confusing log message

* buffer_size is now configurable; resolves SK-520

* Exclude protobuf files from the isort check

* Deleted commented code

* Added response status message

---------

Co-authored-by: Andreas Hellander <[email protected]>
Co-authored-by: Fredrik Wrede <[email protected]>
  • Loading branch information
3 people committed Oct 25, 2023
1 parent 78e5346 commit 0209851
Show file tree
Hide file tree
Showing 10 changed files with 791 additions and 702 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/code-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@ jobs:
--skip .venv
--skip .mnist-keras
--skip .mnist-pytorch
--skip fedn_pb2.py
--skip fedn_pb2_grpc.py
- name: check Python formatting
run: >
.venv/bin/autopep8 --recursive --diff
--exclude .venv
--exclude .mnist-keras
--exclude .mnist-pytorch
--exclude fedn_pb2.py
--exclude fedn_pb2_grpc.py
.
- name: run Python linter
run: >
.venv/bin/flake8 .
--exclude ".venv,.mnist-keras,.mnist-pytorch,fedn_pb2.py"
--exclude ".venv,.mnist-keras,.mnist-pytorch,fedn_pb2.py,fedn_pb2_grpc.py"
- name: check for floating imports
run: >
Expand Down
6 changes: 3 additions & 3 deletions fedn/fedn/common/net/grpc/fedn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package grpc;

// Response message: identifies the sending Client and carries a response
// string.
message Response {
Client sender = 1;
string response = 2;
}

Expand All @@ -19,7 +18,6 @@ enum StatusType {

message Status {
Client sender = 1;
//string client = 1;
string status = 2;

enum LogLevel {
Expand Down Expand Up @@ -95,6 +93,7 @@ enum ModelStatus {
IN_PROGRESS_OK = 2;
FAILED = 3;
}

message ModelRequest {
Client sender = 1;
Client receiver = 2;
Expand Down Expand Up @@ -204,7 +203,8 @@ message ReportResponse {
// Control service: unary RPCs that take a ControlRequest and are used to
// start, stop, configure and query the server, and to flush its aggregation
// queue.
service Control {
  rpc Start(ControlRequest) returns (ControlResponse);
  rpc Stop(ControlRequest) returns (ControlResponse);
  // Each rpc name may be declared only once per service.
  // NOTE(review): the combiner's Configure handler appears to build a
  // ControlResponse; confirm that ReportResponse is the intended return type.
  rpc Configure(ControlRequest) returns (ReportResponse);
  // Asks the server to flush its aggregation (model update) queue.
  rpc FlushAggregationQueue(ControlRequest) returns (ControlResponse);
  rpc Report(ControlRequest) returns (ReportResponse);
}

Expand Down
337 changes: 169 additions & 168 deletions fedn/fedn/common/net/grpc/fedn_pb2.py

Large diffs are not rendered by default.

1,029 changes: 528 additions & 501 deletions fedn/fedn/common/net/grpc/fedn_pb2_grpc.py

Large diffs are not rendered by default.

19 changes: 18 additions & 1 deletion fedn/fedn/network/combiner/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def get_channel(self):


class CombinerInterface:
""" Interface for the Combiner (server).
""" Interface for the Combiner (aggregation server).
Abstraction on top of the gRPC server servicer.
"""
Expand Down Expand Up @@ -220,6 +220,23 @@ def configure(self, config=None):
else:
raise

def flush_model_update_queue(self):
    """ Reset the model update queue on the combiner.

    Opens a channel to the combiner and invokes the ``FlushAggregationQueue``
    RPC on its Control service with an empty control request.

    :return: the response returned by the ``FlushAggregationQueue`` RPC, so
        that callers can inspect the outcome reported by the combiner.
    :raises CombinerUnavailableError: if the RPC fails with status code
        ``UNAVAILABLE``. The original gRPC error is attached as the cause.
    :raises grpc.RpcError: re-raised unchanged for any other RPC failure.
    """

    channel = Channel(self.address, self.port,
                      self.certificate).get_channel()
    control = rpc.ControlStub(channel)

    request = fedn.ControlRequest()

    try:
        # Hand the response back instead of discarding it; callers that
        # ignore the return value are unaffected.
        return control.FlushAggregationQueue(request)
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.UNAVAILABLE:
            # Keep the underlying RPC error in the traceback for debugging.
            raise CombinerUnavailableError from e
        raise

def submit(self, config):
""" Submit a compute plan to the combiner.
Expand Down
10 changes: 7 additions & 3 deletions fedn/fedn/network/combiner/round.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def waitforit(self, config, buffer_size=100, polling_interval=0.1):
"""

time_window = float(config['round_timeout'])
# buffer_size = int(config['buffer_size'])

tt = 0.0
while tt < time_window:
Expand Down Expand Up @@ -150,16 +149,21 @@ def _training_round(self, config, clients):
# Request model updates from all active clients.
self.server.request_model_update(config, clients=clients)

# If buffer_size is -1 (default), the round terminates when/if all clients have completed.
if int(config['buffer_size']) == -1:
buffer_size = len(clients)
else:
buffer_size = int(config['buffer_size'])

# Wait / block until the round termination policy has been met.
self.waitforit(config, buffer_size=len(clients))
self.waitforit(config, buffer_size=buffer_size)

tic = time.time()
model = None
data = None

try:
helper = get_helper(config['helper_type'])
# print config delete_models_storage
print("ROUNDCONTROL: Config delete_models_storage: {}".format(config['delete_models_storage']), flush=True)
if config['delete_models_storage'] == 'True':
delete_models = True
Expand Down
58 changes: 39 additions & 19 deletions fedn/fedn/network/combiner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,19 @@ def __register_heartbeat(self, client):
self.__join_client(client)
self.clients[client.name]["lastseen"] = datetime.now()

def flush_model_update_queue(self):
    """Clear the model update queue (aggregator).

    Removes every pending item from ``self.control.aggregator.model_updates``,
    resets its unfinished-task counter and wakes threads waiting on the
    queue's conditions. The queue object is expected to expose the
    ``queue.Queue`` internals used below (``mutex``, ``queue``,
    ``unfinished_tasks``, ``all_tasks_done``, ``not_full``).

    :return: True if the queue was flushed, False if an error occurred
        while flushing.
    :rtype: bool
    """

    q = self.control.aggregator.model_updates
    try:
        with q.mutex:
            q.queue.clear()
            q.unfinished_tasks = 0
            # Release threads blocked in join(): nothing is outstanding now.
            q.all_tasks_done.notify_all()
            # Release producers blocked in put() on a bounded queue; the
            # queue just became empty, so there is room again.
            q.not_full.notify_all()
        return True
    except Exception as e:
        # Report the failure instead of swallowing it silently; the boolean
        # contract towards the caller is kept.
        print("COMBINER: Failed to flush model update queue: {}".format(e), flush=True)
        return False

#####################################################################################################################

# Control Service
Expand Down Expand Up @@ -400,6 +413,9 @@ def Start(self, control: fedn.ControlRequest, context):

return response

# RPCs related to remote configuration of the server, round controller,
# aggregator and their states.

def Configure(self, control: fedn.ControlRequest, context):
""" Configure the Combiner.
Expand All @@ -416,6 +432,29 @@ def Configure(self, control: fedn.ControlRequest, context):
response = fedn.ControlResponse()
return response

def FlushAggregationQueue(self, control: fedn.ControlRequest, context):
    """ Flush the model update queue and report the outcome.

    :param control: the control request (not read by this handler)
    :type control: :class:`fedn.common.net.grpc.fedn_pb2.ControlRequest`
    :param context: the context (unused)
    :type context: :class:`grpc._server._Context`
    :return: the control response; its message is 'Success' when the flush
        succeeded and 'Failed' otherwise
    :rtype: :class:`fedn.common.net.grpc.fedn_pb2.ControlResponse`
    """
    flushed = self.flush_model_update_queue()

    reply = fedn.ControlResponse()
    reply.message = 'Success' if flushed else 'Failed'
    return reply

##############################################################################

def Stop(self, control: fedn.ControlRequest, context):
""" TODO: Not yet implemented.
Expand Down Expand Up @@ -494,25 +533,6 @@ def Report(self, control: fedn.ControlRequest, context):

#####################################################################################################################

def AllianceStatusStream(self, response, context):
    """ A server stream RPC endpoint that emits status messages.

    Subscribes the sender of the request to the STATUS channel, sends a
    status message announcing the connection, and then yields items from the
    sender's STATUS queue indefinitely.

    :param response: the response
    :type response: :class:`fedn.common.net.grpc.fedn_pb2.Response`
    :param context: the context (unused)
    :type context: :class:`grpc._server._Context`
    """
    client = response.sender

    # Build the announcement that is sent once the client is subscribed.
    announcement = fedn.Status(
        status="Client {} connecting to AllianceStatusStream.".format(client))
    announcement.log_level = fedn.Status.INFO
    announcement.sender.name = self.id
    announcement.sender.role = role_to_proto_role(self.role)

    self._subscribe_client_to_queue(client, fedn.Channel.STATUS)
    status_queue = self.__get_queue(client, fedn.Channel.STATUS)
    self._send_status(announcement)

    # Stream queued items to the caller for as long as it keeps consuming.
    while True:
        yield status_queue.get()

def SendStatus(self, status: fedn.Status, context):
""" A client stream RPC endpoint that accepts status messages.
Expand Down
13 changes: 9 additions & 4 deletions fedn/fedn/network/controller/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,23 @@ def session(self, config):
print("Controller already in INSTRUCTING state. A session is in progress.", flush=True)
return

if not self.get_latest_model():
print("No model in model chain, please provide a seed model!")
return

self._state = ReducerState.instructing

# Must be called to set info in the db
self.new_session(config)

if not self.get_latest_model():
print("No model in model chain, please provide a seed model!")

self._state = ReducerState.monitoring

last_round = int(self.get_latest_round_id())

# Clear potential stragglers/old model updates at combiners
for combiner in self.network.get_combiners():
combiner.flush_model_update_queue()

# Execute the rounds in this session
for round in range(1, int(config['rounds'] + 1)):
# Increment the round number
Expand Down Expand Up @@ -179,7 +184,7 @@ def round(self, session_config, round_id):
else:
# Print every 10 seconds based on value of wait
if wait % 10 == 0:
print("CONTROL: Round not found! Waiting...", flush=True)
print("CONTROL: Waiting for round to complete...", flush=True)
if wait >= session_config['round_timeout']:
print("CONTROL: Round timeout! Exiting round...", flush=True)
break
Expand Down
5 changes: 3 additions & 2 deletions fedn/fedn/network/dashboard/restservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def control():
if request.method == 'POST':
# Get session configuration
round_timeout = float(request.form.get('timeout', 180))
buffer_size = int(request.form.get('buffer_size', -1))
rounds = int(request.form.get('rounds', 1))
delete_models = request.form.get('delete_models', True)
task = (request.form.get('task', ''))
Expand Down Expand Up @@ -601,8 +602,8 @@ def control():

latest_model_id = self.control.get_latest_model()

config = {'round_timeout': round_timeout, 'model_id': latest_model_id,
'rounds': rounds, 'delete_models_storage': delete_models,
config = {'round_timeout': round_timeout, 'buffer_size': buffer_size,
'model_id': latest_model_id, 'rounds': rounds, 'delete_models_storage': delete_models,
'clients_required': clients_required,
'clients_requested': clients_requested, 'task': task,
'validate': validate, 'helper_type': helper_type}
Expand Down
10 changes: 10 additions & 0 deletions fedn/fedn/network/dashboard/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,16 @@ <h5 class="card-title">New session</h5>
<input class="form-control" type="text" id="timeout"
name="timeout" value="180">
</div>
</div>

<div class="mb-3 row">
<label class="col-form-label col-sm-3 text-sm-right"
for="buffer_size">Buffer size:
</label>
<div class="col-sm-2" style="min-width:300px;">
<input class="form-control" type="text" id="buffer_size"
name="buffer_size" value="-1">
</div>

</div>

Expand Down

0 comments on commit 0209851

Please sign in to comment.