Skip to content

Commit

Permalink
changes related to cpp client
Browse files Browse the repository at this point in the history
  • Loading branch information
Wrede committed Sep 18, 2023
1 parent 7a18790 commit b250665
Show file tree
Hide file tree
Showing 10 changed files with 379 additions and 535 deletions.
2 changes: 1 addition & 1 deletion examples/mnist-keras/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
tensorflow==2.9.3
tensorflow-cpu==2.13.0
fire==0.3.1
docker==6.1.1
3 changes: 2 additions & 1 deletion fedn/fedn/common/net/grpc/fedn.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
syntax = "proto3";

package grpc;
package fedn;

message Response {
Client sender = 1;
Expand Down Expand Up @@ -64,6 +64,7 @@ message ModelUpdate {
string correlation_id = 5;
string timestamp = 6;
string meta = 7;
string config = 8;
}

message ModelValidationRequest {
Expand Down
384 changes: 76 additions & 308 deletions fedn/fedn/common/net/grpc/fedn_pb2.py

Large diffs are not rendered by default.

380 changes: 190 additions & 190 deletions fedn/fedn/common/net/grpc/fedn_pb2_grpc.py

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,17 @@ def upload_request_generator(mdl):
b = mdl.read(CHUNK_SIZE)
if b:
result = fedn.ModelRequest(
data=b, id=id, status=fedn.ModelStatus.IN_PROGRESS)
data=b,
id=id,
status=fedn.ModelStatus.IN_PROGRESS,
sender=fedn.Client(name=self.name, role=fedn.WORKER)
)
else:
result = fedn.ModelRequest(
id=id, status=fedn.ModelStatus.OK)
id=id,
status=fedn.ModelStatus.OK,
sender=fedn.Client(name=self.name, role=fedn.WORKER)
)

yield result
if not b:
Expand Down Expand Up @@ -574,11 +581,11 @@ def process_request(self):
request.model_id)
processing_time = time.time()-tic
meta['processing_time'] = processing_time
meta['config'] = request.data

if model_id is not None:
# Send model update to combiner
update = fedn.ModelUpdate()
update.config = request.data
update.sender.name = self.name
update.sender.role = fedn.WORKER
update.receiver.name = request.sender.name
Expand Down
12 changes: 8 additions & 4 deletions fedn/fedn/network/combiner/aggregators/aggregatorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def on_model_update(self, model_update):
except Exception as e:
self.server.report_status("AGGREGATOR({}): Failed to receive model update! {}".format(self.name, e),
log_level=fedn.Status.WARNING)
pass
raise e

