diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml
index 124e82afe..39f5781cc 100644
--- a/.github/workflows/integration-tests.yaml
+++ b/.github/workflows/integration-tests.yaml
@@ -38,9 +38,9 @@ jobs:
       - name: run ${{ matrix.to_test }}
         run: .ci/tests/examples/run.sh ${{ matrix.to_test }}

-      - name: run ${{ matrix.to_test }} inference
-        run: .ci/tests/examples/run_inference.sh ${{ matrix.to_test }}
-        if: ${{ matrix.os != 'macos-11' && matrix.to_test == 'mnist-keras keras' }} # example available for Keras
+      # - name: run ${{ matrix.to_test }} inference
+      #   run: .ci/tests/examples/run_inference.sh ${{ matrix.to_test }}
+      #   if: ${{ matrix.os != 'macos-11' && matrix.to_test == 'mnist-keras keras' }} # example available for Keras

       - name: print logs
         if: failure()
diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py
index 23c63601b..184383aec 100644
--- a/fedn/fedn/network/clients/client.py
+++ b/fedn/fedn/network/clients/client.py
@@ -284,15 +284,11 @@ def _subscribe_to_combiner(self, config):
         # Start sending heartbeats to the combiner.
         threading.Thread(target=self._send_heartbeat, kwargs={
-                         'update_frequency': config['heartbeat_interval']}, daemon=True).start()
+            'update_frequency': config['heartbeat_interval']}, daemon=True).start()

         # Start listening for combiner training and validation messages
-        if config['trainer']:
-            threading.Thread(
-                target=self._listen_to_model_update_request_stream, daemon=True).start()
-        if config['validator']:
-            threading.Thread(
-                target=self._listen_to_model_validation_request_stream, daemon=True).start()
+        threading.Thread(
+            target=self._listen_to_task_stream, daemon=True).start()

         self._attached = True
         # Start processing the client message inbox
@@ -359,7 +355,7 @@ def _initialize_dispatcher(self, config):
             copy_tree(from_path, self.run_path)
         self.dispatcher = Dispatcher(dispatch_config, self.run_path)

-    def get_model_from_combiner(self, id):
+    def get_model_from_combiner(self, id, timeout=20):
         """Fetch a model from the assigned combiner.
         Downloads the model update object via a gRPC streaming channel.

@@ -369,8 +365,12 @@ def get_model_from_combiner(self, id):
         :rtype: BytesIO
         """
         data = BytesIO()
+        time_start = time.time()
+        request = fedn.ModelRequest(id=id)
+        request.sender.name = self.name
+        request.sender.role = fedn.WORKER

-        for part in self.modelStub.Download(fedn.ModelRequest(id=id), metadata=self.metadata):
+        for part in self.modelStub.Download(request, metadata=self.metadata):
             if part.status == fedn.ModelStatus.IN_PROGRESS:
                 data.write(part.data)
@@ -381,6 +381,11 @@ def get_model_from_combiner(self, id):
             if part.status == fedn.ModelStatus.FAILED:
                 return None
+            if part.status == fedn.ModelStatus.UNKNOWN:
+                if time.time() - time_start >= timeout:
+                    return None
+                continue
+
         return data

     def send_model_to_combiner(self, model, id):
@@ -408,7 +413,7 @@ def send_model_to_combiner(self, model, id):

         return result

-    def _listen_to_model_update_request_stream(self):
+    def _listen_to_task_stream(self):
         """Subscribe to the model update request stream.

         :return: None
@@ -423,16 +428,21 @@ def _listen_to_model_update_request_stream(self):

         while self._attached:
             try:
-                for request in self.combinerStub.ModelUpdateRequestStream(r, metadata=self.metadata):
+                for request in self.combinerStub.TaskStream(r, metadata=self.metadata):
                     if request:
                         logger.debug("Received model update request from combiner: {}.".format(request))
                     if request.sender.role == fedn.COMBINER:
                         # Process training request
                         self._send_status("Received model update request.", log_level=fedn.Status.AUDIT,
                                           type=fedn.StatusType.MODEL_UPDATE_REQUEST, request=request)
-                        logger.info("Received model update request.")
+                        logger.info("Received model update request of type {} for model_id {}".format(request.type, request.model_id))

-                        self.inbox.put(('train', request))
+                        if request.type == fedn.StatusType.MODEL_UPDATE and self.config['trainer']:
+                            self.inbox.put(('train', request))
+                        elif request.type == fedn.StatusType.MODEL_VALIDATION and self.config['validator']:
+                            self.inbox.put(('validate', request))
+                        else:
+                            logger.error("Unknown request type: {}".format(request.type))

             except grpc.RpcError as e:
                 # Handle gRPC errors
@@ -453,45 +463,6 @@ def _listen_to_model_update_request_stream(self):
             if not self._attached:
                 return

-    def _listen_to_model_validation_request_stream(self):
-        """Subscribe to the model validation request stream.
-
-        :return: None
-        :rtype: None
-        """
-
-        r = fedn.ClientAvailableMessage()
-        r.sender.name = self.name
-        r.sender.role = fedn.WORKER
-        while True:
-            try:
-                for request in self.combinerStub.ModelValidationRequestStream(r, metadata=self.metadata):
-                    # Process validation request
-                    model_id = request.model_id
-                    self._send_status("Received model validation request for model_id {}".format(model_id),
-                                      log_level=fedn.Status.AUDIT, type=fedn.StatusType.MODEL_VALIDATION_REQUEST,
-                                      request=request)
-                    logger.info("Received model validation request for model_id {}".format(model_id))
-                    self.inbox.put(('validate', request))
-
-            except grpc.RpcError as e:
-                # Handle gRPC errors
-                status_code = e.code()
-                if status_code == grpc.StatusCode.UNAVAILABLE:
-                    logger.warning("GRPC server unavailable during model validation request stream. Retrying.")
-                    # Retry after a delay
-                    time.sleep(5)
-                else:
-                    # Log the error and continue
-                    logger.error(f"An error occurred during model validation request stream: {e}")
-
-            except Exception as ex:
-                # Handle other exceptions
-                logger.error(f"An error occurred during model validation request stream: {ex}")
-
-            if not self._attached:
-                return
-
     def _process_training_request(self, model_id):
         """Process a training (model update) request.

@@ -509,6 +480,9 @@ def _process_training_request(self, model_id):
         meta = {}
         tic = time.time()
         mdl = self.get_model_from_combiner(str(model_id))
+        if mdl is None:
+            logger.error("Could not retrieve model from combiner. Aborting training request.")
+            return None, None
         meta['fetch_model'] = time.time() - tic

         inpath = self.helper.get_tmp_path()
@@ -573,6 +547,9 @@ def _process_validation_request(self, model_id, is_inference):
         self.state = ClientState.validating
         try:
             model = self.get_model_from_combiner(str(model_id))
+            if model is None:
+                logger.error("Could not retrieve model from combiner. Aborting validation request.")
+                return None

             inpath = self.helper.get_tmp_path()
             with open(inpath, "wb") as fh:
@@ -641,7 +618,7 @@ def process_request(self):
                 elif task_type == 'validate':
                     self.state = ClientState.validating
                     metrics = self._process_validation_request(
-                        request.model_id, request.is_inference)
+                        request.model_id, False)

                     if metrics is not None:
                         # Send validation
@@ -658,11 +635,7 @@ def process_request(self):
                         _ = self.combinerStub.SendModelValidation(
                             validation, metadata=self.metadata)

-                        # Set status type
-                        if request.is_inference:
-                            status_type = fedn.StatusType.INFERENCE
-                        else:
-                            status_type = fedn.StatusType.MODEL_VALIDATION
+                        status_type = fedn.StatusType.MODEL_VALIDATION

                         self._send_status("Model validation completed.", log_level=fedn.Status.AUDIT,
                                           type=status_type, request=validation)
diff --git a/fedn/fedn/network/combiner/combiner.py b/fedn/fedn/network/combiner/combiner.py
index 0233583ff..94d28c797 100644
--- a/fedn/fedn/network/combiner/combiner.py
+++ b/fedn/fedn/network/combiner/combiner.py
@@ -169,11 +169,12 @@ def request_model_update(self, config, clients=[]):
         """
         # The request to be added to the client queue
-        request = fedn.ModelUpdateRequest()
+        request = fedn.TaskRequest()
         request.model_id = config['model_id']
         request.correlation_id = str(uuid.uuid4())
         request.timestamp = str(datetime.now())
         request.data = json.dumps(config)
+        request.type = fedn.StatusType.MODEL_UPDATE

         request.sender.name = self.id
         request.sender.role = fedn.COMBINER

@@ -184,7 +185,7 @@ def request_model_update(self, config, clients=[]):
         for client in clients:
             request.receiver.name = client
             request.receiver.role = fedn.WORKER
-            self._put_request_to_client_queue(request, fedn.Channel.MODEL_UPDATE_REQUESTS)
+            self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE)

         if len(clients) < 20:
             logger.info("Sent model update request for model {} to clients {}".format(
@@ -205,11 +206,15 @@ def request_model_validation(self, model_id, config, clients=[]):
         """
         # The request to be added to the client queue
-        request = fedn.ModelValidationRequest()
+        request = fedn.TaskRequest()
         request.model_id = model_id
-        request.correlation_id = str(uuid.uuid4())  # Obsolete?
+        request.correlation_id = str(uuid.uuid4())
         request.timestamp = str(datetime.now())
-        request.is_inference = (config['task'] == 'inference')
+        # request.is_inference = (config['task'] == 'inference')
+        request.type = fedn.StatusType.MODEL_VALIDATION
+
+        request.sender.name = self.id
+        request.sender.role = fedn.COMBINER

         if len(clients) == 0:
             clients = self.get_active_validators()
@@ -217,7 +222,7 @@
         for client in clients:
             request.receiver.name = client
             request.receiver.role = fedn.WORKER
-            self._put_request_to_client_queue(request, fedn.Channel.MODEL_VALIDATION_REQUESTS)
+            self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE)

         if len(clients) < 20:
             logger.info("Sent model validation request for model {} to clients {}".format(
@@ -232,7 +237,7 @@ def get_active_trainers(self):
         :return: the list of active trainers
         :rtype: list
         """
-        trainers = self._list_active_clients(fedn.Channel.MODEL_UPDATE_REQUESTS)
+        trainers = self._list_active_clients(fedn.Queue.TASK_QUEUE)
         return trainers

     def get_active_validators(self):
@@ -241,7 +246,7 @@ def get_active_validators(self):
         :return: the list of active validators
         :rtype: list
         """
-        validators = self._list_active_clients(fedn.Channel.MODEL_VALIDATION_REQUESTS)
+        validators = self._list_active_clients(fedn.Queue.TASK_QUEUE)
         return validators

     def nr_active_trainers(self):
@@ -349,7 +354,7 @@ def _deamon_thread_client_status(self, timeout=10):
         while True:
             time.sleep(timeout)
             # TODO: Also update validation clients
-            self._list_active_clients(fedn.Channel.MODEL_UPDATE_REQUESTS)
+            self._list_active_clients(fedn.Queue.TASK_QUEUE)

     def _put_request_to_client_queue(self, request, queue_name):
         """ Get a client specific queue and add a request to it.
@@ -545,7 +550,7 @@ def AcceptingClients(self, request: fedn.ConnectionRequest, context):
         """
         response = fedn.ConnectionResponse()
         active_clients = self._list_active_clients(
-            fedn.Channel.MODEL_UPDATE_REQUESTS)
+            fedn.Queue.TASK_QUEUE)

         try:
             requested = int(self.max_clients)
@@ -588,33 +593,7 @@ def SendHeartbeat(self, heartbeat: fedn.Heartbeat, context):

     # Combiner Service

-    def ModelUpdateStream(self, update, context):
-        """ Model update stream RPC endpoint. Update status for client is connecting to stream.
-
-        :param update: the update message
-        :type update: :class:`fedn.network.grpc.fedn_pb2.ModelUpdate`
-        :param context: the context
-        :type context: :class:`grpc._server._Context`
-        """
-        client = update.sender
-        status = fedn.Status(
-            status="Client {} connecting to ModelUpdateStream.".format(client.name))
-        status.log_level = fedn.Status.INFO
-        status.sender.name = self.id
-        status.sender.role = role_to_proto_role(self.role)
-
-        self._subscribe_client_to_queue(client, fedn.Channel.MODEL_UPDATES)
-        q = self.__get_queue(client, fedn.Channel.MODEL_UPDATES)
-
-        self._send_status(status)
-
-        while context.is_active():
-            try:
-                yield q.get(timeout=1.0)
-            except queue.Empty:
-                pass
-
-    def ModelUpdateRequestStream(self, response, context):
+    def TaskStream(self, response, context):
         """ A server stream RPC endpoint (Update model). Messages from client stream.

         :param response: the response
@@ -627,18 +606,18 @@ def ModelUpdateRequestStream(self, response, context):
         metadata = context.invocation_metadata()
         if metadata:
             metadata = dict(metadata)
-            logger.info("grpc.Combiner.ModelUpdateRequestStream: Client connected: {}\n".format(metadata['client']))
+            logger.info("grpc.Combiner.TaskStream: Client connected: {}\n".format(metadata['client']))

         status = fedn.Status(
-            status="Client {} connecting to ModelUpdateRequestStream.".format(client.name))
+            status="Client {} connecting to TaskStream.".format(client.name))
         status.log_level = fedn.Status.INFO
         status.timestamp.GetCurrentTime()
         self.__whoami(status.sender, self)

         self._subscribe_client_to_queue(
-            client, fedn.Channel.MODEL_UPDATE_REQUESTS)
-        q = self.__get_queue(client, fedn.Channel.MODEL_UPDATE_REQUESTS)
+            client, fedn.Queue.TASK_QUEUE)
+        q = self.__get_queue(client, fedn.Queue.TASK_QUEUE)

         self._send_status(status)

@@ -657,62 +636,6 @@ def ModelUpdateRequestStream(self, response, context):
             except Exception as e:
                 logger.error("Error in ModelUpdateRequestStream: {}".format(e))

-    def ModelValidationStream(self, update, context):
-        """ Model validation stream RPC endpoint. Update status for client is connecting to stream.
-
-        :param update: the update message
-        :type update: :class:`fedn.network.grpc.fedn_pb2.ModelValidation`
-        :param context: the context
-        :type context: :class:`grpc._server._Context`
-        """
-        client = update.sender
-        status = fedn.Status(
-            status="Client {} connecting to ModelValidationStream.".format(client.name))
-        status.log_level = fedn.Status.INFO
-
-        status.sender.name = self.id
-        status.sender.role = role_to_proto_role(self.role)
-
-        self._subscribe_client_to_queue(client, fedn.Channel.MODEL_VALIDATIONS)
-        q = self.__get_queue(client, fedn.Channel.MODEL_VALIDATIONS)
-
-        self._send_status(status)
-
-        while context.is_active():
-            try:
-                yield q.get(timeout=1.0)
-            except queue.Empty:
-                pass
-
-    def ModelValidationRequestStream(self, response, context):
-        """ A server stream RPC endpoint (Validation). Messages from client stream.
-
-        :param response: the response
-        :type response: :class:`fedn.network.grpc.fedn_pb2.ModelValidationRequest`
-        :param context: the context
-        :type context: :class:`grpc._server._Context`
-        """
-
-        client = response.sender
-        status = fedn.Status(
-            status="Client {} connecting to ModelValidationRequestStream.".format(client.name))
-        status.log_level = fedn.Status.INFO
-        status.sender.name = self.id
-        status.sender.role = role_to_proto_role(self.role)
-        status.timestamp.GetCurrentTime()
-
-        self._subscribe_client_to_queue(
-            client, fedn.Channel.MODEL_VALIDATION_REQUESTS)
-        q = self.__get_queue(client, fedn.Channel.MODEL_VALIDATION_REQUESTS)
-
-        self._send_status(status)
-
-        while context.is_active():
-            try:
-                yield q.get(timeout=1.0)
-            except queue.Empty:
-                pass
-
     def SendModelUpdate(self, request, context):
         """ Send a model update response.

diff --git a/fedn/fedn/network/combiner/interfaces.py b/fedn/fedn/network/combiner/interfaces.py
index 1bb93168b..69b987be0 100644
--- a/fedn/fedn/network/combiner/interfaces.py
+++ b/fedn/fedn/network/combiner/interfaces.py
@@ -1,6 +1,7 @@
 import base64
 import copy
 import json
+import time
 from io import BytesIO

 import grpc
@@ -239,7 +240,7 @@ def submit(self, config):

         return response

-    def get_model(self, id):
+    def get_model(self, id, timeout=10):
         """ Download a model from the combiner server.

         :param id: The model id.
@@ -255,7 +256,13 @@ def get_model(self, id):
         data = BytesIO()
         data.seek(0, 0)

-        parts = modelservice.Download(fedn.ModelRequest(id=id))
+        time_start = time.time()
+
+        request = fedn.ModelRequest(id=id)
+        request.sender.name = self.name
+        request.sender.role = fedn.WORKER
+
+        parts = modelservice.Download(request)
         for part in parts:
             if part.status == fedn.ModelStatus.IN_PROGRESS:
                 data.write(part.data)
@@ -263,6 +270,10 @@ def get_model(self, id):
                 return data
             if part.status == fedn.ModelStatus.FAILED:
                 return None
+            if part.status == fedn.ModelStatus.UNKNOWN:
+                if time.time() - time_start > timeout:
+                    return None
+                continue

     def allowing_clients(self):
         """ Check if the combiner is allowing additional client connections.
diff --git a/fedn/fedn/network/combiner/modelservice.py b/fedn/fedn/network/combiner/modelservice.py
index 909d5936a..59c63108b 100644
--- a/fedn/fedn/network/combiner/modelservice.py
+++ b/fedn/fedn/network/combiner/modelservice.py
@@ -179,17 +179,20 @@ def Download(self, request, context):
         :return: A model response iterator.
         :rtype: :class:`fedn.network.grpc.fedn_pb2.ModelResponse`
         """
-        logger.debug("grpc.ModelService.Download: Called")
+        logger.info(f'grpc.ModelService.Download: {request.sender.role}:{request.sender.name} requested model {request.id}')
         try:
-            if self.temp_model_storage.get_model_metadata(request.id) != fedn.ModelStatus.OK:
-                logger.error("Error file is not ready")
-                yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED)
+            status = self.temp_model_storage.get_model_metadata(request.id)
+            if status != fedn.ModelStatus.OK:
+                logger.error(f'model file is not ready: {request.id}, status: {status}')
+                yield fedn.ModelResponse(id=request.id, data=None, status=status)
         except Exception:
             logger.error("Error file does not exist: {}".format(request.id))
             yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED)

         try:
             obj = self.temp_model_storage.get(request.id)
+            if obj is None:
+                raise Exception(f'File not found: {request.id}')
             with obj as f:
                 while True:
                     piece = f.read(CHUNK_SIZE)
@@ -199,3 +202,4 @@ def Download(self, request, context):
                     yield fedn.ModelResponse(id=request.id, data=piece, status=fedn.ModelStatus.IN_PROGRESS)
         except Exception as e:
             logger.error("Downloading went wrong: {} {}".format(request.id, e))
+            yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED)
diff --git a/fedn/fedn/network/grpc/fedn.proto b/fedn/fedn/network/grpc/fedn.proto
index 8852fb95f..fc5cd9cdb 100644
--- a/fedn/fedn/network/grpc/fedn.proto
+++ b/fedn/fedn/network/grpc/fedn.proto
@@ -39,16 +39,12 @@ message Status {
 }

-enum Channel {
+enum Queue {
   DEFAULT = 0;
-  MODEL_UPDATE_REQUESTS = 1;
-  MODEL_UPDATES = 2;
-  MODEL_VALIDATION_REQUESTS = 3;
-  MODEL_VALIDATIONS = 4;
-  STATUS = 5;
+  TASK_QUEUE = 1;
 }

-message ModelUpdateRequest {
+message TaskRequest {
   Client sender = 1;
   Client receiver = 2;
   string model_id = 3;
@@ -56,6 +52,8 @@ message ModelUpdateRequest {
   string correlation_id = 5;
   string timestamp = 6;
   string meta = 7;
+  string session_id = 8;
+  StatusType type = 9;
 }

 message ModelUpdate {
@@ -68,17 +66,6 @@ message ModelUpdate {
   string meta = 7;
 }

-message ModelValidationRequest {
-  Client sender = 1;
-  Client receiver = 2;
-  string model_id = 3;
-  string data = 4;
-  string correlation_id = 5;
-  string timestamp = 6;
-  string meta = 7;
-  bool is_inference = 8;
-}
-
 message ModelValidation {
   Client sender = 1;
   Client receiver = 2;
@@ -94,6 +81,7 @@ enum ModelStatus {
   OK = 0;
   IN_PROGRESS = 1;
   IN_PROGRESS_OK =
2; FAILED = 3; + UNKNOWN = 4; } message ModelRequest { @@ -140,7 +128,7 @@ message ClientAvailableMessage { message ListClientsRequest { Client sender = 1; - Channel channel = 2; + Queue channel = 2; } message ClientList { @@ -244,10 +232,7 @@ service Connector { service Combiner { // Stream endpoints for training/validation pub/sub - rpc ModelUpdateRequestStream (ClientAvailableMessage) returns (stream ModelUpdateRequest); - rpc ModelUpdateStream (ClientAvailableMessage) returns (stream ModelUpdate); - rpc ModelValidationRequestStream (ClientAvailableMessage) returns (stream ModelValidationRequest); - rpc ModelValidationStream (ClientAvailableMessage) returns (stream ModelValidation); + rpc TaskStream (ClientAvailableMessage) returns (stream TaskRequest); rpc SendModelUpdate (ModelUpdate) returns (Response); rpc SendModelValidation (ModelValidation) returns (Response); diff --git a/fedn/fedn/network/grpc/fedn_pb2.py b/fedn/fedn/network/grpc/fedn_pb2.py index a585eb0b5..0b5718fa6 100644 --- a/fedn/fedn/network/grpc/fedn_pb2.py +++ b/fedn/fedn/network/grpc/fedn_pb2.py @@ -14,7 +14,7 @@ from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66\x65\x64n/network/grpc/fedn.proto\x12\x04grpc\x1a\x1fgoogle/protobuf/timestamp.proto\":\n\x08Response\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08response\x18\x02 \x01(\t\"\xa8\x02\n\x06Status\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06status\x18\x02 \x01(\t\x12(\n\tlog_level\x18\x03 \x01(\x0e\x32\x15.grpc.Status.LogLevel\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12-\n\ttimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x1e\n\x04type\x18\x07 \x01(\x0e\x32\x10.grpc.StatusType\x12\r\n\x05\x65xtra\x18\x08 \x01(\t\"B\n\x08LogLevel\x12\x08\n\x04INFO\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x0b\n\x07WARNING\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\t\n\x05\x41UDIT\x10\x04\"\xab\x01\n\x12ModelUpdateRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xaf\x01\n\x0bModelUpdate\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x17\n\x0fmodel_update_id\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xc5\x01\n\x16ModelValidationRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x14\n\x0cis_inference\x18\x08 \x01(\x08\"\xc4\x01\n\x0fModelValidation\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12-\n\ttimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04meta\x18\x07 
\x01(\t\"\x89\x01\n\x0cModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12\n\n\x02id\x18\x04 \x01(\t\x12!\n\x06status\x18\x05 \x01(\x0e\x32\x11.grpc.ModelStatus\"]\n\rModelResponse\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\n\n\x02id\x18\x02 \x01(\t\x12!\n\x06status\x18\x03 \x01(\x0e\x32\x11.grpc.ModelStatus\x12\x0f\n\x07message\x18\x04 \x01(\t\"U\n\x15GetGlobalModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\"h\n\x16GetGlobalModelResponse\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\")\n\tHeartbeat\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\"W\n\x16\x43lientAvailableMessage\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"R\n\x12ListClientsRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x07\x63hannel\x18\x02 \x01(\x0e\x32\r.grpc.Channel\"*\n\nClientList\x12\x1c\n\x06\x63lient\x18\x01 \x03(\x0b\x32\x0c.grpc.Client\"0\n\x06\x43lient\x12\x18\n\x04role\x18\x01 \x01(\x0e\x32\n.grpc.Role\x12\x0c\n\x04name\x18\x02 \x01(\t\"m\n\x0fReassignRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06server\x18\x03 \x01(\t\x12\x0c\n\x04port\x18\x04 \x01(\r\"c\n\x10ReconnectRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x11\n\treconnect\x18\x03 \x01(\r\"\'\n\tParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"T\n\x0e\x43ontrolRequest\x12\x1e\n\x07\x63ommand\x18\x01 \x01(\x0e\x32\r.grpc.Command\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"F\n\x0f\x43ontrolResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"\x13\n\x11\x43onnectionRequest\"<\n\x12\x43onnectionResponse\x12&\n\x06status\x18\x01 
\x01(\x0e\x32\x16.grpc.ConnectionStatus*\x84\x01\n\nStatusType\x12\x07\n\x03LOG\x10\x00\x12\x18\n\x14MODEL_UPDATE_REQUEST\x10\x01\x12\x10\n\x0cMODEL_UPDATE\x10\x02\x12\x1c\n\x18MODEL_VALIDATION_REQUEST\x10\x03\x12\x14\n\x10MODEL_VALIDATION\x10\x04\x12\r\n\tINFERENCE\x10\x05*\x86\x01\n\x07\x43hannel\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x19\n\x15MODEL_UPDATE_REQUESTS\x10\x01\x12\x11\n\rMODEL_UPDATES\x10\x02\x12\x1d\n\x19MODEL_VALIDATION_REQUESTS\x10\x03\x12\x15\n\x11MODEL_VALIDATIONS\x10\x04\x12\n\n\x06STATUS\x10\x05*F\n\x0bModelStatus\x12\x06\n\x02OK\x10\x00\x12\x0f\n\x0bIN_PROGRESS\x10\x01\x12\x12\n\x0eIN_PROGRESS_OK\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03*8\n\x04Role\x12\n\n\x06WORKER\x10\x00\x12\x0c\n\x08\x43OMBINER\x10\x01\x12\x0b\n\x07REDUCER\x10\x02\x12\t\n\x05OTHER\x10\x03*J\n\x07\x43ommand\x12\x08\n\x04IDLE\x10\x00\x12\t\n\x05START\x10\x01\x12\t\n\x05PAUSE\x10\x02\x12\x08\n\x04STOP\x10\x03\x12\t\n\x05RESET\x10\x04\x12\n\n\x06REPORT\x10\x05*I\n\x10\x43onnectionStatus\x12\x11\n\rNOT_ACCEPTING\x10\x00\x12\r\n\tACCEPTING\x10\x01\x12\x13\n\x0fTRY_AGAIN_LATER\x10\x02\x32z\n\x0cModelService\x12\x33\n\x06Upload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse(\x01\x12\x35\n\x08\x44ownload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse0\x01\x32\xf8\x01\n\x07\x43ontrol\x12\x34\n\x05Start\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x33\n\x04Stop\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x44\n\x15\x46lushAggregationQueue\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12<\n\rSetAggregator\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse2V\n\x07Reducer\x12K\n\x0eGetGlobalModel\x12\x1b.grpc.GetGlobalModelRequest\x1a\x1c.grpc.GetGlobalModelResponse2\xab\x03\n\tConnector\x12\x44\n\x14\x41llianceStatusStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x0c.grpc.Status0\x01\x12*\n\nSendStatus\x12\x0c.grpc.Status\x1a\x0e.grpc.Response\x12?\n\x11ListActiveClients\x12\x18.grpc.ListClientsRequest\x1a\x10.grpc.ClientList\x12\x45\n\x10\x41\x63\x63\x65ptingClients\x12\x17.grpc.ConnectionRequest\x1a\x18.grpc.ConnectionResponse\x12\x30\n\rSendHeartbeat\x12\x0f.grpc.Heartbeat\x1a\x0e.grpc.Response\x12\x37\n\x0eReassignClient\x12\x15.grpc.ReassignRequest\x1a\x0e.grpc.Response\x12\x39\n\x0fReconnectClient\x12\x16.grpc.ReconnectRequest\x1a\x0e.grpc.Response2\xca\x03\n\x08\x43ombiner\x12T\n\x18ModelUpdateRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x18.grpc.ModelUpdateRequest0\x01\x12\x46\n\x11ModelUpdateStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x11.grpc.ModelUpdate0\x01\x12\\\n\x1cModelValidationRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x1c.grpc.ModelValidationRequest0\x01\x12N\n\x15ModelValidationStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x15.grpc.ModelValidation0\x01\x12\x34\n\x0fSendModelUpdate\x12\x11.grpc.ModelUpdate\x1a\x0e.grpc.Response\x12<\n\x13SendModelValidation\x12\x15.grpc.ModelValidation\x1a\x0e.grpc.Responseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66\x65\x64n/network/grpc/fedn.proto\x12\x04grpc\x1a\x1fgoogle/protobuf/timestamp.proto\":\n\x08Response\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08response\x18\x02 \x01(\t\"\xa8\x02\n\x06Status\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06status\x18\x02 \x01(\t\x12(\n\tlog_level\x18\x03 \x01(\x0e\x32\x15.grpc.Status.LogLevel\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12-\n\ttimestamp\x18\x06 
\x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x1e\n\x04type\x18\x07 \x01(\x0e\x32\x10.grpc.StatusType\x12\r\n\x05\x65xtra\x18\x08 \x01(\t\"B\n\x08LogLevel\x12\x08\n\x04INFO\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x0b\n\x07WARNING\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\t\n\x05\x41UDIT\x10\x04\"\xd8\x01\n\x0bTaskRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\t\x12\x1e\n\x04type\x18\t \x01(\x0e\x32\x10.grpc.StatusType\"\xaf\x01\n\x0bModelUpdate\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x17\n\x0fmodel_update_id\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xc4\x01\n\x0fModelValidation\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12-\n\ttimestamp\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\x89\x01\n\x0cModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12\n\n\x02id\x18\x04 \x01(\t\x12!\n\x06status\x18\x05 \x01(\x0e\x32\x11.grpc.ModelStatus\"]\n\rModelResponse\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\n\n\x02id\x18\x02 \x01(\t\x12!\n\x06status\x18\x03 \x01(\x0e\x32\x11.grpc.ModelStatus\x12\x0f\n\x07message\x18\x04 \x01(\t\"U\n\x15GetGlobalModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\"h\n\x16GetGlobalModelResponse\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\")\n\tHeartbeat\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\"W\n\x16\x43lientAvailableMessage\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"P\n\x12ListClientsRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1c\n\x07\x63hannel\x18\x02 \x01(\x0e\x32\x0b.grpc.Queue\"*\n\nClientList\x12\x1c\n\x06\x63lient\x18\x01 \x03(\x0b\x32\x0c.grpc.Client\"0\n\x06\x43lient\x12\x18\n\x04role\x18\x01 \x01(\x0e\x32\n.grpc.Role\x12\x0c\n\x04name\x18\x02 \x01(\t\"m\n\x0fReassignRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06server\x18\x03 \x01(\t\x12\x0c\n\x04port\x18\x04 \x01(\r\"c\n\x10ReconnectRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x11\n\treconnect\x18\x03 \x01(\r\"\'\n\tParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"T\n\x0e\x43ontrolRequest\x12\x1e\n\x07\x63ommand\x18\x01 \x01(\x0e\x32\r.grpc.Command\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"F\n\x0f\x43ontrolResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\"\n\tparameter\x18\x02 
\x03(\x0b\x32\x0f.grpc.Parameter\"\x13\n\x11\x43onnectionRequest\"<\n\x12\x43onnectionResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.grpc.ConnectionStatus*\x84\x01\n\nStatusType\x12\x07\n\x03LOG\x10\x00\x12\x18\n\x14MODEL_UPDATE_REQUEST\x10\x01\x12\x10\n\x0cMODEL_UPDATE\x10\x02\x12\x1c\n\x18MODEL_VALIDATION_REQUEST\x10\x03\x12\x14\n\x10MODEL_VALIDATION\x10\x04\x12\r\n\tINFERENCE\x10\x05*$\n\x05Queue\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x0e\n\nTASK_QUEUE\x10\x01*S\n\x0bModelStatus\x12\x06\n\x02OK\x10\x00\x12\x0f\n\x0bIN_PROGRESS\x10\x01\x12\x12\n\x0eIN_PROGRESS_OK\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07UNKNOWN\x10\x04*8\n\x04Role\x12\n\n\x06WORKER\x10\x00\x12\x0c\n\x08\x43OMBINER\x10\x01\x12\x0b\n\x07REDUCER\x10\x02\x12\t\n\x05OTHER\x10\x03*J\n\x07\x43ommand\x12\x08\n\x04IDLE\x10\x00\x12\t\n\x05START\x10\x01\x12\t\n\x05PAUSE\x10\x02\x12\x08\n\x04STOP\x10\x03\x12\t\n\x05RESET\x10\x04\x12\n\n\x06REPORT\x10\x05*I\n\x10\x43onnectionStatus\x12\x11\n\rNOT_ACCEPTING\x10\x00\x12\r\n\tACCEPTING\x10\x01\x12\x13\n\x0fTRY_AGAIN_LATER\x10\x02\x32z\n\x0cModelService\x12\x33\n\x06Upload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse(\x01\x12\x35\n\x08\x44ownload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse0\x01\x32\xf8\x01\n\x07\x43ontrol\x12\x34\n\x05Start\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x33\n\x04Stop\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x44\n\x15\x46lushAggregationQueue\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12<\n\rSetAggregator\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse2V\n\x07Reducer\x12K\n\x0eGetGlobalModel\x12\x1b.grpc.GetGlobalModelRequest\x1a\x1c.grpc.GetGlobalModelResponse2\xab\x03\n\tConnector\x12\x44\n\x14\x41llianceStatusStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x0c.grpc.Status0\x01\x12*\n\nSendStatus\x12\x0c.grpc.Status\x1a\x0e.grpc.Response\x12?\n\x11ListActiveClients\x12\x18.grpc.ListClientsRequest\x1a\x10.grpc.ClientList\x12\x45\n\x10\x41\x63\x63\x65ptingClients\x12\x17.grpc.ConnectionRequest\x1a\x18.grpc.ConnectionResponse\x12\x30\n\rSendHeartbeat\x12\x0f.grpc.Heartbeat\x1a\x0e.grpc.Response\x12\x37\n\x0eReassignClient\x12\x15.grpc.ReassignRequest\x1a\x0e.grpc.Response\x12\x39\n\x0fReconnectClient\x12\x16.grpc.ReconnectRequest\x1a\x0e.grpc.Response2\xbf\x01\n\x08\x43ombiner\x12?\n\nTaskStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x11.grpc.TaskRequest0\x01\x12\x34\n\x0fSendModelUpdate\x12\x11.grpc.ModelUpdate\x1a\x0e.grpc.Response\x12<\n\x13SendModelValidation\x12\x15.grpc.ModelValidation\x1a\x0e.grpc.Responseb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -22,72 +22,70 @@ if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals['_STATUSTYPE']._serialized_start=2414 - _globals['_STATUSTYPE']._serialized_end=2546 - _globals['_CHANNEL']._serialized_start=2549 - _globals['_CHANNEL']._serialized_end=2683 - _globals['_MODELSTATUS']._serialized_start=2685 - _globals['_MODELSTATUS']._serialized_end=2755 - _globals['_ROLE']._serialized_start=2757 - _globals['_ROLE']._serialized_end=2813 - _globals['_COMMAND']._serialized_start=2815 - _globals['_COMMAND']._serialized_end=2889 - _globals['_CONNECTIONSTATUS']._serialized_start=2891 - _globals['_CONNECTIONSTATUS']._serialized_end=2964 + _globals['_STATUSTYPE']._serialized_start=2257 + _globals['_STATUSTYPE']._serialized_end=2389 + _globals['_QUEUE']._serialized_start=2391 + _globals['_QUEUE']._serialized_end=2427 + 
_globals['_MODELSTATUS']._serialized_start=2429 + _globals['_MODELSTATUS']._serialized_end=2512 + _globals['_ROLE']._serialized_start=2514 + _globals['_ROLE']._serialized_end=2570 + _globals['_COMMAND']._serialized_start=2572 + _globals['_COMMAND']._serialized_end=2646 + _globals['_CONNECTIONSTATUS']._serialized_start=2648 + _globals['_CONNECTIONSTATUS']._serialized_end=2721 _globals['_RESPONSE']._serialized_start=71 _globals['_RESPONSE']._serialized_end=129 _globals['_STATUS']._serialized_start=132 _globals['_STATUS']._serialized_end=428 _globals['_STATUS_LOGLEVEL']._serialized_start=362 _globals['_STATUS_LOGLEVEL']._serialized_end=428 - _globals['_MODELUPDATEREQUEST']._serialized_start=431 - _globals['_MODELUPDATEREQUEST']._serialized_end=602 - _globals['_MODELUPDATE']._serialized_start=605 - _globals['_MODELUPDATE']._serialized_end=780 - _globals['_MODELVALIDATIONREQUEST']._serialized_start=783 - _globals['_MODELVALIDATIONREQUEST']._serialized_end=980 - _globals['_MODELVALIDATION']._serialized_start=983 - _globals['_MODELVALIDATION']._serialized_end=1179 - _globals['_MODELREQUEST']._serialized_start=1182 - _globals['_MODELREQUEST']._serialized_end=1319 - _globals['_MODELRESPONSE']._serialized_start=1321 - _globals['_MODELRESPONSE']._serialized_end=1414 - _globals['_GETGLOBALMODELREQUEST']._serialized_start=1416 - _globals['_GETGLOBALMODELREQUEST']._serialized_end=1501 - _globals['_GETGLOBALMODELRESPONSE']._serialized_start=1503 - _globals['_GETGLOBALMODELRESPONSE']._serialized_end=1607 - _globals['_HEARTBEAT']._serialized_start=1609 - _globals['_HEARTBEAT']._serialized_end=1650 - _globals['_CLIENTAVAILABLEMESSAGE']._serialized_start=1652 - _globals['_CLIENTAVAILABLEMESSAGE']._serialized_end=1739 - _globals['_LISTCLIENTSREQUEST']._serialized_start=1741 - _globals['_LISTCLIENTSREQUEST']._serialized_end=1823 - _globals['_CLIENTLIST']._serialized_start=1825 - _globals['_CLIENTLIST']._serialized_end=1867 - _globals['_CLIENT']._serialized_start=1869 - _globals['_CLIENT']._serialized_end=1917 - _globals['_REASSIGNREQUEST']._serialized_start=1919 - _globals['_REASSIGNREQUEST']._serialized_end=2028 - _globals['_RECONNECTREQUEST']._serialized_start=2030 - _globals['_RECONNECTREQUEST']._serialized_end=2129 - _globals['_PARAMETER']._serialized_start=2131 - _globals['_PARAMETER']._serialized_end=2170 - _globals['_CONTROLREQUEST']._serialized_start=2172 - _globals['_CONTROLREQUEST']._serialized_end=2256 - _globals['_CONTROLRESPONSE']._serialized_start=2258 - _globals['_CONTROLRESPONSE']._serialized_end=2328 - _globals['_CONNECTIONREQUEST']._serialized_start=2330 - _globals['_CONNECTIONREQUEST']._serialized_end=2349 - _globals['_CONNECTIONRESPONSE']._serialized_start=2351 - _globals['_CONNECTIONRESPONSE']._serialized_end=2411 - _globals['_MODELSERVICE']._serialized_start=2966 - _globals['_MODELSERVICE']._serialized_end=3088 - _globals['_CONTROL']._serialized_start=3091 - _globals['_CONTROL']._serialized_end=3339 - _globals['_REDUCER']._serialized_start=3341 - _globals['_REDUCER']._serialized_end=3427 - _globals['_CONNECTOR']._serialized_start=3430 - _globals['_CONNECTOR']._serialized_end=3857 - _globals['_COMBINER']._serialized_start=3860 - _globals['_COMBINER']._serialized_end=4318 + _globals['_TASKREQUEST']._serialized_start=431 + _globals['_TASKREQUEST']._serialized_end=647 + _globals['_MODELUPDATE']._serialized_start=650 + _globals['_MODELUPDATE']._serialized_end=825 + _globals['_MODELVALIDATION']._serialized_start=828 + _globals['_MODELVALIDATION']._serialized_end=1024 + 
_globals['_MODELREQUEST']._serialized_start=1027 + _globals['_MODELREQUEST']._serialized_end=1164 + _globals['_MODELRESPONSE']._serialized_start=1166 + _globals['_MODELRESPONSE']._serialized_end=1259 + _globals['_GETGLOBALMODELREQUEST']._serialized_start=1261 + _globals['_GETGLOBALMODELREQUEST']._serialized_end=1346 + _globals['_GETGLOBALMODELRESPONSE']._serialized_start=1348 + _globals['_GETGLOBALMODELRESPONSE']._serialized_end=1452 + _globals['_HEARTBEAT']._serialized_start=1454 + _globals['_HEARTBEAT']._serialized_end=1495 + _globals['_CLIENTAVAILABLEMESSAGE']._serialized_start=1497 + _globals['_CLIENTAVAILABLEMESSAGE']._serialized_end=1584 + _globals['_LISTCLIENTSREQUEST']._serialized_start=1586 + _globals['_LISTCLIENTSREQUEST']._serialized_end=1666 + _globals['_CLIENTLIST']._serialized_start=1668 + _globals['_CLIENTLIST']._serialized_end=1710 + _globals['_CLIENT']._serialized_start=1712 + _globals['_CLIENT']._serialized_end=1760 + _globals['_REASSIGNREQUEST']._serialized_start=1762 + _globals['_REASSIGNREQUEST']._serialized_end=1871 + _globals['_RECONNECTREQUEST']._serialized_start=1873 + _globals['_RECONNECTREQUEST']._serialized_end=1972 + _globals['_PARAMETER']._serialized_start=1974 + _globals['_PARAMETER']._serialized_end=2013 + _globals['_CONTROLREQUEST']._serialized_start=2015 + _globals['_CONTROLREQUEST']._serialized_end=2099 + _globals['_CONTROLRESPONSE']._serialized_start=2101 + _globals['_CONTROLRESPONSE']._serialized_end=2171 + _globals['_CONNECTIONREQUEST']._serialized_start=2173 + _globals['_CONNECTIONREQUEST']._serialized_end=2192 + _globals['_CONNECTIONRESPONSE']._serialized_start=2194 + _globals['_CONNECTIONRESPONSE']._serialized_end=2254 + _globals['_MODELSERVICE']._serialized_start=2723 + _globals['_MODELSERVICE']._serialized_end=2845 + _globals['_CONTROL']._serialized_start=2848 + _globals['_CONTROL']._serialized_end=3096 + _globals['_REDUCER']._serialized_start=3098 + _globals['_REDUCER']._serialized_end=3184 + _globals['_CONNECTOR']._serialized_start=3187 + _globals['_CONNECTOR']._serialized_end=3614 + _globals['_COMBINER']._serialized_start=3617 + _globals['_COMBINER']._serialized_end=3808 # @@protoc_insertion_point(module_scope) diff --git a/fedn/fedn/network/grpc/fedn_pb2_grpc.py b/fedn/fedn/network/grpc/fedn_pb2_grpc.py index 1a03fe970..bb8086c3f 100644 --- a/fedn/fedn/network/grpc/fedn_pb2_grpc.py +++ b/fedn/fedn/network/grpc/fedn_pb2_grpc.py @@ -593,25 +593,10 @@ def __init__(self, channel): Args: channel: A grpc.Channel. 
""" - self.ModelUpdateRequestStream = channel.unary_stream( - '/grpc.Combiner/ModelUpdateRequestStream', + self.TaskStream = channel.unary_stream( + '/grpc.Combiner/TaskStream', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.FromString, - ) - self.ModelUpdateStream = channel.unary_stream( - '/grpc.Combiner/ModelUpdateStream', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, - ) - self.ModelValidationRequestStream = channel.unary_stream( - '/grpc.Combiner/ModelValidationRequestStream', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidationRequest.FromString, - ) - self.ModelValidationStream = channel.unary_stream( - '/grpc.Combiner/ModelValidationStream', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, + response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.TaskRequest.FromString, ) self.SendModelUpdate = channel.unary_unary( '/grpc.Combiner/SendModelUpdate', @@ -628,31 +613,13 @@ def __init__(self, channel): class CombinerServicer(object): """Missing associated documentation comment in .proto file.""" - def ModelUpdateRequestStream(self, request, context): + def TaskStream(self, request, context): """Stream endpoints for training/validation pub/sub """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def ModelUpdateStream(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ModelValidationRequestStream(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ModelValidationStream(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - def SendModelUpdate(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) @@ -668,25 +635,10 @@ def SendModelValidation(self, request, context): def add_CombinerServicer_to_server(servicer, server): rpc_method_handlers = { - 'ModelUpdateRequestStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelUpdateRequestStream, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.SerializeToString, - ), - 'ModelUpdateStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelUpdateStream, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - 
-                    response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString,
-            ),
-            'ModelValidationRequestStream': grpc.unary_stream_rpc_method_handler(
-                    servicer.ModelValidationRequestStream,
-                    request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString,
-                    response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidationRequest.SerializeToString,
-            ),
-            'ModelValidationStream': grpc.unary_stream_rpc_method_handler(
-                    servicer.ModelValidationStream,
+            'TaskStream': grpc.unary_stream_rpc_method_handler(
+                    servicer.TaskStream,
                     request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString,
-                    response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString,
+                    response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.TaskRequest.SerializeToString,
             ),
             'SendModelUpdate': grpc.unary_unary_rpc_method_handler(
                     servicer.SendModelUpdate,
@@ -709,58 +661,7 @@ class Combiner(object):
     """Missing associated documentation comment in .proto file."""

     @staticmethod
-    def ModelUpdateRequestStream(request,
-            target,
-            options=(),
-            channel_credentials=None,
-            call_credentials=None,
-            insecure=False,
-            compression=None,
-            wait_for_ready=None,
-            timeout=None,
-            metadata=None):
-        return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelUpdateRequestStream',
-            fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString,
-            fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.FromString,
-            options, channel_credentials,
-            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
-
-    @staticmethod
-    def ModelUpdateStream(request,
-            target,
-            options=(),
-            channel_credentials=None,
-            call_credentials=None,
-            insecure=False,
-            compression=None,
-            wait_for_ready=None,
-            timeout=None,
-            metadata=None):
-        return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelUpdateStream',
-            fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString,
-            fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString,
-            options, channel_credentials,
-            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
-
-    @staticmethod
-    def ModelValidationRequestStream(request,
-            target,
-            options=(),
-            channel_credentials=None,
-            call_credentials=None,
-            insecure=False,
-            compression=None,
-            wait_for_ready=None,
-            timeout=None,
-            metadata=None):
-        return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelValidationRequestStream',
-            fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString,
-            fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidationRequest.FromString,
-            options, channel_credentials,
-            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
-
-    @staticmethod
-    def ModelValidationStream(request,
+    def TaskStream(request,
             target,
             options=(),
             channel_credentials=None,
@@ -770,9 +671,9 @@ def ModelValidationStream(request,
             wait_for_ready=None,
             timeout=None,
             metadata=None):
-        return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelValidationStream',
+        return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/TaskStream',
             fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString,
-            fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString,
+            fedn_dot_network_dot_grpc_dot_fedn__pb2.TaskRequest.FromString,
             options, channel_credentials,
             insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
diff --git a/fedn/fedn/network/storage/models/tempmodelstorage.py b/fedn/fedn/network/storage/models/tempmodelstorage.py
index b60e7edec..7e32ea866 100644
--- a/fedn/fedn/network/storage/models/tempmodelstorage.py
+++ b/fedn/fedn/network/storage/models/tempmodelstorage.py
@@ -59,7 +59,11 @@ def get_ptr(self, model_id):

     def get_model_metadata(self, model_id):

-        return self.models_metadata[model_id]
+        try:
+            status = self.models_metadata[model_id]
+        except KeyError:
+            status = fedn.ModelStatus.UNKNOWN
+        return status

     def set_model_metadata(self, model_id, model_metadata):
diff --git a/fedn/setup.py b/fedn/setup.py
index 16cb88238..33e083d9f 100644
--- a/fedn/setup.py
+++ b/fedn/setup.py
@@ -16,7 +16,7 @@
         "minio",
         "python-slugify",
         "grpcio~=1.57.0",
-        "grpcio-tools",
+        "grpcio-tools~=1.57.0",
         "numpy>=1.21.6",
         "protobuf",
         "pymongo",
@@ -32,7 +32,7 @@
         "pandas",
         "bokeh<3.0.0",
         "networkx",
-        "grpcio-health-checking"
+        "grpcio-health-checking~=1.57.0"
     ],
     license='Apache 2.0',
     zip_safe=False,
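Reviewer note: below is a minimal sketch (not part of the patch) of how a client consumes the consolidated `TaskStream` endpoint introduced above. `TaskStream`, `TaskRequest`, `ClientAvailableMessage`, `fedn.WORKER`, and the `StatusType` values come from this change; the stub, inbox queue, config dict, and function name are illustrative stand-ins for the surrounding code in `fedn/fedn/network/clients/client.py`.

```python
# Sketch only: names other than TaskStream/TaskRequest/StatusType/ClientAvailableMessage
# are hypothetical placeholders for the client's own attributes.
import queue

from fedn.network.grpc import fedn_pb2 as fedn


def listen_to_task_stream(combiner_stub, inbox: queue.Queue, config: dict, name: str, metadata=None):
    """Route tasks from the single TaskStream into the client's train/validate inbox."""
    r = fedn.ClientAvailableMessage()
    r.sender.name = name
    r.sender.role = fedn.WORKER

    for request in combiner_stub.TaskStream(r, metadata=metadata):
        # One stream now carries both task types; dispatch on request.type
        # and on whether this client is configured as trainer and/or validator.
        if request.type == fedn.StatusType.MODEL_UPDATE and config['trainer']:
            inbox.put(('train', request))
        elif request.type == fedn.StatusType.MODEL_VALIDATION and config['validator']:
            inbox.put(('validate', request))
```

This mirrors `_listen_to_task_stream` in the client changes: the trainer/validator gating moves from thread startup (two separate request streams) into per-message dispatch on a single stream, so a client that is only a trainer or only a validator simply never enqueues the other task type.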