Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/SK-675 | Model staging not in sync #523

Merged
merged 7 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def _initialize_dispatcher(self, config):
copy_tree(from_path, self.run_path)
self.dispatcher = Dispatcher(dispatch_config, self.run_path)

def get_model_from_combiner(self, id):
def get_model_from_combiner(self, id, timeout=20):
"""Fetch a model from the assigned combiner.
Downloads the model update object via a gRPC streaming channel.

Expand All @@ -369,8 +369,12 @@ def get_model_from_combiner(self, id):
:rtype: BytesIO
"""
data = BytesIO()
time_start = time.time()
request = fedn.ModelRequest(id=id)
request.sender.name = self.name
request.sender.role = fedn.WORKER

for part in self.modelStub.Download(fedn.ModelRequest(id=id), metadata=self.metadata):
for part in self.modelStub.Download(request, metadata=self.metadata):

if part.status == fedn.ModelStatus.IN_PROGRESS:
data.write(part.data)
Expand All @@ -381,6 +385,11 @@ def get_model_from_combiner(self, id):
if part.status == fedn.ModelStatus.FAILED:
return None

if part.status == fedn.ModelStatus.UNKNOWN:
if time.time() - time_start >= timeout:
return None
continue

return data

def send_model_to_combiner(self, model, id):
Expand Down Expand Up @@ -509,6 +518,9 @@ def _process_training_request(self, model_id):
meta = {}
tic = time.time()
mdl = self.get_model_from_combiner(str(model_id))
if mdl is None:
logger.error("Could not retrieve model from combiner. Aborting training request.")
return None, None
meta['fetch_model'] = time.time() - tic

inpath = self.helper.get_tmp_path()
Expand Down Expand Up @@ -573,6 +585,9 @@ def _process_validation_request(self, model_id, is_inference):
self.state = ClientState.validating
try:
model = self.get_model_from_combiner(str(model_id))
if model is None:
logger.error("Could not retrieve model from combiner. Aborting validation request.")
return None
inpath = self.helper.get_tmp_path()

with open(inpath, "wb") as fh:
Expand Down
15 changes: 13 additions & 2 deletions fedn/fedn/network/combiner/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import copy
import json
import time
from io import BytesIO

import grpc
Expand Down Expand Up @@ -239,7 +240,7 @@ def submit(self, config):

return response

def get_model(self, id):
def get_model(self, id, timeout=10):
""" Download a model from the combiner server.

:param id: The model id.
Expand All @@ -255,14 +256,24 @@ def get_model(self, id):
data = BytesIO()
data.seek(0, 0)

parts = modelservice.Download(fedn.ModelRequest(id=id))
time_start = time.time()

request = fedn.ModelRequest(id=id)
request.sender.name = self.name
request.sender.role = fedn.WORKER

parts = modelservice.Download(request)
for part in parts:
if part.status == fedn.ModelStatus.IN_PROGRESS:
data.write(part.data)
if part.status == fedn.ModelStatus.OK:
return data
if part.status == fedn.ModelStatus.FAILED:
return None
if part.status == fedn.ModelStatus.UNKNOWN:
if time.time() - time_start > timeout:
return None
continue

def allowing_clients(self):
""" Check if the combiner is allowing additional client connections.
Expand Down
12 changes: 8 additions & 4 deletions fedn/fedn/network/combiner/modelservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,20 @@ def Download(self, request, context):
:return: A model response iterator.
:rtype: :class:`fedn.network.grpc.fedn_pb2.ModelResponse`
"""
logger.debug("grpc.ModelService.Download: Called")
logger.info(f'grpc.ModelService.Download: {request.sender.role}:{request.sender.name} requested model {request.id}')
try:
if self.temp_model_storage.get_model_metadata(request.id) != fedn.ModelStatus.OK:
logger.error("Error file is not ready")
yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED)
status = self.temp_model_storage.get_model_metadata(request.id)
if status != fedn.ModelStatus.OK:
logger.error(f'model file is not ready: {request.id}, status: {status}')
yield fedn.ModelResponse(id=request.id, data=None, status=status)
except Exception:
logger.error("Error file does not exist: {}".format(request.id))
yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED)

try:
obj = self.temp_model_storage.get(request.id)
if obj is None:
raise Exception(f'File not found: {request.id}')
with obj as f:
while True:
piece = f.read(CHUNK_SIZE)
Expand All @@ -199,3 +202,4 @@ def Download(self, request, context):
yield fedn.ModelResponse(id=request.id, data=piece, status=fedn.ModelStatus.IN_PROGRESS)
except Exception as e:
logger.error("Downloading went wrong: {} {}".format(request.id, e))
yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED)
1 change: 1 addition & 0 deletions fedn/fedn/network/grpc/fedn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ enum ModelStatus {
IN_PROGRESS = 1;
IN_PROGRESS_OK = 2;
FAILED = 3;
UNKNOWN = 4;
}

message ModelRequest {
Expand Down
Loading
Loading