diff --git a/.ci/tests/chaos_test.py b/.ci/tests/chaos_test.py
new file mode 100644
index 000000000..09841474f
--- /dev/null
+++ b/.ci/tests/chaos_test.py
@@ -0,0 +1,210 @@
+from toxiproxy import Toxiproxy
+import unittest
+import grpc
+import time
+from fedn.network.clients.grpc_handler import GrpcHandler
+import fedn.network.grpc.fedn_pb2 as fedn
+
+
+class TestGRPCWithToxiproxy(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        client_name = 'test-client'
+        client_id = 'test-client-id'
+        host = 'localhost'
+        port_proxy = 12081
+        port_server = 12080
+        token = ""
+        combiner_name = 'combiner'
+
+        cls.toxiproxy = Toxiproxy()
+        if cls.toxiproxy.proxies():
+            cls.toxiproxy.destroy_all()
+
+    @classmethod
+    def tearDownClass(cls):
+        # Destroy all proxies when done
+        cls.toxiproxy.destroy_all()
+
+    @unittest.skip("Not implemented")
+    def test_normal_heartbeat(self):
+        # Test the heartbeat without any toxic, connecting directly to the combiner port
+        client_name = 'test-client'
+        client_id = 'test-client-id'
+        grpc_handler = GrpcHandler(host='localhost', port=12080, name=client_name, token='', combiner_name='combiner')
+        try:
+            response = grpc_handler.heartbeat(client_name, client_id)
+            self.assertIsInstance(response, fedn.Response)
+        except grpc.RpcError as e:
+            self.fail(f'gRPC error: {e.code()} {e.details()}')
+        finally:
+            grpc_handler.channel.close()
+
+    @unittest.skip("Not implemented")
+    def test_latency_2s_toxic_heartbeat(self):
+        # Add a latency toxic of 2000 ms
+        client_name = 'test-client'
+        client_id = 'test-client-id'
+
+        proxy = self.toxiproxy.create(name='test_latency_toxic_heartbeat', listen='localhost:12082', upstream='localhost:12080')
+        grpc_handler = GrpcHandler(host='localhost', port=12082, name=client_name, token='', combiner_name='combiner')
+        proxy.add_toxic(name='latency', type='latency', attributes={'latency': 2000})
+
+        start_time = time.time()
+        try:
+            response = grpc_handler.heartbeat(client_name, client_id)
+        finally:
+            grpc_handler.channel.close()
+            proxy.destroy()
+        end_time = time.time()
+
+        # Check that the latency delay is present; expect at least a 2 second delay
+        self.assertGreaterEqual(end_time - start_time, 2)
+        self.assertIsInstance(response, fedn.Response)
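The long-latency test that follows leans on the client's gRPC keepalive budget (KEEPALIVE_TIMEOUT_MS). For reference, a minimal sketch of how such a budget is expressed as standard gRPC channel options; the exact values GrpcHandler uses are an assumption here:

.. code-block:: python

    import grpc

    # Sketch only: keepalive options of the kind the long-latency test exercises.
    channel = grpc.insecure_channel(
        "localhost:12080",
        options=[
            ("grpc.keepalive_time_ms", 30000),     # ping the server every 30 s
            ("grpc.keepalive_timeout_ms", 20000),  # give up if no ack arrives within 20 s
        ],
    )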
+
+    def test_latency_long_toxic_heartbeat(self):
+        """Test a gRPC request with a simulated latency of 20s.
+        Should time out based on KEEPALIVE_TIMEOUT_MS (default set to 20000)."""
+        client_name = 'test-client'
+        client_id = 'test-client-id'
+        latency = 20  # 20 s latency
+
+        proxy = self.toxiproxy.create(name='test_latency_toxic_heartbeat', listen='localhost:12083', upstream='localhost:12080')
+        grpc_handler = GrpcHandler(host='localhost', port=12083, name=client_name, token='', combiner_name='combiner')
+        proxy.add_toxic(name='latency', type='latency', attributes={'latency': latency * 1000})
+
+        start_time = time.time()
+        try:
+            response = grpc_handler.heartbeat(client_name, client_id)
+        except grpc.RpcError as e:
+            response = e
+        finally:
+            grpc_handler.channel.close()
+            proxy.destroy()
+        end_time = time.time()
+
+        # Check that the latency delay is present; expect at least `latency` seconds of delay
+        self.assertGreaterEqual(end_time - start_time, latency)
+        self.assertIsInstance(response, grpc.RpcError)
+        self.assertEqual(response.code(), grpc.StatusCode.UNAVAILABLE)
+        self.assertEqual(response.details(), 'failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:12083: connection attempt timed out before receiving SETTINGS frame')
+
+    def test_close_channel(self):
+        """
+        Test closing the gRPC channel and then trying to send a heartbeat.
+        Expect a ValueError to be raised.
+        """
+        client_name = 'test-client'
+        client_id = 'test-client-id'
+
+        grpc_handler = GrpcHandler(host='localhost', port=12080, name=client_name, token='', combiner_name='combiner')
+
+        # Close the channel
+        grpc_handler._disconnect()
+
+        # Try to send a heartbeat
+        with self.assertRaises(ValueError) as context:
+            response = grpc_handler.heartbeat(client_name, client_id)
+        self.assertEqual(str(context.exception), 'Cannot invoke RPC on closed channel!')
+
+    @unittest.skip("Not implemented")
+    def test_disconnect_toxic_heartbeat(self):
+        """Test a gRPC request with a simulated disconnection."""
+        # Add a timeout toxic to simulate a network disconnection
+        client_name = 'test-client'
+        client_id = 'test-client-id'
+
+        proxy = self.toxiproxy.create(name='test_disconnect_toxic_heartbeat', listen='localhost:12084', upstream='localhost:12080')
+        grpc_handler = GrpcHandler(host='localhost', port=12084, name=client_name, token='', combiner_name='combiner')
+        proxy.add_toxic(name='timeout', type='timeout', attributes={'timeout': 1000})
+
+        try:
+            response = grpc_handler.heartbeat(client_name, client_id)
+        except grpc.RpcError as e:
+            response = e
+        finally:
+            grpc_handler.channel.close()
+            proxy.destroy()
+
+        # Assert that the response is a gRPC error with status code UNAVAILABLE
+        self.assertEqual(response.code(), grpc.StatusCode.UNAVAILABLE)
+        self.assertEqual(response.details(), 'failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:12084: Socket closed')
+
+    @unittest.skip("Not implemented")
+    def test_timeout_toxic_heartbeat(self):
+        """Stop all data from getting through and close the connection after the timeout.
+        If the timeout is 0, the connection is not closed and data is delayed until the toxic is removed.
+ """ + # Add a timeout toxic to simulate network disconnection + client_name = 'test-client' + client_id = 'test-client-id' + + proxy = self.toxiproxy.create(name='test_timeout_toxic_heartbeat', listen='localhost:12085', upstream='localhost:12080') + grpc_handler = GrpcHandler(host='localhost', port=12085, name=client_name, token='', combiner_name='combiner') + proxy.add_toxic(name='timeout', type='timeout', attributes={'timeout': 0}) + + try: + response = grpc_handler.heartbeat(client_name, client_id) + except grpc.RpcError as e: + response = e + finally: + grpc_handler.channel.close() + proxy.destroy() + + # Assert that the response is a gRPC error with status code UNAVAILABLE + self.assertEqual(response.code(), grpc.StatusCode.UNAVAILABLE) + self.assertEqual(response.details(), 'failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:12085: connection attempt timed out before receiving SETTINGS frame') + + @unittest.skip("Not implemented") + def test_rate_limit_toxic_heartbeat(self): + # Purpose: Limits the number of connections that can be established within a certain time frame. + # Toxic: rate_limit + # Use Case: Useful for testing how the client behaves under strict rate limits. For example, in Federated Learning, + # this could simulate constraints in networks with multiple clients trying to access the server. + + # Add a rate limit toxic to the proxy + self.proxy.add_rate_limit(rate=1000) + + @unittest.skip("Not implemented") + def test_bandwidth_toxic_heartbeat(self): + # Purpose: Limits the bandwidth of the connection. + # Toxic: bandwidth + # Use Case: Useful for testing how the client behaves under limited bandwidth. For example, in Federated Learning, + # this could simulate a slow network connection between the client and the server. + + # Add a bandwidth toxic to the proxy + self.proxy.add_bandwidth(rate=1000) # 1 KB/s + + @unittest.skip("Not implemented") + def test_connection_reset(self): + # Purpose: Immediately resets the connection, simulating an abrupt network drop. + # Toxic: add_reset + # Use Case: This is helpful for testing error-handling logic on sudden network failures, + # ensuring the client retries appropriately or fails gracefully + + # Add a connection_reset toxic to the proxy + self.proxy.add_reset() + + @unittest.skip("Not implemented") + def test_slow_close(self): + # Purpose: Simulates a slow closing of the connection. + # Toxic: slow_close + # Use Case: Useful for testing how the client behaves when the server closes the connection slowly. + # This can help ensure that the client handles slow network disconnections gracefully. + + # Add a slow_close toxic to the proxy + self.proxy.add_slow_close(delay=1000) # Delay closing the connection by 1 second + + @unittest.skip("Not implemented") + def test_slicer(self): + # Purpose: Slices the data into smaller chunks. + # Toxic: slicer + # Use Case: Useful for testing how the client handles fragmented data. + # This can help ensure that the client can reassemble the data correctly and handle partial data gracefully. 
+
+        # Add a slicer toxic to the proxy
+        self.proxy.add_slicer(average_size=1000, size_variation=100)  # Slice data into 1 KB chunks with 100 bytes of variation
\ No newline at end of file
diff --git a/.github/workflows/build-containers.yaml b/.github/workflows/build-containers.yaml
index 89648764e..6c33e90c8 100644
--- a/.github/workflows/build-containers.yaml
+++ b/.github/workflows/build-containers.yaml
@@ -20,6 +20,7 @@ jobs:
     permissions:
       packages: write
       contents: read
+      security-events: write # for github/codeql-action/upload-sarif to upload SARIF results
 
     steps:
       - uses: actions/checkout@v4
@@ -48,7 +49,28 @@
       - name: Build and push
        uses: docker/build-push-action@v4
        with:
-          push: "${{ github.event_name != 'pull_request' }}"
+          push: true #"${{ github.event_name != 'pull_request' }}"
          tags: ${{ steps.meta1.outputs.tags }}
          labels: ${{ steps.meta1.outputs.labels }}
          file: Dockerfile
+
+      # On push to master or release, run a Trivy scan on the image
+      - name: Trivy scan
+        if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/master' }}
+        uses: aquasecurity/trivy-action@0.28.0
+        env:
+          TRIVY_DB_REPOSITORY: public.ecr.aws/aquasecurity/trivy-db,aquasec/trivy-db,ghcr.io/aquasecurity/trivy-db
+          TRIVY_JAVA_DB_REPOSITORY: public.ecr.aws/aquasecurity/trivy-java-db,aquasec/trivy-java-db,ghcr.io/aquasecurity/trivy-java-db
+        with:
+          image-ref: ghcr.io/${{ github.repository }}/fedn:master
+          format: 'sarif'
+          output: 'trivy-results.sarif'
+          severity: HIGH,CRITICAL
+          vuln-type: 'os,library'
+          github-pat: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Upload Trivy scan results to GitHub Security tab
+        uses: github/codeql-action/upload-sarif@v3
+        if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/master' }}
+        with:
+          sarif_file: 'trivy-results.sarif'
diff --git a/.github/workflows/code-checks.yaml b/.github/workflows/code-checks.yaml
index c379eb07c..d7a097558 100644
--- a/.github/workflows/code-checks.yaml
+++ b/.github/workflows/code-checks.yaml
@@ -4,7 +4,7 @@ on: push
 
 jobs:
   code-checks:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-latest
     steps:
       - name: checkout
        uses: actions/checkout@v4
diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml
index 22c38dd1b..867860c83 100644
--- a/.github/workflows/integration-tests.yaml
+++ b/.github/workflows/integration-tests.yaml
@@ -23,9 +23,9 @@ jobs:
         to_test:
           - "mnist-keras numpyhelper"
          - "mnist-pytorch numpyhelper"
-        python_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
+        python_version: ["3.9", "3.10", "3.11", "3.12"]
         os:
-          - ubuntu-22.04
+          - ubuntu-24.04
     runs-on: ${{ matrix.os }}
     steps:
       - name: checkout
diff --git a/Dockerfile b/Dockerfile
index b651dbea4..169fc5097 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,54 +1,61 @@
-# Base image
-ARG BASE_IMG=python:3.10-slim
-FROM $BASE_IMG
+# Stage 1: Builder
+ARG BASE_IMG=python:3.12-slim
+FROM $BASE_IMG as builder
 
 ARG GRPC_HEALTH_PROBE_VERSION=""
-
-# Requirements (use MNIST Keras as default)
 ARG REQUIREMENTS=""
 
+WORKDIR /build
+
+# Install build dependencies
+RUN apt-get update && apt-get upgrade -y && apt-get install -y --no-install-recommends python3-dev gcc wget \
+    && rm -rf /var/lib/apt/lists/*
+
 # Add FEDn and default configs
-COPY . /app
-COPY config/settings-client.yaml.template /app/config/settings-client.yaml
-COPY config/settings-combiner.yaml.template /app/config/settings-combiner.yaml
-COPY config/settings-hooks.yaml.template /app/config/settings-hooks.yaml
-COPY config/settings-reducer.yaml.template /app/config/settings-reducer.yaml
-COPY $REQUIREMENTS /app/config/requirements.txt
+COPY . /build
+COPY $REQUIREMENTS /build/requirements.txt
 
-# Install developer tools (needed for psutil)
-RUN apt-get update && apt-get install -y python3-dev gcc
+# Install dependencies
+RUN python -m venv /venv \
+    && /venv/bin/pip install --upgrade pip \
+    && /venv/bin/pip install --no-cache-dir 'setuptools>=65' \
+    && /venv/bin/pip install --no-cache-dir . \
+    && if [ -n "$REQUIREMENTS" ]; then \
+    /venv/bin/pip install --no-cache-dir -r /build/requirements.txt; \
+    fi \
+    && rm -rf /build/requirements.txt
 
-# Install grpc health probe checker
+
+# Install grpc health probe
 RUN if [ ! -z "$GRPC_HEALTH_PROBE_VERSION" ]; then \
-    apt-get install -y wget && \
-    wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
-    chmod +x /bin/grpc_health_probe && \
-    apt-get remove -y wget && apt autoremove -y; \
-    else \
-    echo "No grpc_health_probe version specified, skipping installation"; \
+    wget -qO /build/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
+    chmod +x /build/grpc_health_probe; \
     fi
 
-# Setup working directory
+# Stage 2: Runtime
+FROM $BASE_IMG
+
 WORKDIR /app
 
-# Create FEDn app directory
-SHELL ["/bin/bash", "-c"]
-RUN mkdir -p /app \
-    && mkdir -p /app/client \
-    && mkdir -p /app/certs \
-    && mkdir -p /app/client/package \
-    && mkdir -p /app/certs \
-    #
-    # Install FEDn and requirements
-    && python -m venv /venv \
-    && /venv/bin/pip install --upgrade pip \
-    && /venv/bin/pip install --no-cache-dir 'setuptools>=65' \
-    && /venv/bin/pip install --no-cache-dir -e . \
-    && if [[ ! -z "$REQUIREMENTS" ]]; then \
-    /venv/bin/pip install --no-cache-dir -r /app/config/requirements.txt; \
-    fi \
-    #
-    # Clean up
-    && rm -r /app/config/requirements.txt
+# Copy application and venv from the builder stage
+COPY --from=builder /venv /venv
+COPY --from=builder /build /app
+
+# Use a non-root user
+RUN set -ex \
+    # Create a non-root user
+    && addgroup --system --gid 1001 appgroup \
+    && adduser --system --uid 1001 --gid 1001 --no-create-home appuser \
+    # Create an application-specific tmp directory; TMPDIR points here at runtime
+    && mkdir -p /app/tmp \
+    && chown -R appuser:appgroup /venv /app \
+    # Upgrade the package index and install security upgrades
+    && apt-get update \
+    && apt-get upgrade -y \
+    && apt-get autoremove -y \
+    && apt-get clean -y \
+    && rm -rf /var/lib/apt/lists/*
+
+USER appuser
+
+ENTRYPOINT [ "/venv/bin/fedn" ]
-ENTRYPOINT [ "/venv/bin/fedn" ]
\ No newline at end of file
diff --git a/SECURITY.md b/SECURITY.md
new file mode 100644
index 000000000..07abb3dc6
--- /dev/null
+++ b/SECURITY.md
@@ -0,0 +1,41 @@
+# Security Policy
+
+## Supported Versions
+
+We actively maintain and patch the following versions of this project. Please ensure you are using one of these supported versions before reporting security issues.
+
+| Version | Supported          |
+| ------- | ------------------ |
+| >= 0.18.0 | :white_check_mark: |
+| <= 0.17.0 | :x: |
+
+## Reporting a Vulnerability
+
+If you discover a security vulnerability, please follow these steps:
+
+1. **Do not disclose publicly.**
+   To protect users, please avoid discussing the vulnerability in public forums, issues, or pull requests.
+
+2. **Report privately.**
+   Email **[security@scaleoutsystems.com](mailto:security@scaleoutsystems.com)** with the following information:
+   - A clear description of the vulnerability.
+   - Steps to reproduce the issue (if applicable).
+   - Any potential impact you foresee.
+   - Any patches or workarounds you've already implemented (if applicable).
+
+3. **Confirmation.**
+   We will confirm the report as soon as possible, assess the severity of the vulnerability, and track it as an internal issue. We will keep you informed by email and let you know when the issue has been addressed and in which release.
+
+4. **Resolution.**
+   Once the vulnerability is resolved, we will issue an advisory and release a patch if required. We will credit the reporter unless anonymity is requested.
+
+## Security Best Practices
+
+While using this project, we recommend:
+- Keeping the project up to date (which in turn keeps dependencies up to date). If you install from source, don't forget to reinstall the project after a pull.
+- Reviewing [our documentation](https://docs.scaleoutsystems.com/en/stable/) for secure configuration tips.
+- Reporting issues responsibly.
+
+We thank you for helping us keep this project safe for the community!
diff --git a/docker-compose.yaml b/docker-compose.yaml
index f020d9a0c..598aad0cb 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -58,21 +58,23 @@ services:
       - USER=test
       - PROJECT=project
       - FLASK_DEBUG=1
-      - STATESTORE_CONFIG=/app/config/settings-reducer.yaml
-      - MODELSTORAGE_CONFIG=/app/config/settings-reducer.yaml
+      - STATESTORE_CONFIG=/app/config/settings-reducer.yaml.template
+      - MODELSTORAGE_CONFIG=/app/config/settings-reducer.yaml.template
+      - FEDN_COMPUTE_PACKAGE_DIR=/app
+      - TMPDIR=/app/tmp
     build:
       context: .
       args:
-        BASE_IMG: ${BASE_IMG:-python:3.10-slim}
+        BASE_IMG: ${BASE_IMG:-python:3.12-slim}
     working_dir: /app
     volumes:
       - ${HOST_REPO_DIR:-.}/fedn:/app/fedn
     depends_on:
       - minio
       - mongo
-    entrypoint: [ "sh", "-c" ]
     command:
-      - "/venv/bin/pip install --no-cache-dir -e . && /venv/bin/fedn controller start"
+      - controller
+      - start
     ports:
       - 8092:8092
 
@@ -81,24 +83,27 @@
   combiner:
     environment:
       - PYTHONUNBUFFERED=0
       - GET_HOSTS_FROM=dns
-      - STATESTORE_CONFIG=/app/config/settings-combiner.yaml
-      - MODELSTORAGE_CONFIG=/app/config/settings-combiner.yaml
+      - STATESTORE_CONFIG=/app/config/settings-combiner.yaml.template
+      - MODELSTORAGE_CONFIG=/app/config/settings-combiner.yaml.template
       - HOOK_SERVICE_HOST=hook:12081
+      - TMPDIR=/app/tmp
     build:
      context: .
      args:
-        BASE_IMG: ${BASE_IMG:-python:3.10-slim}
-        GRPC_HEALTH_PROBE_VERSION: v0.4.24
+        BASE_IMG: ${BASE_IMG:-python:3.12-slim}
+        GRPC_HEALTH_PROBE_VERSION: v0.4.35
     working_dir: /app
     volumes:
       - ${HOST_REPO_DIR:-.}/fedn:/app/fedn
-    entrypoint: [ "sh", "-c" ]
     command:
-      - "/venv/bin/pip install --no-cache-dir -e . 
&& /venv/bin/fedn combiner start --init config/settings-combiner.yaml" + - combiner + - start + - --init + - config/settings-combiner.yaml.template ports: - 12080:12080 healthcheck: - test: [ "CMD", "/bin/grpc_health_probe", "-addr=localhost:12080" ] + test: [ "CMD", "/app/grpc_health_probe", "-addr=localhost:12080" ] interval: 20s timeout: 10s retries: 5 @@ -110,11 +115,12 @@ services: container_name: hook environment: - GET_HOSTS_FROM=dns + - TMPDIR=/app/tmp build: context: . args: - BASE_IMG: ${BASE_IMG:-python:3.10-slim} - GRPC_HEALTH_PROBE_VERSION: v0.4.24 + BASE_IMG: ${BASE_IMG:-python:3.12-slim} + GRPC_HEALTH_PROBE_VERSION: v0.4.35 working_dir: /app volumes: - ${HOST_REPO_DIR:-.}/fedn:/app/fedn @@ -141,9 +147,11 @@ services: working_dir: /app volumes: - ${HOST_REPO_DIR:-.}/fedn:/app/fedn - entrypoint: [ "sh", "-c" ] command: - - "/venv/bin/pip install --no-cache-dir -e . && /venv/bin/fedn client start --api-url http://api-server:8092" + - client + - start + - --api-url + - http://api-server:8092 deploy: replicas: 0 depends_on: diff --git a/docs/conf.py b/docs/conf.py index c48e378f4..f451f8b74 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -11,7 +11,7 @@ author = "Scaleout Systems AB" # The full version, including alpha/beta/rc tags -release = "0.19.0" +release = "0.20.0" # Add any Sphinx extension module names here, as strings extensions = [ diff --git a/docs/quickstart.rst b/docs/quickstart.rst index e7641f0d4..0a309bb32 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -9,7 +9,7 @@ Getting started with FEDn **Prerequisites** -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A FEDn Studio account `__ diff --git a/examples/FedSimSiam/README.rst b/examples/FedSimSiam/README.rst index 5831fd3ea..47fa93c6b 100644 --- a/examples/FedSimSiam/README.rst +++ b/examples/FedSimSiam/README.rst @@ -16,7 +16,7 @@ To run the example, follow the steps below. 
For a more detailed explanation, fol Prerequisites ------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A project in FEDn Studio `__ Creating the compute package and seed model diff --git a/examples/FedSimSiam/client/python_env.yaml b/examples/FedSimSiam/client/python_env.yaml index d728b82be..45f23ad30 100644 --- a/examples/FedSimSiam/client/python_env.yaml +++ b/examples/FedSimSiam/client/python_env.yaml @@ -9,7 +9,6 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64") - fedn diff --git a/examples/flower-client/client/python_env.yaml b/examples/flower-client/client/python_env.yaml index a82e7e50d..06b00186c 100644 --- a/examples/flower-client/client/python_env.yaml +++ b/examples/flower-client/client/python_env.yaml @@ -10,8 +10,7 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64") - fire==0.3.1 - flwr-datasets[vision]==0.1.0 \ No newline at end of file diff --git a/examples/huggingface/README.rst b/examples/huggingface/README.rst index eaaad3254..68bceb685 100644 --- a/examples/huggingface/README.rst +++ b/examples/huggingface/README.rst @@ -27,7 +27,7 @@ To run the example, follow the steps below. 
For a more detailed explanation, fol Prerequisites ------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A project in FEDn Studio `__ Creating the compute package and seed model diff --git a/examples/huggingface/client/python_env.yaml b/examples/huggingface/client/python_env.yaml index 87ee6f32d..6cc2925b4 100644 --- a/examples/huggingface/client/python_env.yaml +++ b/examples/huggingface/client/python_env.yaml @@ -9,9 +9,8 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64") - transformers - datasets - fedn diff --git a/examples/mnist-keras/README.rst b/examples/mnist-keras/README.rst index aaf13c21d..741813362 100644 --- a/examples/mnist-keras/README.rst +++ b/examples/mnist-keras/README.rst @@ -8,7 +8,7 @@ This is a TF/Keras version of the PyTorch Quickstart Tutorial. For a step-by-ste Prerequisites ------------------------------------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ Creating the compute package and seed model ------------------------------------------- diff --git a/examples/mnist-pytorch-DPSGD/README.rst b/examples/mnist-pytorch-DPSGD/README.rst index 88220584a..cfe03f081 100644 --- a/examples/mnist-pytorch-DPSGD/README.rst +++ b/examples/mnist-pytorch-DPSGD/README.rst @@ -9,7 +9,7 @@ We have expanded our baseline MNIST-PyTorch example by incorporating the Opacus Prerequisites ------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A project in FEDn Studio `__ Edit Differential Privacy budget diff --git a/examples/mnist-pytorch-DPSGD/client/python_env.yaml b/examples/mnist-pytorch-DPSGD/client/python_env.yaml index 13d586102..526022145 100644 --- a/examples/mnist-pytorch-DPSGD/client/python_env.yaml +++ b/examples/mnist-pytorch-DPSGD/client/python_env.yaml @@ -10,7 +10,6 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; 
(sys_platform == "darwin" and platform_machine == "x86_64") - opacus diff --git a/examples/mnist-pytorch/README.rst b/examples/mnist-pytorch/README.rst index 990b902b2..1c0afc5d0 100644 --- a/examples/mnist-pytorch/README.rst +++ b/examples/mnist-pytorch/README.rst @@ -9,7 +9,7 @@ The example is intented as a minimalistic quickstart to learn how to use FEDn. Prerequisites ------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A project in FEDn Studio `__ Creating the compute package and seed model diff --git a/examples/mnist-pytorch/client/python_env.yaml b/examples/mnist-pytorch/client/python_env.yaml index 272b196ea..7a35ff7a2 100644 --- a/examples/mnist-pytorch/client/python_env.yaml +++ b/examples/mnist-pytorch/client/python_env.yaml @@ -10,6 +10,5 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64") diff --git a/examples/monai-2D-mednist/README.rst b/examples/monai-2D-mednist/README.rst index f61820682..00eca5321 100644 --- a/examples/monai-2D-mednist/README.rst +++ b/examples/monai-2D-mednist/README.rst @@ -16,7 +16,7 @@ Prerequisites Using FEDn Studio: -- `Python 3.8, 3.9, 3.10 or 3.11 `__ +- `Python 3.9, 3.10 or 3.11 `__ - `A FEDn Studio account `__ diff --git a/examples/monai-2D-mednist/client/python_env.yaml b/examples/monai-2D-mednist/client/python_env.yaml index 389b3a42a..546f1ffbe 100644 --- a/examples/monai-2D-mednist/client/python_env.yaml +++ b/examples/monai-2D-mednist/client/python_env.yaml @@ -10,7 +10,6 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==1.26.4; python_version >= "3.9" - - numpy==1.24.4; python_version == "3.8" + - numpy==1.26.4 - monai-weekly[pillow, tqdm] - scikit-learn diff --git a/examples/monai-2D-mednist/client/validate.py b/examples/monai-2D-mednist/client/validate.py index ff4eb9263..f4b8cfd33 100644 --- a/examples/monai-2D-mednist/client/validate.py +++ b/examples/monai-2D-mednist/client/validate.py @@ -55,7 +55,7 @@ def validate(in_model_path, out_json_path, data_path=None, client_settings_path= image_list = clients["client " + str(split_index)]["validation"] - val_ds = MedNISTDataset(data_path=data_path+"/MedNIST/", transforms=val_transforms, image_files=image_list) + val_ds = MedNISTDataset(data_path=data_path + "/MedNIST/", transforms=val_transforms, image_files=image_list) val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=True, 
num_workers=num_workers)
 
@@ -86,8 +86,9 @@ def validate(in_model_path, out_json_path, data_path=None, client_settings_path=
 
     # JSON schema
     report.update({"test_accuracy": accuracy_score(y_true, y_pred), "test_f1_score": f1_score(y_true, y_pred, average="macro")})
-    for r in report:
-        print(r, ": ", report[r])
+
+    for key, value in report.items():
+        print(f"{key}: {value}")
 
     # Save JSON
     save_metrics(report, out_json_path)
diff --git a/examples/server-functions/README.rst b/examples/server-functions/README.rst
index c594fac28..fc70ffcff 100644
--- a/examples/server-functions/README.rst
+++ b/examples/server-functions/README.rst
@@ -1,11 +1,36 @@
 FEDn Project: Server functions toy example
 -----------------------------
 
-See server_functions.py for details.
+Beta usage will be available by request from FEDn 0.20.0 and onward.
 
-README Will be updated after studio update.
+See server_functions.py for details on how server functionality can be supplied.
 
-To run with server functions:
+To apply server functionality in Studio, first connect to your project through the APIClient.
 
-from server_functions import ServerFunctions
-client.start_session(server_functions=ServerFunctions)
\ No newline at end of file
+See https://docs.scaleoutsystems.com/en/stable/apiclient.html for more information.
+
+When connected to the project API you can start sessions with your supplied server functions.
+
+Full commands to run through the API client:
+
+Get your token from the settings page in your Studio project and add it to your system environment.
+
+.. code-block::
+
+   export FEDN_AUTH_TOKEN=
+
+Connect through the APIClient from a Python instance; you can find your controller host on the Studio Dashboard page.
+
+.. code-block::
+
+   from fedn import APIClient
+   client = APIClient(host="", secure=True, verify=True)
+
+Start a session with your ServerFunctions code (assuming you have uploaded a seed model and a compute package, and have connected clients).
+
+.. code-block::
+
+   from server_functions import ServerFunctions
+   client.start_session(server_functions=ServerFunctions)
+
+Logs from the server functions code are visible from the Studio dashboard logs page.
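For orientation, a ServerFunctions implementation might look like the sketch below. This is illustrative only: the base-class import path and the exact hook signatures are assumptions informed by the hooks interface elsewhere in this changeset (provided_functions, client_settings), not a verbatim API reference.

.. code-block:: python

    # Hypothetical sketch; the import path and method names are assumptions.
    from fedn.network.combiner.hooks.serverfunctionsbase import ServerFunctionsBase


    class ServerFunctions(ServerFunctionsBase):
        def client_settings(self, global_model):
            # Settings sent to each client before training; these surface
            # client-side as the client_settings dict in the training request.
            return {"learning_rate": 0.01}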
diff --git a/fedn/cli/__init__.py b/fedn/cli/__init__.py
index 7028dbfa6..be680eb23 100644
--- a/fedn/cli/__init__.py
+++ b/fedn/cli/__init__.py
@@ -11,3 +11,4 @@
 from .status_cmd import status_cmd  # noqa: F401
 from .validation_cmd import validation_cmd  # noqa: F401
 from .controller_cmd import controller_cmd  # noqa: F401
+from .login_cmd import login_cmd  # noqa: F401
diff --git a/fedn/cli/combiner_cmd.py b/fedn/cli/combiner_cmd.py
index c6b3d19d7..0a6403587 100644
--- a/fedn/cli/combiner_cmd.py
+++ b/fedn/cli/combiner_cmd.py
@@ -88,9 +88,6 @@ def list_combiners(ctx, protocol: str, host: str, port: str, token: str = None,
 
     if _token:
         headers["Authorization"] = _token
-
-    click.echo(f"\nListing combiners: {url}\n")
-    click.echo(f"Headers: {headers}")
     try:
         response = requests.get(url, headers=headers)
         print_response(response, "combiners", None)
@@ -114,7 +111,6 @@ def get_combiner(ctx, protocol: str, host: str, port: str, token: str = None, id
     url = get_api_url(protocol=protocol, host=host, port=port, endpoint="combiners")
     headers = {}
-
     _token = get_token(token)
 
     if _token:
@@ -123,7 +119,6 @@
     if id:
         url = f"{url}{id}"
-
     try:
         response = requests.get(url, headers=headers)
         print_response(response, "combiner", id)
diff --git a/fedn/cli/login_cmd.py b/fedn/cli/login_cmd.py
new file mode 100644
index 000000000..d2ce8ac90
--- /dev/null
+++ b/fedn/cli/login_cmd.py
@@ -0,0 +1,61 @@
+import os
+from getpass import getpass
+
+import click
+import requests
+import yaml
+
+from .main import main
+
+# Location of the FEDn CLI context directory
+home_dir = os.path.expanduser("~")
+
+
+@main.group("studio")
+@click.pass_context
+def login_cmd(ctx):
+    """:param ctx:"""
+    pass
+
+
+@login_cmd.command("login")
+@click.option("-p", "--protocol", required=False, default="https", help="Communication protocol")
+@click.option("-H", "--host", required=False, default="fedn.scaleoutsystems.com", help="Hostname of controller (api)")
+@click.pass_context
+def login(ctx, protocol: str, host: str):
+    """Log in to FEDn Studio."""
+    # Step 1: Display welcome message
+    click.secho("Welcome to Scaleout FEDn!", fg="green")
+
+    url = f"{protocol}://{host}/api/token/"
+
+    # Step 2: Prompt for username and password
+    username = input("Please enter your username: ")
+    password = getpass("Please enter your password: ")
+
+    # Step 3: Call the authentication API
+    try:
+        response = requests.post(url, json={"username": username, "password": password}, headers={"Content-Type": "application/json"})
+        response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx
+    except requests.exceptions.RequestException as e:
+        click.secho("Error connecting to the platform. Please try again.", fg="red")
+        click.secho(str(e), fg="red")
+        return
+
+    # Step 4: Handle the response
+    if response.status_code == 200:
+        data = response.json()
+        if data.get("access"):
+            click.secho("Login successful!", fg="green")
+            context_path = os.path.join(home_dir, ".fedn")
+            if not os.path.exists(context_path):
+                os.makedirs(context_path)
+            try:
+                with open(f"{context_path}/context.yaml", "w") as yaml_file:
+                    yaml.dump(data, yaml_file, default_flow_style=False)  # Store the access and refresh tokens in the context file
+            except Exception as e:
+                click.secho(f"Error: Failed to write to YAML file. Details: {e}", fg="red")
+        else:
+            click.secho("Login failed. Please check your credentials.", fg="red")
+    else:
+        click.secho(f"Unexpected error: {response.text}", fg="red")
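After a successful ``fedn studio login``, the stored tokens can be picked up from Python. A small sketch (a hypothetical follow-up, not part of this changeset) that reads the context file written above:

.. code-block:: python

    # Reads the tokens that `fedn studio login` stores in ~/.fedn/context.yaml.
    import os

    import yaml

    with open(os.path.join(os.path.expanduser("~"), ".fedn", "context.yaml")) as f:
        context = yaml.safe_load(f)

    access_token = context["access"]  # checked by the login command above
    refresh_token = context.get("refresh")  # assumption: a refresh token is stored alongside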
diff --git a/fedn/network/api/client.py b/fedn/network/api/client.py
index ab3e2e07d..aca36cbdc 100644
--- a/fedn/network/api/client.py
+++ b/fedn/network/api/client.py
@@ -329,8 +329,18 @@ def set_active_model(self, path):
         :return: A dict with success or failure message.
         :rtype: dict
         """
+        helper = None
+        if path.endswith(".npz"):
+            helper = "numpyhelper"
+        elif path.endswith(".bin"):
+            helper = "binaryhelper"
+
+        if helper:
+            response = requests.put(self._get_url_api_v1("helpers/active"), json={"helper": helper}, verify=self.verify, headers=self.headers)
+
         with open(path, "rb") as file:
-            response = requests.post(self._get_url("set_initial_model"), files={"file": file}, verify=self.verify, headers=self.headers)
+            response = requests.post(
+                self._get_url("set_initial_model"), files={"file": file}, data={"helper": helper}, verify=self.verify, headers=self.headers
+            )
         return response.json()
 
     # --- Packages --- #
@@ -606,27 +616,50 @@ def start_session(
         :return: A dict with success or failure message and session config.
         :rtype: dict
         """
+        if model_id is None:
+            response = requests.get(self._get_url_api_v1("models/active"), verify=self.verify, headers=self.headers)
+            if response.status_code == 200:
+                model_id = response.json()
+            else:
+                return response.json()
+
         response = requests.post(
-            self._get_url("start_session"),
+            self._get_url_api_v1("sessions/"),
             json={
                 "session_id": id,
-                "aggregator": aggregator,
-                "aggregator_kwargs": aggregator_kwargs,
-                "model_id": model_id,
-                "round_timeout": round_timeout,
-                "rounds": rounds,
-                "round_buffer_size": round_buffer_size,
-                "delete_models": delete_models,
-                "validate": validate,
-                "helper": helper,
-                "min_clients": min_clients,
-                "requested_clients": requested_clients,
-                "server_functions": None if server_functions is None else inspect.getsource(server_functions),
+                "session_config": {
+                    "aggregator": aggregator,
+                    "aggregator_kwargs": aggregator_kwargs,
+                    "round_timeout": round_timeout,
+                    "buffer_size": round_buffer_size,
+                    "model_id": model_id,
+                    "delete_models_storage": delete_models,
+                    "clients_required": min_clients,
+                    "requested_clients": requested_clients,
+                    "validate": validate,
+                    "helper_type": helper,
+                    "server_functions": None if server_functions is None else inspect.getsource(server_functions),
+                },
             },
             verify=self.verify,
             headers=self.headers,
         )
 
+        if response.status_code == 201:
+            if id is None:
+                id = response.json()["session_id"]
+            response = requests.post(
+                self._get_url_api_v1("sessions/start"),
+                json={
+                    "session_id": id,
+                    "rounds": rounds,
+                    "round_timeout": round_timeout,
+                },
+                verify=self.verify,
+                headers=self.headers,
+            )
+
         _json = response.json()
 
         return _json
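As a usage sketch of the two-step session flow above (create the session, then start it), assuming a reachable controller and an already-uploaded seed model; the host and port values are placeholders:

.. code-block:: python

    # Hypothetical usage; host/port are placeholders.
    from fedn import APIClient

    client = APIClient(host="localhost", port=8092)
    # start_session() now creates the session via POST sessions/ and,
    # on a 201 response, starts it via POST sessions/start.
    result = client.start_session(rounds=5, round_timeout=180)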
diff --git a/fedn/network/clients/client_v2.py b/fedn/network/clients/client_v2.py
index 6d1f52fb4..43edc9b79 100644
--- a/fedn/network/clients/client_v2.py
+++ b/fedn/network/clients/client_v2.py
@@ -10,7 +10,7 @@
 from fedn.common.log_config import logger
 from fedn.network.clients.fedn_client import ConnectToApiResult, FednClient, GrpcConnectionOptions
 from fedn.network.combiner.modelservice import get_tmp_path
-from fedn.utils.helpers.helpers import get_helper
+from fedn.utils.helpers.helpers import get_helper, save_metadata
 
 
 def get_url(api_url: str, api_port: int) -> str:
@@ -132,15 +132,15 @@ def set_helper(self, response: GrpcConnectionOptions = None):
         # Priority: helper_type from constructor > helper_type from response > default helper_type
         self.helper = get_helper(helper_type_to_use)
 
-    def on_train(self, in_model):
-        out_model, meta = self._process_training_request(in_model)
+    def on_train(self, in_model, client_settings):
+        out_model, meta = self._process_training_request(in_model, client_settings)
         return out_model, meta
 
     def on_validation(self, in_model):
         metrics = self._process_validation_request(in_model)
         return metrics
 
-    def _process_training_request(self, in_model: BytesIO) -> Tuple[BytesIO, dict]:
+    def _process_training_request(self, in_model: BytesIO, client_settings: dict) -> Tuple[BytesIO, dict]:
         """Process a training (model update) request.
 
         :param in_model: The model to be updated.
@@ -156,6 +156,8 @@
         with open(inpath, "wb") as fh:
             fh.write(in_model.getbuffer())
 
+        save_metadata(metadata=client_settings, filename=inpath)
+
         outpath = self.helper.get_tmp_path()
 
         tic = time.time()
diff --git a/fedn/network/clients/fedn_client.py b/fedn/network/clients/fedn_client.py
index 828758131..3f7124cb2 100644
--- a/fedn/network/clients/fedn_client.py
+++ b/fedn/network/clients/fedn_client.py
@@ -225,8 +225,9 @@
         self.send_status(f"\t Starting processing of training request for model_id {model_id}", sesssion_id=request.session_id, sender_name=self.name)
         logger.info(f"Running train callback with model ID: {model_id}")
+        client_settings = json.loads(request.data).get("client_settings", {})
         tic = time.time()
-        out_model, meta = self.train_callback(in_model)
+        out_model, meta = self.train_callback(in_model, client_settings)
         meta["processing_time"] = time.time() - tic
 
         tic = time.time()
diff --git a/fedn/network/clients/grpc_handler.py b/fedn/network/clients/grpc_handler.py
index 4b7d9874c..0aeedf344 100644
--- a/fedn/network/clients/grpc_handler.py
+++ b/fedn/network/clients/grpc_handler.py
@@ -65,16 +65,25 @@ def __init__(self, host: str, port: int, name: str, token: str, combiner_name: s
             ("client", name),
             ("grpc-server", combiner_name),
         ]
+        self.host = host
+        self.port = port
+        self.token = token
 
-        if port == 443:
-            self._init_secure_channel(host, port, token)
-        else:
-            self._init_insecure_channel(host, port)
+        self._init_channel(host, port, token)
+        self._init_stubs()
 
+    def _init_stubs(self):
         self.connectorStub = rpc.ConnectorStub(self.channel)
         self.combinerStub = rpc.CombinerStub(self.channel)
         self.modelStub = rpc.ModelServiceStub(self.channel)
 
+    def _init_channel(self, host: str, port: int, token: str):
+        if port == 443:
+            self._init_secure_channel(host, port, token)
+        else:
+            self._init_insecure_channel(host, port)
+
     def _init_secure_channel(self, host: str, port: int, token: str):
         url = f"{host}:{port}"
         logger.info(f"Connecting (GRPC) to {url}")
@@ -116,10 +125,10 @@ def heartbeat(self, client_name: str, client_id: str):
             logger.info("Sending heartbeat to combiner")
             response = self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata)
         except grpc.RpcError as e:
+            logger.error(f"GRPC (SendHeartbeat): An error occurred: {e}")
             raise e
         except Exception as e:
             logger.error(f"GRPC (SendHeartbeat): An error occurred: {e}")
-            self._disconnect()
             raise e
         return response
 
@@ -130,6 +139,8 @@ def send_heartbeats(self, client_name: str, client_id: str, update_frequency: fl
                 response = self.heartbeat(client_name, client_id)
             except grpc.RpcError as e:
                 return self._handle_grpc_error(e, "SendHeartbeat", lambda: self.send_heartbeats(client_name, client_id, update_frequency))
+            except Exception as e:
+                return self._handle_unknown_error(e, "SendHeartbeat", lambda: self.send_heartbeats(client_name, client_id, update_frequency))
             if isinstance(response, fedn.Response):
                 logger.info("Heartbeat successful.")
             else:
@@ -166,10 +177,11 @@ def listen_to_task_stream(self, client_name: str, client_id: str, callback: Call
                     callback(request)
 
         except grpc.RpcError as e:
+            logger.error(f"GRPC (TaskStream): An error occurred: {e}")
             return self._handle_grpc_error(e, "TaskStream", lambda: self.listen_to_task_stream(client_name, client_id, callback))
         except Exception as e:
             logger.error(f"GRPC (TaskStream): An error occurred: {e}")
-            self._disconnect()
+            self._handle_unknown_error(e, "TaskStream", lambda: self.listen_to_task_stream(client_name, client_id, callback))
 
     def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None):
         """Send status message.
@@ -204,7 +216,7 @@
             return self._handle_grpc_error(e, "SendStatus", lambda: self.send_status(msg, log_level, type, request, sesssion_id, sender_name))
         except Exception as e:
             logger.error(f"GRPC (SendStatus): An error occurred: {e}")
-            self._disconnect()
+            self._handle_unknown_error(e, "SendStatus", lambda: self.send_status(msg, log_level, type, request, sesssion_id, sender_name))
 
     def get_model_from_combiner(self, id: str, client_id: str, timeout: int = 20) -> BytesIO:
         """Fetch a model from the assigned combiner.
@@ -241,8 +253,7 @@
             return self._handle_grpc_error(e, "Download", lambda: self.get_model_from_combiner(id, client_id, timeout))
         except Exception as e:
             logger.error(f"GRPC (Download): An error occurred: {e}")
-            self._disconnect()
-
+            self._handle_unknown_error(e, "Download", lambda: self.get_model_from_combiner(id, client_id, timeout))
         return data
 
     def send_model_to_combiner(self, model: BytesIO, id: str):
@@ -273,8 +284,7 @@
             return self._handle_grpc_error(e, "Upload", lambda: self.send_model_to_combiner(model, id))
         except Exception as e:
             logger.error(f"GRPC (Upload): An error occurred: {e}")
-            self._disconnect()
-
+            self._handle_unknown_error(e, "Upload", lambda: self.send_model_to_combiner(model, id))
         return result
 
     def create_update_message(
@@ -353,8 +363,7 @@
             return self._handle_grpc_error(e, "SendModelUpdate", lambda: self.send_model_update(update))
         except Exception as e:
             logger.error(f"GRPC (SendModelUpdate): An error occurred: {e}")
-            self._disconnect()
-
+            self._handle_unknown_error(e, "SendModelUpdate", lambda: self.send_model_update(update))
         return True
 
     def send_model_validation(self, validation: fedn.ModelValidation) -> bool:
@@ -369,8 +378,7 @@
             )
         except Exception as e:
             logger.error(f"GRPC (SendModelValidation): An error occurred: {e}")
-            self._disconnect()
-
+            self._handle_unknown_error(e, "SendModelValidation", lambda: self.send_model_validation(validation))
         return True
 
     def send_model_prediction(self, prediction: fedn.ModelPrediction) -> bool:
@@ -385,8 +393,7 @@
             )
         except Exception as e:
             logger.error(f"GRPC (SendModelPrediction): An error occurred: {e}")
-            self._disconnect()
-
+            self._handle_unknown_error(e, "SendModelPrediction", lambda: 
self.send_model_prediction(prediction)) return True def _handle_grpc_error(self, e, method_name: str, sender_function: Callable): @@ -399,12 +406,38 @@ def _handle_grpc_error(self, e, method_name: str, sender_function: Callable): logger.warning(f"GRPC ({method_name}): connection cancelled. Retrying in 5 seconds.") time.sleep(5) return sender_function() - if status_code == grpc.StatusCode.UNAUTHENTICATED: + elif status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == "Token expired": logger.warning(f"GRPC ({method_name}): Token expired.") + raise e + elif status_code == grpc.StatusCode.UNKNOWN: + logger.warning(f"GRPC ({method_name}): An unknown error occurred: {e}.") + details = e.details() + if details == "Stream removed": + logger.warning(f"GRPC ({method_name}): Stream removed. Reconnecting") + self._disconnect() + self._init_channel(self.host, self.port, self.token) + self._init_stubs() + return sender_function() + raise e self._disconnect() logger.error(f"GRPC ({method_name}): An error occurred: {e}") + raise e + + def _handle_unknown_error(self, e, method_name: str, sender_function: Callable): + # Try to reconnect + logger.warning(f"GRPC ({method_name}): An unknown error occurred: {e}.") + if isinstance(e, ValueError): + # ValueError is raised when the channel is closed + self._disconnect() + logger.warning(f"GRPC ({method_name}): Reconnecting to channel.") + # recreate the channel + self._init_channel(self.host, self.port, self.token) + self._init_stubs() + return sender_function() + else: + raise e def _disconnect(self): """Disconnect from the combiner.""" diff --git a/fedn/network/combiner/hooks/hook_client.py b/fedn/network/combiner/hooks/hook_client.py index 3219e5a17..7ad4cc5b5 100644 --- a/fedn/network/combiner/hooks/hook_client.py +++ b/fedn/network/combiner/hooks/hook_client.py @@ -17,22 +17,19 @@ class CombinerHookInterface: def __init__(self): """Initialize CombinerHookInterface client.""" - try: - self.hook_service_host = os.getenv("HOOK_SERVICE_HOST", "hook:12081") - self.channel = grpc.insecure_channel( - self.hook_service_host, - options=[ - ("grpc.keepalive_time_ms", 30000), # 30 seconds ping interval - ("grpc.keepalive_timeout_ms", 5000), # 5 seconds timeout for a response - ("grpc.keepalive_permit_without_calls", 1), # allow keepalives even with no active calls - ("grpc.enable_retries", 1), # automatic retries - ("grpc.initial_reconnect_backoff_ms", 1000), # initial delay before retrying - ("grpc.max_reconnect_backoff_ms", 5000), # maximum delay before retrying - ], - ) - self.stub = rpc.FunctionServiceStub(self.channel) - except Exception as e: - logger.warning(f"Failed to initialize connection to hooks container with error {e}") + self.hook_service_host = os.getenv("HOOK_SERVICE_HOST", "hook:12081") + self.channel = grpc.insecure_channel( + self.hook_service_host, + options=[ + ("grpc.keepalive_time_ms", 30000), # 30 seconds ping interval + ("grpc.keepalive_timeout_ms", 5000), # 5 seconds timeout for a response + ("grpc.keepalive_permit_without_calls", 1), # allow keepalives even with no active calls + ("grpc.enable_retries", 1), # automatic retries + ("grpc.initial_reconnect_backoff_ms", 1000), # initial delay before retrying + ("grpc.max_reconnect_backoff_ms", 5000), # maximum delay before retrying + ], + ) + self.stub = rpc.FunctionServiceStub(self.channel) def provided_functions(self, server_functions: str): """Communicates to hook container and asks which functions are available. 
@@ -47,8 +44,14 @@ def provided_functions(self, server_functions: str): response = self.stub.HandleProvidedFunctions(request) return response.available_functions + except grpc.RpcError as rpc_error: + if rpc_error.code() == grpc.StatusCode.UNAVAILABLE: + logger.warning(f"Server-functions container is unavailable; using default implementations: {rpc_error}") + else: + logger.error(f"gRPC error: {rpc_error.code().name} - {rpc_error.details()}") + return {} except Exception as e: - logger.warning(f"Was not able to communicate to hooks container due to: {e}") + logger.error(f"Unexpected error communicating with hooks container: {e}") return {} def client_settings(self, global_model) -> dict: diff --git a/fedn/network/controller/controlbase.py b/fedn/network/controller/controlbase.py index 297efd426..397a117bb 100644 --- a/fedn/network/controller/controlbase.py +++ b/fedn/network/controller/controlbase.py @@ -261,8 +261,8 @@ def commit(self, model_id, model=None, session_id=None): """ helper = self.get_helper() if model is not None: - logger.info("Saving model file temporarily to disk...") outfile_name = helper.save(model) + logger.info("Saving model file temporarily to {}".format(outfile_name)) logger.info("CONTROL: Uploading model to Minio...") model_id = self.model_repository.set_model(outfile_name, is_file=True) diff --git a/fedn/utils/dist.py b/fedn/utils/dist.py index e5fa7192b..82812dfc3 100644 --- a/fedn/utils/dist.py +++ b/fedn/utils/dist.py @@ -3,7 +3,7 @@ import fedn -def get_version(pacakge): +def get_version(package): # Dynamically get the version of the package try: version = importlib.metadata.version("fedn") diff --git a/fedn/utils/process.py b/fedn/utils/process.py index c2574a760..b08af99b6 100644 --- a/fedn/utils/process.py +++ b/fedn/utils/process.py @@ -101,9 +101,6 @@ def _exec_cmd( env = env if extra_env is None else {**os.environ, **extra_env} - # In Python < 3.8, `subprocess.Popen` doesn't accept a command containing path-like - # objects (e.g. `["ls", pathlib.Path("abc")]`) on Windows. To avoid this issue, - # stringify all elements in `cmd`. Note `str(pathlib.Path("abc"))` returns 'abc'. if isinstance(cmd, list): cmd = list(map(str, cmd)) diff --git a/pyproject.toml b/pyproject.toml index beb511290..94ad2ffdb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "fedn" -version = "0.19.0" +version = "0.20.0" description = "Scaleout Federated Learning" authors = [{ name = "Scaleout Systems AB", email = "contact@scaleoutsystems.com" }] readme = "README.rst" @@ -20,30 +20,29 @@ keywords = [ ] classifiers = [ "Natural Language :: English", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", ] -requires-python = '>=3.8,<3.13' +requires-python = '>=3.9,<3.13' dependencies = [ "requests", "urllib3>=1.26.4", "gunicorn>=20.0.4", "minio", - "grpcio>=1.60,<1.67", - "grpcio-tools>=1.60,<1.67", + "grpcio>=1.60,<1.69", + "grpcio-tools>=1.60,<1.69", "numpy>=1.21.6", "protobuf>=5.0.0,<5.29.0", "pymongo", - "Flask==3.0.3", + "Flask==3.1.0", "pyjwt", "pyopenssl", "psutil", "click==8.1.7", - "grpcio-health-checking>=1.60,<1.67", + "grpcio-health-checking>=1.60,<1.69", "pyyaml", "plotly", "virtualenv",