diff --git a/.github/workflows/pr-title-check.yaml b/.github/workflows/pr-title-check.yaml new file mode 100644 index 000000000..f0311634f --- /dev/null +++ b/.github/workflows/pr-title-check.yaml @@ -0,0 +1,35 @@ +name: PR Title Check + +on: + pull_request: + types: [opened, edited, reopened, synchronize] + +jobs: + title-check: + runs-on: ubuntu-latest + + steps: + - name: Check out repository + uses: actions/checkout@v3 + + - name: Check if PR is internal + id: check_internal + run: | + if [[ "${{ github.event.pull_request.head.repo.full_name }}" == "${{ github.repository }}" ]]; then + echo "internal=true" >> $GITHUB_ENV + else + echo "internal=false" >> $GITHUB_ENV + fi + + - name: Run title check script + if: env.internal == 'true' + run: | + pr_title="${{ github.event.pull_request.title }}" + pattern="^(Feature|Fix|Bug|Bugfix|Docs|Refactor|Chore|Github)\/SK-[0-9]+ \| .+" + if [[ ! "$pr_title" =~ $pattern ]]; then + echo "Error: PR title does not follow the required pattern." + echo "Please ensure the title follows the pattern: 'Feature|Fix|Bug|Bugfix|Docs|Refactor|Chore|Github/SK- | '" + exit 1 + else + echo "PR title is valid." + fi \ No newline at end of file diff --git a/fedn/cli/main.py b/fedn/cli/main.py index d6f912e62..0d5660c0b 100644 --- a/fedn/cli/main.py +++ b/fedn/cli/main.py @@ -1,3 +1,5 @@ +import importlib.metadata + import click CONTEXT_SETTINGS = dict( @@ -5,10 +7,16 @@ help_option_names=["-h", "--help"], ) +# Dynamically get the version of the package +try: + version = importlib.metadata.version("fedn") +except importlib.metadata.PackageNotFoundError: + version = "unknown" + @click.group(context_settings=CONTEXT_SETTINGS) +@click.version_option(version) @click.pass_context def main(ctx): - """:param ctx: - """ + """:param ctx:""" ctx.obj = dict() diff --git a/fedn/common/config.py b/fedn/common/config.py index 4864ce1ef..94b346d65 100644 --- a/fedn/common/config.py +++ b/fedn/common/config.py @@ -2,9 +2,6 @@ import yaml -global STATESTORE_CONFIG -global MODELSTORAGE_CONFIG - SECRET_KEY = os.environ.get("FEDN_JWT_SECRET_KEY", False) FEDN_JWT_CUSTOM_CLAIM_KEY = os.environ.get("FEDN_JWT_CUSTOM_CLAIM_KEY", False) FEDN_JWT_CUSTOM_CLAIM_VALUE = os.environ.get("FEDN_JWT_CUSTOM_CLAIM_VALUE", False) diff --git a/fedn/network/api/v1/session_routes.py b/fedn/network/api/v1/session_routes.py index c34d1d841..a045e60bf 100644 --- a/fedn/network/api/v1/session_routes.py +++ b/fedn/network/api/v1/session_routes.py @@ -5,6 +5,8 @@ from fedn.network.api.auth import jwt_auth_required from fedn.network.api.shared import control from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb +from fedn.network.combiner.interfaces import CombinerUnavailableError +from fedn.network.state import ReducerState from fedn.network.storage.statestore.stores.session_store import SessionStore from fedn.network.storage.statestore.stores.shared import EntityNotFound @@ -354,6 +356,18 @@ def post(): return jsonify({"message": "An unexpected error occurred"}), 500 +def _get_number_of_available_clients(): + result = 0 + for combiner in control.network.get_combiners(): + try: + nr_active_clients = len(combiner.list_active_clients()) + result = result + int(nr_active_clients) + except CombinerUnavailableError: + return 0 + + return result + + @bp.route("/start", methods=["POST"]) @jwt_auth_required(role="admin") def start_session(): @@ -367,24 +381,30 @@ def start_session(): data = request.json if request.headers["Content-Type"] == "application/json" else request.form.to_dict() session_id: str = data.get("session_id") rounds: int = data.get("rounds", "") + round_timeout: int = data.get("round_timeout", None) if not session_id or session_id == "": return jsonify({"message": "Session ID is required"}), 400 - if not rounds or rounds == "": - return jsonify({"message": "Rounds is required"}), 400 - - if not isinstance(rounds, int): - return jsonify({"message": "Rounds must be an integer"}), 400 - session = session_store.get(session_id, use_typing=False) session_config = session["session_config"] model_id = session_config["model_id"] + min_clients = session_config["clients_required"] + + if control.state() == ReducerState.monitoring: + return jsonify({"message": "A session is already running."}) + + if not rounds or not isinstance(rounds, int): + rounds = session_config["rounds"] + nr_available_clients = _get_number_of_available_clients() + + if nr_available_clients < min_clients: + return jsonify({"message": f"Number of available clients is lower than the required minimum of {min_clients}"}), 400 _ = model_store.get(model_id, use_typing=False) - threading.Thread(target=control.start_session, args=(session_id, rounds)).start() + threading.Thread(target=control.start_session, args=(session_id, rounds, round_timeout)).start() return jsonify({"message": "Session started"}), 200 except Exception: diff --git a/fedn/network/combiner/roundhandler.py b/fedn/network/combiner/roundhandler.py index ef9029de9..816957323 100644 --- a/fedn/network/combiner/roundhandler.py +++ b/fedn/network/combiner/roundhandler.py @@ -309,7 +309,6 @@ def _assign_round_clients(self, n, type="trainers"): clients = self.server.get_active_trainers() else: logger.error("(ERROR): {} is not a supported type of client".format(type)) - raise # If the number of requested trainers exceeds the number of available, use all available. if n > len(clients): diff --git a/fedn/network/controller/control.py b/fedn/network/controller/control.py index eb2f12843..38e775d0e 100644 --- a/fedn/network/controller/control.py +++ b/fedn/network/controller/control.py @@ -93,7 +93,7 @@ def __init__(self, statestore): super().__init__(statestore) self.name = "DefaultControl" - def start_session(self, session_id: str, rounds: int) -> None: + def start_session(self, session_id: str, rounds: int, round_timeout: int) -> None: if self._state == ReducerState.instructing: logger.info("Controller already in INSTRUCTING state. A session is in progress.") return @@ -116,6 +116,9 @@ def start_session(self, session_id: str, rounds: int) -> None: logger.error("Session not properly configured.") return + if round_timeout is not None: + session_config["round_timeout"] = round_timeout + self._state = ReducerState.monitoring last_round = int(self.get_latest_round_id()) @@ -151,6 +154,8 @@ def start_session(self, session_id: str, rounds: int) -> None: self.set_session_status(session_id, "Finished") self._state = ReducerState.idle + self.set_session_config(session_id, session_config) + def session(self, config: RoundConfig) -> None: """Execute a new training session. A session consists of one or several global rounds. All rounds in the same session diff --git a/fedn/network/controller/controlbase.py b/fedn/network/controller/controlbase.py index 3ae8e3731..297efd426 100644 --- a/fedn/network/controller/controlbase.py +++ b/fedn/network/controller/controlbase.py @@ -193,6 +193,16 @@ def get_session_status(self, session_id): """ return self.statestore.get_session_status(session_id) + def set_session_config(self, session_id: str, config: dict): + """Set the model id for a session. + + :param session_id: The session unique identifier + :type session_id: str + :param config: The session config + :type config: dict + """ + self.statestore.set_session_config_v2(session_id, config) + def create_round(self, round_data): """Initialize a new round in backend db.""" self.statestore.create_round(round_data) diff --git a/fedn/network/loadbalancer/leastpacked.py b/fedn/network/loadbalancer/leastpacked.py index 786dd8de0..8e793e95a 100644 --- a/fedn/network/loadbalancer/leastpacked.py +++ b/fedn/network/loadbalancer/leastpacked.py @@ -16,18 +16,16 @@ def find_combiner(self): """Find the combiner with the least number of attached clients. """ - min_clients = None + min_clients = -1 selected_combiner = None - for combiner in self.network.get_combiners(): try: if combiner.allowing_clients(): # Using default default Channel = 1, MODEL_UPDATE_REQUESTS nr_active_clients = len(combiner.list_active_clients()) - if not min_clients or nr_active_clients < min_clients: + if min_clients == -1 or nr_active_clients < min_clients: min_clients = nr_active_clients selected_combiner = combiner except CombinerUnavailableError: pass - return selected_combiner diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index 7e25b3ff4..7262e5554 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -882,6 +882,17 @@ def set_session_config(self, id: str, config: RoundConfig) -> None: """ self.sessions.update_one({"session_id": str(id)}, {"$push": {"session_config": config}}, True) + # Added to accomodate new session config structure + def set_session_config_v2(self, id: str, config: RoundConfig) -> None: + """Set the session configuration. + + :param id: The session id + :type id: str + :param config: Session configuration + :type config: dict + """ + self.sessions.update_one({"session_id": str(id)}, {"$set": {"session_config": config}}, True) + def set_session_status(self, id, status): """Set session status.