From e7fa8f488583526b930912339638366efbca76d9 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 11 Jun 2024 15:34:12 +0000 Subject: [PATCH 1/4] presigned url and predict entrypoint --- examples/mnist-pytorch/client/fedn.yaml | 4 +- examples/mnist-pytorch/client/predict.py | 39 ++++++++++++ fedn/network/clients/client.py | 76 ++++++++++++++++++++++-- fedn/network/clients/state.py | 1 + fedn/network/combiner/combiner.py | 7 ++- fedn/network/storage/s3/repository.py | 34 +++++++++++ 6 files changed, 152 insertions(+), 9 deletions(-) create mode 100644 examples/mnist-pytorch/client/predict.py diff --git a/examples/mnist-pytorch/client/fedn.yaml b/examples/mnist-pytorch/client/fedn.yaml index b05504102..30873488b 100644 --- a/examples/mnist-pytorch/client/fedn.yaml +++ b/examples/mnist-pytorch/client/fedn.yaml @@ -7,4 +7,6 @@ entry_points: train: command: python train.py validate: - command: python validate.py \ No newline at end of file + command: python validate.py + predict: + command: python predict.py \ No newline at end of file diff --git a/examples/mnist-pytorch/client/predict.py b/examples/mnist-pytorch/client/predict.py new file mode 100644 index 000000000..b37765c3b --- /dev/null +++ b/examples/mnist-pytorch/client/predict.py @@ -0,0 +1,39 @@ +import os +import sys + +import torch +from data import load_data +from model import load_parameters + +from fedn.utils.helpers.helpers import save_metrics + +dir_path = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.abspath(dir_path)) + + +def predict(in_model_path, out_artifact_path, data_path=None): + """Validate model. + + :param in_model_path: The path to the input model. + :type in_model_path: str + :param out_artifact_path: The path to save the predict output to. + :type out_artifact_path: str + :param data_path: The path to the data file. + :type data_path: str + """ + # Load data + x_test, y_test = load_data(data_path, is_train=False) + + # Load model + model = load_parameters(in_model_path) + model.eval() + + # Predict + with torch.no_grad(): + y_pred = model(x_test) + # Save prediction to file/artifact, the artifact will be uploaded to the object store by the client + torch.save(y_pred, out_artifact_path) + + +if __name__ == "__main__": + predict(sys.argv[1], sys.argv[2]) diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index c0f1b0baa..81480d508 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -14,6 +14,7 @@ from shutil import copytree import grpc +import requests from cryptography.hazmat.primitives.serialization import Encoding from google.protobuf.json_format import MessageToJson from OpenSSL import SSL @@ -22,13 +23,11 @@ import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc from fedn.common.config import FEDN_AUTH_SCHEME, FEDN_PACKAGE_EXTRACT_DIR -from fedn.common.log_config import (logger, set_log_level_from_string, - set_log_stream) +from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream from fedn.network.clients.connect import ConnectorClient, Status from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString -from fedn.network.combiner.modelservice import (get_tmp_path, - upload_request_generator) +from fedn.network.combiner.modelservice import get_tmp_path, upload_request_generator from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers.helpers import get_helper @@ -438,12 +437,18 @@ def _listen_to_task_stream(self): request=request, sesssion_id=request.session_id, ) - logger.info("Received model update request of type {} for model_id {}".format(request.type, request.model_id)) + logger.info("Received task request of type {} for model_id {}".format(request.type, request.model_id)) if request.type == fedn.StatusType.MODEL_UPDATE and self.config["trainer"]: self.inbox.put(("train", request)) elif request.type == fedn.StatusType.MODEL_VALIDATION and self.config["validator"]: self.inbox.put(("validate", request)) + elif request.type == fedn.StatusType.INFERENCE and self.config["validator"]: + logger.info("Received inference request for model_id {}".format(request.model_id)) + presined_url = json.loads(request.data) + presined_url = presined_url["presigned_url"] + logger.info("Inference presigned URL: {}".format(presined_url)) + self.inbox.put(("infer", request)) else: logger.error("Unknown request type: {}".format(request.type)) @@ -586,6 +591,51 @@ def _process_validation_request(self, model_id: str, is_inference: bool, session self.state = ClientState.idle return validation + def _process_inference_request(self, model_id: str, session_id: str, presigned_url: str): + """Process an inference request. + + :param model_id: The model id of the model to be used for inference. + :type model_id: str + :param session_id: The id of the current session. + :type session_id: str + :param presigned_url: The presigned URL for the data to be used for inference. + :type presigned_url: str + :return: None + """ + self.send_status(f"Processing inference request for model_id {model_id}", sesssion_id=session_id) + try: + model = self.get_model_from_combiner(str(model_id)) + if model is None: + logger.error("Could not retrieve model from combiner. Aborting inference request.") + return + inpath = self.helper.get_tmp_path() + + with open(inpath, "wb") as fh: + fh.write(model.getbuffer()) + + outpath = get_tmp_path() + self.dispatcher.run_cmd(f"predict {inpath} {outpath}") + + # Upload the inference result to the presigned URL + with open(outpath, "rb") as fh: + response = requests.put(presigned_url, data=fh.read()) + + os.unlink(inpath) + os.unlink(outpath) + + if response.status_code != 200: + logger.warning("Inference upload failed with status code {}".format(response.status_code)) + self.state = ClientState.idle + return + + except Exception as e: + logger.warning("Inference failed with exception {}".format(e)) + self.state = ClientState.idle + return + + self.state = ClientState.idle + return + def process_request(self): """Process training and validation tasks.""" while True: @@ -682,6 +732,22 @@ def process_request(self): self.state = ClientState.idle self.inbox.task_done() + elif task_type == "infer": + self.state = ClientState.inferencing + try: + presigned_url = json.loads(request.data) + except json.JSONDecodeError as e: + logger.error(f"Failed to decode inference request data: {e}") + self.state = ClientState.idle + continue + + if "presigned_url" not in presigned_url: + logger.error("Inference request missing presigned_url.") + self.state = ClientState.idle + continue + presigned_url = presigned_url["presigned_url"] + _ = self._process_inference_request(request.model_id, request.session_id, presigned_url) + self.state = ClientState.idle except queue.Empty: pass except grpc.RpcError as e: diff --git a/fedn/network/clients/state.py b/fedn/network/clients/state.py index a349f846e..d7f82a769 100644 --- a/fedn/network/clients/state.py +++ b/fedn/network/clients/state.py @@ -7,6 +7,7 @@ class ClientState(Enum): idle = 1 training = 2 validating = 3 + inferencing = 4 def ClientStateToString(state): diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 70755ac6b..a8abd78ab 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -244,16 +244,17 @@ def _send_request_type(self, request_type, session_id, model_id, config=None, cl if len(clients) == 0: clients = self.get_active_validators() elif request_type == fedn.StatusType.INFERENCE: - request.data = json.dumps(config) if len(clients) == 0: # TODO: add inference clients type clients = self.get_active_validators() - # TODO: if inference, request.data should be user-defined data/parameters - for client in clients: request.receiver.name = client request.receiver.role = fedn.WORKER + if request_type == fedn.StatusType.INFERENCE: + presigned_url = self.repository.presigned_put_url(self.repository.inference_bucket, f"{client}/{session_id}") + # TODO: in inference, request.data should also contain user-defined data/parameters + request.data = json.dumps({"presigned_url": presigned_url}) self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE) return request, clients diff --git a/fedn/network/storage/s3/repository.py b/fedn/network/storage/s3/repository.py index c1704e5ca..2a5ee3449 100644 --- a/fedn/network/storage/s3/repository.py +++ b/fedn/network/storage/s3/repository.py @@ -1,3 +1,4 @@ +import datetime import uuid from fedn.common.log_config import logger @@ -10,12 +11,17 @@ class Repository: def __init__(self, config): self.model_bucket = config["storage_bucket"] self.context_bucket = config["context_bucket"] + try: + self.inference_bucket = config["inference_bucket"] + except KeyError: + self.inference_bucket = "fedn-inference" # TODO: Make a plug-in solution self.client = MINIORepository(config) self.client.create_bucket(self.context_bucket) self.client.create_bucket(self.model_bucket) + self.client.create_bucket(self.inference_bucket) def get_model(self, model_id): """Retrieve a model with id model_id. @@ -104,3 +110,31 @@ def delete_compute_package(self, compute_package): except Exception: logger.error("Failed to delete compute_package from repository.") raise + + def presigned_put_url(self, bucket: str, object_name: str, expires: datetime.timedelta = datetime.timedelta(hours=1)): + """Generate a presigned URL for an upload object request. + + :param bucket: The bucket name + :type bucket: str + :param object_name: The object name + :type object_name: str + :param expires: The time the URL is valid + :type expires: datetime.timedelta + :return: The URL + :rtype: str + """ + return self.client.client.presigned_put_object(bucket, object_name, expires) + + def presigned_get_url(self, bucket: str, object_name: str, expires: datetime.timedelta = datetime.timedelta(hours=1)) -> str: + """Generate a presigned URL for a download object request. + + :param bucket: The bucket name + :type bucket: str + :param object_name: The object name + :type object_name: str + :param expires: The time the URL is valid + :type expires: datetime.timedelta + :return: The URL + :rtype: str + """ + return self.client.client.presigned_get_object(bucket, object_name, expires) From 5e58d4d92a83afca7b87f458800c8a245c979129 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 11 Jun 2024 15:40:01 +0000 Subject: [PATCH 2/4] fix import --- examples/mnist-pytorch/client/predict.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/examples/mnist-pytorch/client/predict.py b/examples/mnist-pytorch/client/predict.py index b37765c3b..aaf9f0f50 100644 --- a/examples/mnist-pytorch/client/predict.py +++ b/examples/mnist-pytorch/client/predict.py @@ -5,8 +5,6 @@ from data import load_data from model import load_parameters -from fedn.utils.helpers.helpers import save_metrics - dir_path = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.abspath(dir_path)) From 5d4212f707db35cb06aacc01d9af508357b0d159 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Thu, 13 Jun 2024 14:21:55 +0000 Subject: [PATCH 3/4] fix request instancd same between clients --- fedn/network/combiner/combiner.py | 59 +++++++++++++++---------------- 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index a8abd78ab..48e62466c 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -169,12 +169,12 @@ def request_model_update(self, session_id, model_id, config, clients=[]): :type clients: list """ - request, clients = self._send_request_type(fedn.StatusType.MODEL_UPDATE, session_id, model_id, config, clients) + clients = self._send_request_type(fedn.StatusType.MODEL_UPDATE, session_id, model_id, config, clients) if len(clients) < 20: - logger.info("Sent model update request for model {} to clients {}".format(request.model_id, clients)) + logger.info("Sent model update request for model {} to clients {}".format(model_id, clients)) else: - logger.info("Sent model update request for model {} to {} clients".format(request.model_id, len(clients))) + logger.info("Sent model update request for model {} to {} clients".format(model_id, len(clients))) def request_model_validation(self, session_id, model_id, clients=[]): """Ask clients to validate the current global model. @@ -187,12 +187,12 @@ def request_model_validation(self, session_id, model_id, clients=[]): :type clients: list """ - request, clients = self._send_request_type(fedn.StatusType.MODEL_VALIDATION, session_id, model_id, clients) + clients = self._send_request_type(fedn.StatusType.MODEL_VALIDATION, session_id, model_id, clients) if len(clients) < 20: - logger.info("Sent model validation request for model {} to clients {}".format(request.model_id, clients)) + logger.info("Sent model validation request for model {} to clients {}".format(model_id, clients)) else: - logger.info("Sent model validation request for model {} to {} clients".format(request.model_id, len(clients))) + logger.info("Sent model validation request for model {} to {} clients".format(model_id, len(clients))) def request_model_inference(self, session_id: str, model_id: str, clients: list = []) -> None: """Ask clients to perform inference on the model. @@ -205,12 +205,12 @@ def request_model_inference(self, session_id: str, model_id: str, clients: list :type clients: list """ - request, clients = self._send_request_type(fedn.StatusType.INFERENCE, session_id, model_id, clients) + clients = self._send_request_type(fedn.StatusType.INFERENCE, session_id, model_id, clients) if len(clients) < 20: - logger.info("Sent model inference request for model {} to clients {}".format(request.model_id, clients)) + logger.info("Sent model inference request for model {} to clients {}".format(model_id, clients)) else: - logger.info("Sent model inference request for model {} to {} clients".format(request.model_id, len(clients))) + logger.info("Sent model inference request for model {} to {} clients".format(model_id, len(clients))) def _send_request_type(self, request_type, session_id, model_id, config=None, clients=[]): """Send a request of a specific type to clients. @@ -223,41 +223,38 @@ def _send_request_type(self, request_type, session_id, model_id, config=None, cl :type config: dict :param clients: the clients to send the request to :type clients: list - :return: the request and the clients - :rtype: tuple + :return: the clients + :rtype: list """ - request = fedn.TaskRequest() - request.model_id = model_id - request.correlation_id = str(uuid.uuid4()) - request.timestamp = str(datetime.now()) - request.type = request_type - request.session_id = session_id - - request.sender.name = self.id - request.sender.role = fedn.COMBINER - - if request_type == fedn.StatusType.MODEL_UPDATE: - request.data = json.dumps(config) - if len(clients) == 0: + if len(clients) == 0: + if request_type == fedn.StatusType.MODEL_UPDATE: clients = self.get_active_trainers() - elif request_type == fedn.StatusType.MODEL_VALIDATION: - if len(clients) == 0: + elif request_type == fedn.StatusType.MODEL_VALIDATION: clients = self.get_active_validators() - elif request_type == fedn.StatusType.INFERENCE: - if len(clients) == 0: + elif request_type == fedn.StatusType.INFERENCE: # TODO: add inference clients type clients = self.get_active_validators() - for client in clients: + request = fedn.TaskRequest() + request.model_id = model_id + request.correlation_id = str(uuid.uuid4()) + request.timestamp = str(datetime.now()) + request.type = request_type + request.session_id = session_id + + request.sender.name = self.id + request.sender.role = fedn.COMBINER request.receiver.name = client request.receiver.role = fedn.WORKER + # Set the request data, not used in validation if request_type == fedn.StatusType.INFERENCE: presigned_url = self.repository.presigned_put_url(self.repository.inference_bucket, f"{client}/{session_id}") # TODO: in inference, request.data should also contain user-defined data/parameters request.data = json.dumps({"presigned_url": presigned_url}) + elif request_type == fedn.StatusType.MODEL_UPDATE: + request.data = json.dumps(config) self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE) - - return request, clients + return clients def get_active_trainers(self): """Get a list of active trainers. From 7422f6057df21b3477341090609adcc543691c5d Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 14 Jun 2024 08:35:40 +0000 Subject: [PATCH 4/4] fix --- fedn/network/clients/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index 81480d508..6f5edd332 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -445,9 +445,9 @@ def _listen_to_task_stream(self): self.inbox.put(("validate", request)) elif request.type == fedn.StatusType.INFERENCE and self.config["validator"]: logger.info("Received inference request for model_id {}".format(request.model_id)) - presined_url = json.loads(request.data) - presined_url = presined_url["presigned_url"] - logger.info("Inference presigned URL: {}".format(presined_url)) + presigned_url = json.loads(request.data) + presigned_url = presigned_url["presigned_url"] + logger.info("Inference presigned URL: {}".format(presigned_url)) self.inbox.put(("infer", request)) else: logger.error("Unknown request type: {}".format(request.type))