From 5afc9d0ccf2994425eb854c68b335ece142e81fe Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 1 Dec 2023 16:46:52 +0100 Subject: [PATCH 01/17] Started on new set compute package --- fedn/fedn/network/api/interface.py | 83 +++++++++++++++++-- .../network/statestore/mongostatestore.py | 35 ++++---- 2 files changed, 91 insertions(+), 27 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index e56462493..a45e6bb38 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -55,14 +55,16 @@ def _allowed_file_extension( :param filename: The filename to check. :type filename: str - :return: True if file extension is allowed, else False. - :rtype: bool - """ + :return: True and extension str if file extension is allowed, else False and None. + :rtype: Tuple (bool, str) + """ + if "." in filename: + extension = filename.rsplit(".", 1)[1].lower() + if extension in ALLOWED_EXTENSIONS: + return (True, extension) + + return (False, None) - return ( - "." in filename - and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS - ) def get_clients(self, limit=None, skip=None, status=False): """Get all clients from the statestore. @@ -207,7 +209,7 @@ def get_session(self, session_id): payload[id] = info return jsonify(payload) - def set_compute_package(self, file, helper_type): + def set_compute_package(self, file, helper_type: str): """Set the compute package in the statestore. :param file: The compute package to set. @@ -216,7 +218,72 @@ def set_compute_package(self, file, helper_type): :rtype: :class:`flask.Response` """ + if ( + self.control.state() == ReducerState.instructing + or self.control.state() == ReducerState.monitoring + ): + return ( + jsonify( + { + "success": False, + "message": "Reducer is in instructing or monitoring state." + "Cannot set compute package.", + } + ), + 400, + ) + + if file is None: + return ( + jsonify( + { + "success": False, + "message": "No file provided.", + } + ), + 404, + ) + + success, extension = self._allowed_file_extension(file.filename) + + if not success: + return ( + jsonify( + { + "success": False, + "message": f"File extension {extension} not allowed.", + } + ), + 404, + ) + + file_name = file.filename + storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") + + file_path = os.path.join("/app/client/package/", storage_file_name) + file.save(file_path) + + self.control.set_compute_package(storage_file_name, file_path) + self.statestore.set_helper(helper_type) + + success = self.statestore.set_compute_package(file_name, storage_file_name) + + if not success: + return ( + jsonify( + { + "success": False, + "message": "Failed to set compute package.", + } + ), + 400, + ) + + return jsonify({"success": True, "message": "Compute package set."}) + + if file and self._allowed_file_extension(file.filename): + filename = secure_filename(file.filename) # TODO: make configurable, perhaps in config.py or package.py file_path = os.path.join("/app/client/package/", filename) diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index 19d514f59..98518dae7 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -269,34 +269,31 @@ def get_validations(self, **kwargs): result = self.control.validations.find(kwargs) return result - def set_compute_package(self, filename): + def set_compute_package(self, file_name: str, storage_file_name: str): """Set the active compute package in statestore. - :param filename: The filename of the compute package. - :type filename: str + :param file_name: The file_name of the compute package. + :type file_name: str :return: True if successful. :rtype: bool """ + + obj = { + "file_name": file_name, + "storage_file_name": storage_file_name, + "committed_at": str(datetime.now()), + } + self.control.package.update_one( {"key": "active"}, - { - "$set": { - "filename": filename, - "committed_at": str(datetime.now()), - } - }, - True, - ) - self.control.package.update_one( - {"key": "package_trail"}, - { - "$push": { - "filename": filename, - "committed_at": str(datetime.now()), - } - }, + obj, True, ) + + trail_obj = {**{"key": "package_trail"}, **obj} + + self.control.package.insert_one(trail_obj) + return True def get_compute_package(self): From ce46471fb55cf99d71edc0d5d70372699d545457 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 4 Dec 2023 14:10:52 +0100 Subject: [PATCH 02/17] set compute package updated --- fedn/fedn/network/api/interface.py | 52 ++----------------- fedn/fedn/network/controller/controlbase.py | 2 +- .../network/statestore/mongostatestore.py | 7 ++- 3 files changed, 11 insertions(+), 50 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index a45e6bb38..85a839fc1 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -57,14 +57,13 @@ def _allowed_file_extension( :type filename: str :return: True and extension str if file extension is allowed, else False and None. :rtype: Tuple (bool, str) - """ + """ if "." in filename: extension = filename.rsplit(".", 1)[1].lower() if extension in ALLOWED_EXTENSIONS: return (True, extension) - - return (False, None) + return (False, None) def get_clients(self, limit=None, skip=None, status=False): """Get all clients from the statestore. @@ -243,7 +242,7 @@ def set_compute_package(self, file, helper_type: str): ), 404, ) - + success, extension = self._allowed_file_extension(file.filename) if not success: @@ -256,7 +255,7 @@ def set_compute_package(self, file, helper_type: str): ), 404, ) - + file_name = file.filename storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") @@ -264,9 +263,7 @@ def set_compute_package(self, file, helper_type: str): file.save(file_path) self.control.set_compute_package(storage_file_name, file_path) - self.statestore.set_helper(helper_type) - - success = self.statestore.set_compute_package(file_name, storage_file_name) + success = self.statestore.set_compute_package(file_name, storage_file_name, helper_type) if not success: return ( @@ -278,46 +275,7 @@ def set_compute_package(self, file, helper_type: str): ), 400, ) - - return jsonify({"success": True, "message": "Compute package set."}) - - - if file and self._allowed_file_extension(file.filename): - - filename = secure_filename(file.filename) - # TODO: make configurable, perhaps in config.py or package.py - file_path = os.path.join("/app/client/package/", filename) - file.save(file_path) - - if ( - self.control.state() == ReducerState.instructing - or self.control.state() == ReducerState.monitoring - ): - return ( - jsonify( - { - "success": False, - "message": "Reducer is in instructing or monitoring state." - "Cannot set compute package.", - } - ), - 400, - ) - self.control.set_compute_package(filename, file_path) - self.statestore.set_helper(helper_type) - - success = self.statestore.set_compute_package(filename) - if not success: - return ( - jsonify( - { - "success": False, - "message": "Failed to set compute package.", - } - ), - 400, - ) return jsonify({"success": True, "message": "Compute package set."}) def _get_compute_package_name(self): diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index fab6a2027..676168841 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -181,7 +181,7 @@ def get_compute_package_name(self): def set_compute_package(self, filename, path): """Persist the configuration for the compute package.""" self.model_repository.set_compute_package(filename, path) - self.statestore.set_compute_package(filename) + # self.statestore.set_compute_package(filename) def get_compute_package(self, compute_package=""): """ diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index 98518dae7..39fe880e6 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -269,7 +269,7 @@ def get_validations(self, **kwargs): result = self.control.validations.find(kwargs) return result - def set_compute_package(self, file_name: str, storage_file_name: str): + def set_compute_package(self, file_name: str, storage_file_name: str, helper_type: str): """Set the active compute package in statestore. :param file_name: The file_name of the compute package. @@ -281,12 +281,15 @@ def set_compute_package(self, file_name: str, storage_file_name: str): obj = { "file_name": file_name, "storage_file_name": storage_file_name, + "helper": helper_type, "committed_at": str(datetime.now()), } self.control.package.update_one( {"key": "active"}, - obj, + { + "$set": obj + }, True, ) From 3d6bbae013d54538b49ab454500f24aadabda3fb Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 4 Dec 2023 14:52:50 +0100 Subject: [PATCH 03/17] list compute packages added --- fedn/fedn/network/api/client.py | 9 ++++ fedn/fedn/network/api/interface.py | 43 +++++++++++++++++++ fedn/fedn/network/api/server.py | 16 +++++++ .../network/statestore/mongostatestore.py | 40 ++++++++++++++++- 4 files changed, 107 insertions(+), 1 deletion(-) diff --git a/fedn/fedn/network/api/client.py b/fedn/fedn/network/api/client.py index 58fc27304..b6ffab2fc 100644 --- a/fedn/fedn/network/api/client.py +++ b/fedn/fedn/network/api/client.py @@ -192,6 +192,15 @@ def get_package(self): response = requests.get(self._get_url('get_package'), verify=self.verify) return response.json() + def list_compute_packages(self): + """ Get all compute packages from the statestore. + + :return: All compute packages with info. + :rtype: dict + """ + response = requests.get(self._get_url('list_compute_packages'), verify=self.verify) + return response.json() + def download_package(self, path): """ Download the compute package. diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 85a839fc1..95b29d48f 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -320,6 +320,49 @@ def get_compute_package(self): payload[id] = info return jsonify(payload) + def list_compute_packages(self, limit: str = None, skip: str = None): + """Get paginated list of compute packages from the statestore. + + :return: All compute packages as a json response. + :rtype: :class:`flask.Response` + """ + + if limit is None: + return ( + jsonify( + { + "success": False, + "message": "No limit provided.", + } + ), + 404, + ) + + if limit is not None and skip is not None: + limit = int(limit) + skip = int(skip) + + result = self.statestore.list_compute_packages(limit, skip) + if result is None: + return ( + jsonify( + {"success": False, "message": "No compute packages found."} + ), + 404, + ) + arr = [] + for element in result["result"]: + obj = { + "file_name": element["file_name"], + "helper": element["helper"], + "committed_at": element["committed_at"], + "storage_file_name": element["storage_file_name"], + } + arr.append(obj) + + result = {"result": arr, "count": result["count"]} + return jsonify(result) + def download_compute_package(self, name): """Download the compute package. diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index cfb91bece..9199d8921 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -218,6 +218,22 @@ def get_package(): return api.get_compute_package() +@app.route("/list_compute_packages", methods=["GET"]) +def list_compute_packages(): + """Get the compute package from the statestore. + return: The compute package as a json object. + rtype: json + """ + + limit = request.args.get("limit", None) + skip = request.args.get("skip", None) + + return api.list_compute_packages( + limit=limit, + skip=skip, + ) + + @app.route("/download_package", methods=["GET"]) def download_package(): """Download the compute package. diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index 39fe880e6..02c16ca76 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -282,7 +282,7 @@ def set_compute_package(self, file_name: str, storage_file_name: str, helper_typ "file_name": file_name, "storage_file_name": storage_file_name, "helper": helper_type, - "committed_at": str(datetime.now()), + "committed_at": datetime.now(), } self.control.package.update_one( @@ -316,6 +316,44 @@ def get_compute_package(self): except (KeyError, IndexError): return None + def list_compute_packages(self, limit: int = None, skip: int = None, sort_key="committed_at", sort_order=pymongo.DESCENDING): + """List compute packages in the statestore (paginated). + + :param limit: The maximum number of compute packages to return. + :type limit: int + :param skip: The number of compute packages to skip. + :type skip: int + :param sort_key: The key to sort by. + :type sort_key: str + :param sort_order: The sort order. + :type sort_order: pymongo.ASCENDING or pymongo.DESCENDING + :return: Dictionary of compute packages in result and count. + :rtype: dict + """ + + result = None + count = None + + find_option = {"key": "package_trail"} + projection = {"_id": False, "key": False} + + try: + if limit is not None and skip is not None: + result = self.control.package.find(find_option, projection).limit(limit).skip(skip).sort(sort_key, sort_order) + else: + result = self.control.package.find(find_option, projection).sort(sort_key, sort_order) + + count = self.control.package.count_documents(find_option) + + except Exception as e: + print("ERROR: {}".format(e), flush=True) + return None + + return { + "result": result, + "count": count, + } + def set_helper(self, helper): """Set the active helper package in statestore. From bdf82e410f949025da8282ca410b4905f45a1cb5 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 4 Dec 2023 15:12:43 +0100 Subject: [PATCH 04/17] added properties to /get_package --- fedn/fedn/network/api/interface.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 95b29d48f..0438b4ee6 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -314,8 +314,10 @@ def get_compute_package(self): payload = {} id = str(package_object["_id"]) info = { - "filename": package_object["filename"], + "file_name": package_object["file_name"], "helper": package_object["helper"], + "committed_at": package_object["committed_at"], + "storage_file_name": package_object["storage_file_name"], } payload[id] = info return jsonify(payload) From 46f1ae668e5f5cb9006fe19c7bb513225eed2dc7 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 5 Dec 2023 14:20:12 +0100 Subject: [PATCH 05/17] name and desription added to compute package --- fedn/fedn/network/api/client.py | 5 +++-- fedn/fedn/network/api/interface.py | 4 ++-- fedn/fedn/network/api/server.py | 5 ++++- fedn/fedn/network/statestore/mongostatestore.py | 4 +++- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/fedn/fedn/network/api/client.py b/fedn/fedn/network/api/client.py index b6ffab2fc..4fe0cfe4a 100644 --- a/fedn/fedn/network/api/client.py +++ b/fedn/fedn/network/api/client.py @@ -169,7 +169,7 @@ def get_session(self, session_id): response = requests.get(self._get_url(f'get_session?session_id={session_id}'), self.verify) return response.json() - def set_package(self, path, helper): + def set_package(self, path: str, helper: str, name: str = None, description: str = None): """ Set the compute package in the statestore. :param path: The file path of the compute package to set. @@ -180,7 +180,8 @@ def set_package(self, path, helper): :rtype: dict """ with open(path, 'rb') as file: - response = requests.post(self._get_url('set_package'), files={'file': file}, data={'helper': helper}, verify=self.verify) + response = requests.post(self._get_url('set_package'), files={'file': file}, data={ + 'helper': helper, 'name': name, 'description': description}, verify=self.verify) return response.json() def get_package(self): diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 0438b4ee6..0a402c46e 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -208,7 +208,7 @@ def get_session(self, session_id): payload[id] = info return jsonify(payload) - def set_compute_package(self, file, helper_type: str): + def set_compute_package(self, file, helper_type: str, name: str = None, description: str = None): """Set the compute package in the statestore. :param file: The compute package to set. @@ -263,7 +263,7 @@ def set_compute_package(self, file, helper_type: str): file.save(file_path) self.control.set_compute_package(storage_file_name, file_path) - success = self.statestore.set_compute_package(file_name, storage_file_name, helper_type) + success = self.statestore.set_compute_package(file_name, storage_file_name, helper_type, name, description) if not success: return ( diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index 9199d8921..f2cff4e2b 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -197,6 +197,9 @@ def set_package(): rtype: json """ helper_type = request.form.get("helper", None) + name = request.form.get("name", None) + description = request.form.get("description", None) + if helper_type is None: return ( jsonify({"success": False, "message": "Missing helper type."}), @@ -206,7 +209,7 @@ def set_package(): file = request.files["file"] except KeyError: return jsonify({"success": False, "message": "Missing file."}), 400 - return api.set_compute_package(file=file, helper_type=helper_type) + return api.set_compute_package(file=file, helper_type=helper_type, name=name, description=description) @app.route("/get_package", methods=["GET"]) diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index 02c16ca76..9ebfb2480 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -269,7 +269,7 @@ def get_validations(self, **kwargs): result = self.control.validations.find(kwargs) return result - def set_compute_package(self, file_name: str, storage_file_name: str, helper_type: str): + def set_compute_package(self, file_name: str, storage_file_name: str, helper_type: str, name: str = None, description: str = None): """Set the active compute package in statestore. :param file_name: The file_name of the compute package. @@ -283,6 +283,8 @@ def set_compute_package(self, file_name: str, storage_file_name: str, helper_typ "storage_file_name": storage_file_name, "helper": helper_type, "committed_at": datetime.now(), + "name": name, + "description": description, } self.control.package.update_one( From 29a31b1ef745aea8b4fc150140d4d0e701168ae0 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 5 Dec 2023 16:19:32 +0100 Subject: [PATCH 06/17] simplyfied compute package object response (get_package) --- fedn/fedn/network/api/interface.py | 16 ++++------------ fedn/fedn/network/statestore/mongostatestore.py | 15 +++++++-------- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 0a402c46e..40d3007a3 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -303,24 +303,16 @@ def get_compute_package(self): :return: The compute package as a json response. :rtype: :class:`flask.Response` """ - package_object = self.statestore.get_compute_package() - if package_object is None: + result = self.statestore.get_compute_package() + if result is None: return ( jsonify( {"success": False, "message": "No compute package found."} ), 404, ) - payload = {} - id = str(package_object["_id"]) - info = { - "file_name": package_object["file_name"], - "helper": package_object["helper"], - "committed_at": package_object["committed_at"], - "storage_file_name": package_object["storage_file_name"], - } - payload[id] = info - return jsonify(payload) + + return jsonify(result) def list_compute_packages(self, limit: str = None, skip: str = None): """Get paginated list of compute packages from the statestore. diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index 9ebfb2480..f6a6087fd 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -307,15 +307,14 @@ def get_compute_package(self): :return: The active compute package. :rtype: ObjectID """ - ret = self.control.package.find({"key": "active"}) try: - retcheck = ret[0] - if ( - retcheck is None or retcheck == "" or retcheck == " " - ): # ugly check for empty string - return None - return retcheck - except (KeyError, IndexError): + + find = {"key": "active"} + projection = {"_id": False, "key": False} + ret = self.control.package.find_one(find, projection) + return ret + except Exception as e: + print("ERROR: {}".format(e), flush=True) return None def list_compute_packages(self, limit: int = None, skip: int = None, sort_key="committed_at", sort_order=pymongo.DESCENDING): From 759b481a181f6a0a7c69f3a8fb986c73aac6ab71 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 5 Dec 2023 16:53:01 +0100 Subject: [PATCH 07/17] include name and desc in compute package list response --- fedn/fedn/network/api/interface.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 40d3007a3..f991bb02d 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -350,7 +350,9 @@ def list_compute_packages(self, limit: str = None, skip: str = None): "file_name": element["file_name"], "helper": element["helper"], "committed_at": element["committed_at"], - "storage_file_name": element["storage_file_name"], + "storage_file_name": element["storage_file_name"] if "storage_file_name" in element else "", + "name": element["name"] if "name" in element else "", + "description": element["description"] if "description" in element else "", } arr.append(obj) From 4f7c2567ab2ee9d968d6e040fa2acd0e88ae3f8a Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 5 Dec 2023 17:41:53 +0100 Subject: [PATCH 08/17] include id in result for compute packages --- fedn/fedn/network/api/interface.py | 13 ++++++++++++- fedn/fedn/network/statestore/mongostatestore.py | 4 ++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index f991bb02d..b4f564ecd 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -312,7 +312,17 @@ def get_compute_package(self): 404, ) - return jsonify(result) + obj = { + "id": result["_id"].__str__(), + "file_name": result["file_name"], + "helper": result["helper"], + "committed_at": result["committed_at"], + "storage_file_name": result["storage_file_name"] if "storage_file_name" in result else "", + "name": result["name"] if "name" in result else "", + "description": result["description"] if "description" in result else "", + } + + return jsonify(obj) def list_compute_packages(self, limit: str = None, skip: str = None): """Get paginated list of compute packages from the statestore. @@ -347,6 +357,7 @@ def list_compute_packages(self, limit: str = None, skip: str = None): arr = [] for element in result["result"]: obj = { + "id": element["_id"].__str__(), "file_name": element["file_name"], "helper": element["helper"], "committed_at": element["committed_at"], diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index f6a6087fd..bf5d722db 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -310,7 +310,7 @@ def get_compute_package(self): try: find = {"key": "active"} - projection = {"_id": False, "key": False} + projection = {"key": False} ret = self.control.package.find_one(find, projection) return ret except Exception as e: @@ -336,7 +336,7 @@ def list_compute_packages(self, limit: int = None, skip: int = None, sort_key="c count = None find_option = {"key": "package_trail"} - projection = {"_id": False, "key": False} + projection = {"key": False} try: if limit is not None and skip is not None: From a986fdaf3f1f4f13c460d78b9557e94772c29007 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 5 Dec 2023 18:21:44 +0100 Subject: [PATCH 09/17] added id property to compute packages to enable tracking active easier --- fedn/fedn/network/api/interface.py | 65 ++++++++++++------- fedn/fedn/network/api/server.py | 2 + .../network/statestore/mongostatestore.py | 6 +- 3 files changed, 46 insertions(+), 27 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index b4f564ecd..025cddecd 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -313,7 +313,7 @@ def get_compute_package(self): ) obj = { - "id": result["_id"].__str__(), + "id": result["id"] if "id" in result else "", "file_name": result["file_name"], "helper": result["helper"], "committed_at": result["committed_at"], @@ -324,28 +324,19 @@ def get_compute_package(self): return jsonify(obj) - def list_compute_packages(self, limit: str = None, skip: str = None): + def list_compute_packages(self, limit: str = None, skip: str = None, include_active: str = None): """Get paginated list of compute packages from the statestore. :return: All compute packages as a json response. :rtype: :class:`flask.Response` """ - if limit is None: - return ( - jsonify( - { - "success": False, - "message": "No limit provided.", - } - ), - 404, - ) - if limit is not None and skip is not None: limit = int(limit) skip = int(skip) + include_active: bool = include_active == "true" + result = self.statestore.list_compute_packages(limit, skip) if result is None: return ( @@ -354,18 +345,42 @@ def list_compute_packages(self, limit: str = None, skip: str = None): ), 404, ) - arr = [] - for element in result["result"]: - obj = { - "id": element["_id"].__str__(), - "file_name": element["file_name"], - "helper": element["helper"], - "committed_at": element["committed_at"], - "storage_file_name": element["storage_file_name"] if "storage_file_name" in element else "", - "name": element["name"] if "name" in element else "", - "description": element["description"] if "description" in element else "", - } - arr.append(obj) + + active_package_id: str = None + + if include_active: + active_package = self.statestore.get_compute_package() + + if active_package is not None: + active_package_id = active_package["id"] if "id" in active_package else "" + + if include_active: + arr = [ + { + "id": element["id"] if "id" in element else "", + "file_name": element["file_name"], + "helper": element["helper"], + "committed_at": element["committed_at"], + "storage_file_name": element["storage_file_name"] if "storage_file_name" in element else "", + "name": element["name"] if "name" in element else "", + "description": element["description"] if "description" in element else "", + "active": "id" in element and element["id"] == active_package_id, + } + for element in result["result"] + ] + else: + arr = [ + { + "id": element["id"] if "id" in element else "", + "file_name": element["file_name"], + "helper": element["helper"], + "committed_at": element["committed_at"], + "storage_file_name": element["storage_file_name"] if "storage_file_name" in element else "", + "name": element["name"] if "name" in element else "", + "description": element["description"] if "description" in element else "", + } + for element in result["result"] + ] result = {"result": arr, "count": result["count"]} return jsonify(result) diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index f2cff4e2b..90ff019f4 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -230,10 +230,12 @@ def list_compute_packages(): limit = request.args.get("limit", None) skip = request.args.get("skip", None) + include_active = request.args.get("include_active", None) return api.list_compute_packages( limit=limit, skip=skip, + include_active=include_active ) diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index bf5d722db..6edca376c 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -1,4 +1,5 @@ import copy +import uuid from datetime import datetime import pymongo @@ -285,6 +286,7 @@ def set_compute_package(self, file_name: str, storage_file_name: str, helper_typ "committed_at": datetime.now(), "name": name, "description": description, + "id": str(uuid.uuid4()), } self.control.package.update_one( @@ -310,7 +312,7 @@ def get_compute_package(self): try: find = {"key": "active"} - projection = {"key": False} + projection = {"key": False, "_id": False} ret = self.control.package.find_one(find, projection) return ret except Exception as e: @@ -336,7 +338,7 @@ def list_compute_packages(self, limit: int = None, skip: int = None, sort_key="c count = None find_option = {"key": "package_trail"} - projection = {"key": False} + projection = {"key": False, "_id": False} try: if limit is not None and skip is not None: From 4d46ac71ee4c299eb89b1fb972e40c8ee2f169a2 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 6 Dec 2023 14:56:39 +0100 Subject: [PATCH 10/17] set active compute package --- fedn/fedn/network/api/interface.py | 17 ++++++++ fedn/fedn/network/api/server.py | 6 +++ .../network/statestore/mongostatestore.py | 39 ++++++++++++++++++- 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 025cddecd..32a9c00ea 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -208,6 +208,23 @@ def get_session(self, session_id): payload[id] = info return jsonify(payload) + def set_active_compute_package(self, id: str): + + success = self.statestore.set_active_compute_package(id) + + if not success: + return ( + jsonify( + { + "success": False, + "message": "Failed to set compute package.", + } + ), + 400, + ) + + return jsonify({"success": True, "message": "Compute package set."}) + def set_compute_package(self, file, helper_type: str, name: str = None, description: str = None): """Set the compute package in the statestore. diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index 90ff019f4..cc8d1b4a9 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -182,6 +182,12 @@ def get_session(): return api.get_session(session_id) +@app.route("/set_active_package", methods=["PUT"]) +def set_active_package(): + id = request.args.get("id", None) + return api.set_active_compute_package(id) + + @app.route("/set_package", methods=["POST"]) def set_package(): """ Set the compute package in the statestore. diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index 6edca376c..76a23b8fb 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -60,8 +60,12 @@ def __init__(self, network_id, config, model_storage_config): # Storage settings self.set_storage_backend(model_storage_config) + self.init_index() self.__inited = True + def init_index(self): + self.package.create_index([("id", pymongo.DESCENDING)]) + def is_inited(self): """Check if the statestore is intialized. @@ -270,6 +274,37 @@ def get_validations(self, **kwargs): result = self.control.validations.find(kwargs) return result + def set_active_compute_package(self, id: str): + """Set the active compute package in statestore. + + :param id: The id of the compute package (not document _id). + :type id: str + :return: True if successful. + :rtype: bool + """ + + try: + + find = {"id": id} + projection = {"_id": False, "key": False} + + doc = self.control.package.find_one(find, projection) + + if doc is None: + return False + + doc["key"] = "active" + + self.control.package.replace_one( + {"key": "active"}, doc + ) + + except Exception as e: + print("ERROR: {}".format(e), flush=True) + return False + + return True + def set_compute_package(self, file_name: str, storage_file_name: str, helper_type: str, name: str = None, description: str = None): """Set the active compute package in statestore. @@ -353,8 +388,8 @@ def list_compute_packages(self, limit: int = None, skip: int = None, sort_key="c return None return { - "result": result, - "count": count, + "result": result or [], + "count": count or 0, } def set_helper(self, helper): From 7166d3bbe4fa6e938270965c758d1fc2343237e0 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 6 Dec 2023 16:01:22 +0100 Subject: [PATCH 11/17] list_models can include active property --- fedn/fedn/network/api/interface.py | 28 ++++++++++++++++++++++++---- fedn/fedn/network/api/server.py | 3 ++- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 32a9c00ea..b9461b2c1 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -740,7 +740,7 @@ def get_latest_model(self): {"success": False, "message": "No initial model set."} ) - def get_models(self, session_id=None, limit=None, skip=None): + def get_models(self, session_id: str = None, limit: str = None, skip: str = None, include_active: str = None): result = self.statestore.list_models(session_id, limit, skip) if result is None: @@ -749,10 +749,30 @@ def get_models(self, session_id=None, limit=None, skip=None): 404, ) - arr = [] + include_active: bool = include_active == "true" + + if include_active: + + latest_model = self.statestore.get_latest_model() - for model in result["result"]: - arr.append(model) + arr = [ + { + "committed_at": element["committed_at"], + "model": element["model"], + "session_id": element["session_id"], + "active": element["model"] == latest_model, + } + for element in result["result"] + ] + else: + arr = [ + { + "committed_at": element["committed_at"], + "model": element["model"], + "session_id": element["session_id"], + } + for element in result["result"] + ] result = {"result": arr, "count": result["count"]} diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index cc8d1b4a9..4dc22bc84 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -44,8 +44,9 @@ def list_models(): session_id = request.args.get("session_id", None) limit = request.args.get("limit", None) skip = request.args.get("skip", None) + include_active = request.args.get("include_active", None) - return api.get_models(session_id, limit, skip) + return api.get_models(session_id, limit, skip, include_active) @app.route("/delete_model_trail", methods=["GET", "POST"]) From c3f3b602ab6ee80238b6fbcd6e1c40a31050decb Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 7 Dec 2023 14:03:17 +0100 Subject: [PATCH 12/17] select correct file name --- fedn/fedn/network/api/interface.py | 2 +- fedn/fedn/network/controller/controlbase.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index b9461b2c1..05ca35a2a 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -307,7 +307,7 @@ def _get_compute_package_name(self): return None, message else: try: - name = package_objects["filename"] + name = package_objects["storage_file_name"] except KeyError as e: message = "No compute package found. Key error." print(e) diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index 676168841..9ac891183 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -167,7 +167,7 @@ def get_compute_package_name(self): definition = self.statestore.get_compute_package() if definition: try: - package_name = definition["filename"] + package_name = definition["storage_file_name"] return package_name except (IndexError, KeyError): print( From 33c4c540341f147241ac10a424c48b4aaeb24acc Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 7 Dec 2023 16:00:14 +0100 Subject: [PATCH 13/17] set active model --- fedn/fedn/network/api/interface.py | 23 ++++++++++++++++ fedn/fedn/network/api/server.py | 18 +++++++++++++ .../network/statestore/mongostatestore.py | 26 +++++++++++++++++++ 3 files changed, 67 insertions(+) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 05ca35a2a..9732811a1 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -740,6 +740,29 @@ def get_latest_model(self): {"success": False, "message": "No initial model set."} ) + def set_active_model(self, model_id: str): + """Set the active model in the statestore. + + :param model_id: The model id to set. + :type model_id: str + :return: A json response with success or failure message. + :rtype: :class:`flask.Response` + """ + success = self.statestore.set_active_model(model_id) + + if not success: + return ( + jsonify( + { + "success": False, + "message": "Failed to set active model.", + } + ), + 400, + ) + + return jsonify({"success": True, "message": "Active model set."}) + def get_models(self, session_id: str = None, limit: str = None, skip: str = None, include_active: str = None): result = self.statestore.list_models(session_id, limit, skip) diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index 4dc22bc84..770bcad7b 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -271,6 +271,24 @@ def get_latest_model(): return api.get_latest_model() +@app.route("/set_active_model", methods=["PUT"]) +def set_active_model(): + """Set the initial model in the statestore and upload to model repository. + Usage with curl: + curl -k -X PUT + -F id= + http://localhost:8092/set_initial_model + + param: id: The model id to set. + type: id: str + return: boolean. + rtype: json + """ + id = request.args.get("id", None) + if id is None: + return jsonify({"success": False, "message": "Missing model id."}), 400 + return api.set_active_model(id) + # Get initial model endpoint diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index 76a23b8fb..a94c99817 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -233,6 +233,32 @@ def get_latest_model(self): except (KeyError, IndexError): return None + def set_active_model(self, model_id: str): + """Set the active model in statestore. + + :param model_id: The model id. + :type model_id: str + :return: + """ + + try: + + committed_at = datetime.now() + + existing_model = self.model.find_one({"key": "models", "model": model_id}) + + if existing_model is not None: + + self.model.update_one( + {"key": "active_model"}, {"$set": {"model": model_id, "committed_at": committed_at, "session_id": None}}, True + ) + + return True + except Exception as e: + print("ERROR: {}".format(e), flush=True) + + return False + def get_latest_round(self): """Get the id of the most recent round. From efad487e990b822f12a0d37ea11768712aaddec0 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 18 Dec 2023 16:02:49 +0100 Subject: [PATCH 14/17] set CURRENT model --- fedn/fedn/network/api/interface.py | 4 ++-- fedn/fedn/network/api/server.py | 8 ++++---- fedn/fedn/network/statestore/mongostatestore.py | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 9732811a1..707d8b366 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -740,7 +740,7 @@ def get_latest_model(self): {"success": False, "message": "No initial model set."} ) - def set_active_model(self, model_id: str): + def set_current_model(self, model_id: str): """Set the active model in the statestore. :param model_id: The model id to set. @@ -748,7 +748,7 @@ def set_active_model(self, model_id: str): :return: A json response with success or failure message. :rtype: :class:`flask.Response` """ - success = self.statestore.set_active_model(model_id) + success = self.statestore.set_current_model(model_id) if not success: return ( diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index 770bcad7b..ad9acf010 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -271,13 +271,13 @@ def get_latest_model(): return api.get_latest_model() -@app.route("/set_active_model", methods=["PUT"]) -def set_active_model(): +@app.route("/set_current_model", methods=["PUT"]) +def set_current_model(): """Set the initial model in the statestore and upload to model repository. Usage with curl: curl -k -X PUT -F id= - http://localhost:8092/set_initial_model + http://localhost:8092/set_current_model param: id: The model id to set. type: id: str @@ -287,7 +287,7 @@ def set_active_model(): id = request.args.get("id", None) if id is None: return jsonify({"success": False, "message": "Missing model id."}), 400 - return api.set_active_model(id) + return api.set_current_model(id) # Get initial model endpoint diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index a94c99817..24bf4dcd3 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -233,8 +233,8 @@ def get_latest_model(self): except (KeyError, IndexError): return None - def set_active_model(self, model_id: str): - """Set the active model in statestore. + def set_current_model(self, model_id: str): + """Set the current model in statestore. :param model_id: The model id. :type model_id: str @@ -250,7 +250,7 @@ def set_active_model(self, model_id: str): if existing_model is not None: self.model.update_one( - {"key": "active_model"}, {"$set": {"model": model_id, "committed_at": committed_at, "session_id": None}}, True + {"key": "current_model"}, {"$set": {"model": model_id, "committed_at": committed_at, "session_id": None}}, True ) return True From b153e3f6826cbda3ffb4188040bc1deb7567bac1 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 28 Dec 2023 10:41:02 +0100 Subject: [PATCH 15/17] Removed comment --- fedn/fedn/network/controller/controlbase.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index 9ac891183..5cf85acc4 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -181,7 +181,6 @@ def get_compute_package_name(self): def set_compute_package(self, filename, path): """Persist the configuration for the compute package.""" self.model_repository.set_compute_package(filename, path) - # self.statestore.set_compute_package(filename) def get_compute_package(self, compute_package=""): """ From 8bd8fd51f3e8f20fbaa211ecac92582d8659bf21 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 28 Dec 2023 11:27:54 +0100 Subject: [PATCH 16/17] lint fix --- fedn/fedn/network/storage/statestore/mongostatestore.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fedn/fedn/network/storage/statestore/mongostatestore.py b/fedn/fedn/network/storage/statestore/mongostatestore.py index da004ad10..fe6f93c51 100644 --- a/fedn/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/fedn/network/storage/statestore/mongostatestore.py @@ -58,6 +58,7 @@ def __init__(self, network_id, config): raise self.init_index() + def connect(self): """ Establish client connection to MongoDB. From 3e471863c7888266168808ff77949c2b9092326c Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 4 Jan 2024 13:23:56 +0100 Subject: [PATCH 17/17] Added api test for list_compute_packages --- fedn/fedn/network/api/tests.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/fedn/fedn/network/api/tests.py b/fedn/fedn/network/api/tests.py index 7395d9bdf..cf9e27b3e 100644 --- a/fedn/fedn/network/api/tests.py +++ b/fedn/fedn/network/api/tests.py @@ -160,6 +160,19 @@ def test_list_combiners(self): # Assert api.get_all_combiners was called fedn.network.api.server.api.get_all_combiners.assert_called_once_with() + def test_list_compute_packages(self): + """ Test list_compute_packages endpoint. """ + # Mock api.list_compute_packages + return_value = {"test": "test"} + fedn.network.api.server.api.list_compute_packages = MagicMock(return_value=return_value) + # Make request + response = self.app.get('/list_combiners') + # Assert response + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json, return_value) + # Assert api.list_compute_packages was called + fedn.network.api.server.api.list_compute_packages.assert_called_once_with() + def test_list_rounds(self): """ Test list_rounds endpoint. """ # Mock api.get_all_rounds