def _validate_model_update(self, model_update):
""" Validate the model update.
Expand All @@ -84,10 +84,14 @@ def _validate_model_update(self, model_update):
:rtype: bool
"""
# TODO: Validate the metadata to check that it contains all variables assumed by the aggregator.
data = json.loads(model_update.meta)['training_metadata']
if 'num_examples' not in data.keys():
meta = json.loads(model_update.meta)['training_metadata']
if 'num_examples' not in meta.keys():
self.server.report_status("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name))
return False
config = json.loads(model_update.config)
if 'round_id' not in config.keys():
self.server.report_status("AGGREGATOR({}): Model validation failed, round_id missing in config.".format(self.name))
return False
return True

def next_model_update(self, helper):
Expand All @@ -103,7 +107,7 @@ def next_model_update(self, helper):
model_next = self.control.load_model_update(helper, model_id)
# Get relevant metadata
data = json.loads(model_update.meta)['training_metadata']
config = json.loads(json.loads(model_update.meta)['config'])
config = json.loads(model_update.config)
data['round_id'] = config['round_id']

return model_next, data, model_id
Expand Down
5 changes: 4 additions & 1 deletion fedn/fedn/network/combiner/aggregators/fedavg.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ def combine_models(self, helper=None, time_window=180, max_nr_models=100, delete
self.model_updates.task_done()
except Exception as e:
self.server.report_status(
"AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e))
"AGGREGATOR({}): Error encoutered while processing model update, skipping this update. Error: {}".format(self.name, e))
if model_id:
self.server.report_status(
"AGGREGATOR({}): Error for model update {}".format(self.name, model_id))
self.model_updates.task_done()

data['nr_aggregated_models'] = nr_aggregated_models
Expand Down
58 changes: 44 additions & 14 deletions fedn/fedn/network/combiner/modelservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class ModelService(rpc.ModelServiceServicer):

def __init__(self):
self.models = TempModelStorage()
self.name = "ModelService"

def exist(self, model_id):
""" Check if a model exists on the server.
Expand Down Expand Up @@ -119,10 +120,12 @@ def upload_request_generator(mdl):
b = mdl.read(CHUNK_SIZE)
if b:
result = fedn.ModelRequest(
data=b, id=id, status=fedn.ModelStatus.IN_PROGRESS)
data=b, id=id, status=fedn.ModelStatus.IN_PROGRESS,
sender=fedn.Client(name=self.name, role=fedn.WORKER))
else:
result = fedn.ModelRequest(
id=id, data=None, status=fedn.ModelStatus.OK)
id=id, data=None, status=fedn.ModelStatus.OK,
sender=fedn.Client(name=self.name, role=fedn.WORKER))
yield result
if not b:
break
Expand All @@ -141,21 +144,48 @@ def Upload(self, request_iterator, context):
:return: A model response object.
:rtype: :class:`fedn.common.net.grpc.fedn_pb2.ModelResponse`
"""

result = None
for request in request_iterator:
print("ModelRequest (Upload): Processing incoming request from {}".format(request.sender.name), flush=True)
if request.status == fedn.ModelStatus.IN_PROGRESS:
self.models.get_ptr(request.id).write(request.data)
self.models.set_meta(request.id, fedn.ModelStatus.IN_PROGRESS)

if request.status == fedn.ModelStatus.OK and not request.data:
result = fedn.ModelResponse(id=request.id, status=fedn.ModelStatus.OK,
message="Got model successfully.")
# self.models_metadata.update({request.id: fedn.ModelStatus.OK})
self.models.set_meta(request.id, fedn.ModelStatus.OK)
self.models.get_ptr(request.id).flush()
self.models.get_ptr(request.id).close()
# Print sender and model id
if request.sender:
print("ModelRequest (Upload): Received {} bytes from {}, ModelStatus.IN_PROGRESS".format(
len(request.data), request.sender.name), flush=True)
try:
self.models.get_ptr(request.id).write(request.data)
self.models.set_meta(request.id, fedn.ModelStatus.IN_PROGRESS)
except Exception as e:
message = "ModelRequest (Upload): Error writing model to file: {}".format(e)
print(message, flush=True)
result = fedn.ModelResponse(id=request.id, status=fedn.ModelStatus.FAILED, message=message)
return result

elif request.status == fedn.ModelStatus.OK and not request.data:
if request.sender:
print("ModelRequest (Upload): Received final ModeStatus.OK from {}".format(
request.sender.name), flush=True)
message = "ModelRequest (Upload): Received final ModeStatus.OK"
try:
self.models.set_meta(request.id, fedn.ModelStatus.OK)
self.models.get_ptr(request.id).flush()
self.models.get_ptr(request.id).close()
except Exception as e:
message = "ModelRequest (Upload): Error writing model to file: {}".format(e)
print(message, flush=True)
result = fedn.ModelResponse(id=request.id, status=fedn.ModelStatus.FAILED, message=message)
return result
result = fedn.ModelResponse(id=request.id, status=fedn.ModelStatus.OK, message=message)
return result
elif request.status == fedn.ModelStatus.OK and request.data:
message = "ModelRequest (Upload): Internal ERROR, got {} but data still exists in ModelRequest".format(request.status)
print(message, flush=True)
result = fedn.ModelResponse(id=request.id, status=fedn.ModelStatus.FAILED, message=message)
return result
else:
message = "ModelRequest (Upload): Internal ERROR, got unknown ModelStatus {}".format(request.status)
print(message, flush=True)
result = fedn.ModelResponse(id=request.id, status=fedn.ModelStatus.FAILED, message=message)
return result

def Download(self, request, context):
""" RPC endpoints for downloading a model.
Expand Down
55 changes: 43 additions & 12 deletions fedn/fedn/network/combiner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from datetime import datetime, timedelta
from enum import Enum

from grpc import StatusCode

import fedn.common.net.grpc.fedn_pb2 as fedn
import fedn.common.net.grpc.fedn_pb2_grpc as rpc
from fedn.common.net.grpc.server import Server
Expand Down Expand Up @@ -649,11 +651,23 @@ def SendHeartbeat(self, heartbeat: fedn.Heartbeat, context):
:return: the response
:rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response`
"""
self.__register_heartbeat(heartbeat.sender)
try:
self.__register_heartbeat(heartbeat.sender)
message = "Heartbeat received from client {}".format(
heartbeat.sender.name)
context.set_details(message)
context.set_code(StatusCode.OK)
except Exception as e:
message = "Failed to register heartbeat: CLIENT: {} INTERNAL ERROR".format(heartbeat.sender.name)
print(message, flush=True)
print(e, flush=True)
context.set_details(message)
context.set_code(StatusCode.INTERNAL)

response = fedn.Response()
response.sender.name = heartbeat.sender.name
response.sender.role = heartbeat.sender.role
response.response = "Heartbeat received"
response.response = message
return response

# Combiner Service
Expand Down Expand Up @@ -694,10 +708,8 @@ def ModelUpdateRequestStream(self, response, context):
"""

client = response.sender
metadata = context.invocation_metadata()
if metadata:
metadata = dict(metadata)
print("\nClient connected: {}\n".format(metadata['client']), flush=True)

print("\nClient connected: {}\n".format(client.name), flush=True)

status = fedn.Status(
status="Client {} connecting to ModelUpdateRequestStream.".format(client.name))
Expand All @@ -714,6 +726,11 @@ def ModelUpdateRequestStream(self, response, context):

while context.is_active():
try:
# Track time, send heatbeat every 5 seconds
now = datetime.now()
then = self.clients[client.name]["lastseen"]
if (now - then) > timedelta(seconds=5):
self.__register_heartbeat(client)
yield q.get(timeout=1.0)
except queue.Empty:
pass
Expand Down Expand Up @@ -792,7 +809,7 @@ def SendModelUpdateRequest(self, request, context):
return response # TODO Fill later

def SendModelUpdate(self, request, context):
""" Send a model update response.
""" Process ModelUpdate message from client and send a model update response.
:param request: the request
:type request: :class:`fedn.common.net.grpc.fedn_pb2.ModelUpdate`
Expand All @@ -801,12 +818,26 @@ def SendModelUpdate(self, request, context):
:return: the response
:rtype: :class:`fedn.common.net.grpc.fedn_pb2.Response`
"""
self.control.aggregator.on_model_update(request)

response = fedn.Response()
response.response = "RECEIVED ModelUpdate {} from client {}".format(
response, response.sender.name)
return response # TODO Fill later
message = "SendModelUpdate: ModelUpdate {} from client {}".format(
request.model_id,
request.sender.name
)
print(message, flush=True)
print("SendModelUpdate: LocalModelUpdateID {} from client {}".format(
request.model_update_id,
request.sender.name
), flush=True
)
try:
self.control.aggregator.on_model_update(request)
except Exception as e:
print("SendModelUpdate: Failed to process model update: {}".format(e), flush=True)
message = "SERVER ERROR: Failed to process model update"
pass

response.response = message
return response

def SendModelValidationRequest(self, request, context):
""" Send a model validation request.
Expand Down
2 changes: 1 addition & 1 deletion fedn/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"urllib3>=1.26.4",
"minio",
"python-slugify",
"grpcio~=1.48.0",
"grpcio~=1.57.0",
"grpcio-tools",
"numpy>=1.21.6",
"protobuf",
Expand Down

0 comments on commit b250665

Please sign in to comment.