restructure commit
Wrede committed Nov 29, 2024
1 parent c9bacbe commit 9d8b91d
Showing 35 changed files with 598 additions and 165 deletions.
210 changes: 210 additions & 0 deletions .ci/tests/chaos_test.py
@@ -0,0 +1,210 @@
import time
import unittest

import grpc
from toxiproxy import Toxiproxy

import fedn.network.grpc.fedn_pb2 as fedn
from fedn.network.clients.grpc_handler import GrpcHandler


class TestGRPCWithToxiproxy(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Shared connection parameters (the tests below currently define their own)
        client_name = 'test-client'
        client_id = 'test-client-id'
        host = 'localhost'
        port_proxy = 12081
        port_server = 12080
        token = ""
        combiner_name = 'combiner'

        # Start from a clean Toxiproxy state
        cls.toxiproxy = Toxiproxy()
        if cls.toxiproxy.proxies():
            cls.toxiproxy.destroy_all()

    @classmethod
    def tearDownClass(cls):
        # Destroy all proxies when the test class is done
        cls.toxiproxy.destroy_all()

    @unittest.skip("Not implemented")
    def test_normal_heartbeat(self):
        # Baseline: heartbeat without any toxic, connecting directly to the server port (no proxy)
        client_name = 'test-client'
        client_id = 'test-client-id'
        grpc_handler = GrpcHandler(host='localhost', port=12080, name=client_name, token='', combiner_name='combiner')
        try:
            response = grpc_handler.heartbeat(client_name, client_id)
            self.assertIsInstance(response, fedn.Response)
        except grpc.RpcError as e:
            self.fail(f'gRPC error: {e.code()} {e.details()}')
        finally:
            grpc_handler.channel.close()

    @unittest.skip("Not implemented")
    def test_latency_2s_toxic_heartbeat(self):
        # Add latency of 2000 ms via the proxy
        client_name = 'test-client'
        client_id = 'test-client-id'

        proxy = self.toxiproxy.create(name='test_latency_toxic_heartbeat', listen='localhost:12082', upstream='localhost:12080')
        grpc_handler = GrpcHandler(host='localhost', port=12082, name=client_name, token='', combiner_name='combiner')
        proxy.add_toxic(name='latency', type='latency', attributes={'latency': 2000})

        start_time = time.time()
        try:
            response = grpc_handler.heartbeat(client_name, client_id)
        finally:
            grpc_handler.channel.close()
            proxy.destroy()
        end_time = time.time()

        # Check that the latency delay is present
        self.assertGreaterEqual(end_time - start_time, 2)  # Expect at least a 2 second delay
        self.assertIsInstance(response, fedn.Response)

    def test_latency_long_toxic_heartbeat(self):
        """Test gRPC request with a simulated latency of 20s. Should time out based on KEEPALIVE_TIMEOUT_MS (default set to 20000)."""
        client_name = 'test-client'
        client_id = 'test-client-id'
        latency = 20  # 20 s latency

        proxy = self.toxiproxy.create(name='test_latency_toxic_heartbeat', listen='localhost:12083', upstream='localhost:12080')
        grpc_handler = GrpcHandler(host='localhost', port=12083, name=client_name, token='', combiner_name='combiner')
        proxy.add_toxic(name='latency', type='latency', attributes={'latency': latency * 1000})

        start_time = time.time()
        try:
            response = grpc_handler.heartbeat(client_name, client_id)
        except grpc.RpcError as e:
            response = e
        finally:
            grpc_handler.channel.close()
            proxy.destroy()
        end_time = time.time()

        # Check that the latency delay is present and that the call failed with UNAVAILABLE
        self.assertGreaterEqual(end_time - start_time, latency)  # Expect at least `latency` seconds of delay
        self.assertIsInstance(response, grpc.RpcError)
        self.assertEqual(response.code(), grpc.StatusCode.UNAVAILABLE)
        self.assertEqual(response.details(), 'failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:12083: connection attempt timed out before receiving SETTINGS frame')

    def test_close_channel(self):
        """
        Test closing the gRPC channel and then sending a heartbeat.
        Expect a ValueError to be raised.
        """

        client_name = 'test-client'
        client_id = 'test-client-id'

        grpc_handler = GrpcHandler(host='localhost', port=12080, name=client_name, token='', combiner_name='combiner')

        # Close the channel
        grpc_handler._disconnect()

        # Try to send a heartbeat on the closed channel
        with self.assertRaises(ValueError) as context:
            grpc_handler.heartbeat(client_name, client_id)
        self.assertEqual(str(context.exception), 'Cannot invoke RPC on closed channel!')


    @unittest.skip("Not implemented")
    def test_disconnect_toxic_heartbeat(self):
        """Test gRPC request with a simulated disconnection."""
        # Add a timeout toxic to simulate a network disconnection
        client_name = 'test-client'
        client_id = 'test-client-id'

        proxy = self.toxiproxy.create(name='test_disconnect_toxic_heartbeat', listen='localhost:12084', upstream='localhost:12080')
        grpc_handler = GrpcHandler(host='localhost', port=12084, name=client_name, token='', combiner_name='combiner')
        proxy.add_toxic(name='timeout', type='timeout', attributes={'timeout': 1000})

        try:
            response = grpc_handler.heartbeat(client_name, client_id)
        except grpc.RpcError as e:
            response = e
        finally:
            grpc_handler.channel.close()
            proxy.destroy()

        # Assert that the response is a gRPC error with status code UNAVAILABLE
        self.assertEqual(response.code(), grpc.StatusCode.UNAVAILABLE)
        self.assertEqual(response.details(), 'failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:12084: Socket closed')

    @unittest.skip("Not implemented")
    def test_timeout_toxic_heartbeat(self):
        """Stops all data from getting through and closes the connection after the timeout.
        If timeout is 0, the connection is not closed and data is delayed until the toxic is removed.
        """
        # Add a timeout toxic with timeout 0, holding the connection open and stalling all data
        client_name = 'test-client'
        client_id = 'test-client-id'

        proxy = self.toxiproxy.create(name='test_timeout_toxic_heartbeat', listen='localhost:12085', upstream='localhost:12080')
        grpc_handler = GrpcHandler(host='localhost', port=12085, name=client_name, token='', combiner_name='combiner')
        proxy.add_toxic(name='timeout', type='timeout', attributes={'timeout': 0})

        try:
            response = grpc_handler.heartbeat(client_name, client_id)
        except grpc.RpcError as e:
            response = e
        finally:
            grpc_handler.channel.close()
            proxy.destroy()

        # Assert that the response is a gRPC error with status code UNAVAILABLE
        self.assertEqual(response.code(), grpc.StatusCode.UNAVAILABLE)
        self.assertEqual(response.details(), 'failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:12085: connection attempt timed out before receiving SETTINGS frame')

    @unittest.skip("Not implemented")
    def test_rate_limit_toxic_heartbeat(self):
        # Purpose: limit how much data can pass through within a certain time frame.
        # Use case: useful for testing how the client behaves under strict rate limits. In federated learning,
        # this could simulate constraints in networks where multiple clients contend for the server.
        # Note: Toxiproxy has no dedicated rate_limit toxic; the 'bandwidth' and 'limit_data'
        # toxics are the closest approximations.

        # Proxy setup mirrors the tests above (listen port chosen to follow the same pattern)
        proxy = self.toxiproxy.create(name='test_rate_limit_toxic_heartbeat', listen='localhost:12086', upstream='localhost:12080')
        proxy.add_toxic(name='rate_limit', type='bandwidth', attributes={'rate': 1000})

    @unittest.skip("Not implemented")
    def test_bandwidth_toxic_heartbeat(self):
        # Purpose: limit the bandwidth of the connection.
        # Toxic: bandwidth
        # Use case: useful for testing how the client behaves under limited bandwidth. In federated learning,
        # this could simulate a slow network connection between the client and the server.
        # (A fully worked sketch following this pattern appears after this file.)

        # Add a bandwidth toxic to the proxy; the rate attribute is in KB/s
        proxy = self.toxiproxy.create(name='test_bandwidth_toxic_heartbeat', listen='localhost:12087', upstream='localhost:12080')
        proxy.add_toxic(name='bandwidth', type='bandwidth', attributes={'rate': 1000})  # 1000 KB/s

    @unittest.skip("Not implemented")
    def test_connection_reset(self):
        # Purpose: immediately reset the connection, simulating an abrupt network drop.
        # Toxic: reset_peer
        # Use case: helpful for testing error-handling logic on sudden network failures,
        # ensuring the client retries appropriately or fails gracefully.

        # Add a reset_peer toxic to the proxy (timeout 0 resets immediately)
        proxy = self.toxiproxy.create(name='test_connection_reset', listen='localhost:12088', upstream='localhost:12080')
        proxy.add_toxic(name='reset_peer', type='reset_peer', attributes={'timeout': 0})

    @unittest.skip("Not implemented")
    def test_slow_close(self):
        # Purpose: simulate a slow closing of the connection.
        # Toxic: slow_close
        # Use case: useful for testing how the client behaves when the server closes the connection slowly.
        # This can help ensure that the client handles slow network disconnections gracefully.

        # Add a slow_close toxic to the proxy (delay closing the connection by 1 second)
        proxy = self.toxiproxy.create(name='test_slow_close', listen='localhost:12089', upstream='localhost:12080')
        proxy.add_toxic(name='slow_close', type='slow_close', attributes={'delay': 1000})

    @unittest.skip("Not implemented")
    def test_slicer(self):
        # Purpose: slice the data stream into smaller chunks.
        # Toxic: slicer
        # Use case: useful for testing how the client handles fragmented data.
        # This can help ensure that the client reassembles the data correctly and handles partial data gracefully.

        # Add a slicer toxic to the proxy (slice data into ~1 KB chunks with 100 bytes of variation)
        proxy = self.toxiproxy.create(name='test_slicer', listen='localhost:12090', upstream='localhost:12080')
        proxy.add_toxic(name='slicer', type='slicer', attributes={'average_size': 1000, 'size_variation': 100})
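
A fully worked version of any of the skipped tests above would follow the same pattern as the implemented latency tests: create a proxy, attach the toxic, send a heartbeat, and assert on the outcome. Below is a minimal sketch for the bandwidth toxic, assuming the class context above; the listen port (12091) and the expectation that a small heartbeat still succeeds under throttling are assumptions, not part of the commit.

    def test_bandwidth_toxic_heartbeat_sketch(self):
        client_name = 'test-client'
        client_id = 'test-client-id'

        # Hypothetical port, following the 1208x pattern used above
        proxy = self.toxiproxy.create(name='test_bandwidth_sketch', listen='localhost:12091', upstream='localhost:12080')
        grpc_handler = GrpcHandler(host='localhost', port=12091, name=client_name, token='', combiner_name='combiner')
        # Throttle the connection to 10 KB/s (the bandwidth toxic's rate attribute is in KB/s)
        proxy.add_toxic(name='bandwidth', type='bandwidth', attributes={'rate': 10})

        try:
            # A heartbeat message is small, so it should still succeed, just more slowly
            response = grpc_handler.heartbeat(client_name, client_id)
            self.assertIsInstance(response, fedn.Response)
        finally:
            grpc_handler.channel.close()
            proxy.destroy()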
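
The 20 s cutoff asserted in test_latency_long_toxic_heartbeat comes from gRPC keepalive settings. The exact configuration lives inside FEDn's GrpcHandler; as an illustration only, keepalive options are typically passed as gRPC channel options like this (the option values are assumptions, chosen to match KEEPALIVE_TIMEOUT_MS = 20000):

import grpc

# Illustrative keepalive configuration: ping the server every 60 s and drop
# the connection if a ping is not acknowledged within 20 s.
KEEPALIVE_OPTIONS = [
    ("grpc.keepalive_time_ms", 60000),
    ("grpc.keepalive_timeout_ms", 20000),
    ("grpc.keepalive_permit_without_calls", 1),
]

channel = grpc.insecure_channel("localhost:12080", options=KEEPALIVE_OPTIONS)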
24 changes: 23 additions & 1 deletion .github/workflows/build-containers.yaml
@@ -20,6 +20,7 @@ jobs:
permissions:
packages: write
contents: read
security-events: write # for github/codeql-action/upload-sarif to upload SARIF results

steps:
- uses: actions/checkout@v4
@@ -48,7 +49,28 @@ jobs:
- name: Build and push
uses: docker/build-push-action@v4
with:
push: "${{ github.event_name != 'pull_request' }}"
push: true #"${{ github.event_name != 'pull_request' }}"
tags: ${{ steps.meta1.outputs.tags }}
labels: ${{ steps.meta1.outputs.labels }}
file: Dockerfile

# if push to master or release, run trivy scan on the image
- name: Trivy scan
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/master' }}
uses: aquasecurity/[email protected]
env:
TRIVY_DB_REPOSITORY: public.ecr.aws/aquasecurity/trivy-db,aquasec/trivy-db,ghcr.io/aquasecurity/trivy-db
TRIVY_JAVA_DB_REPOSITORY: public.ecr.aws/aquasecurity/trivy-java-db,aquasec/trivy-java-db,ghcr.io/aquasecurity/trivy-java-db
with:
image-ref: ghcr.io/${{ github.repository }}/fedn:master
format: 'sarif'
output: 'trivy-results.sarif'
severity: HIGH,CRITICAL
vuln-type: 'os,library'
github-pat: ${{ secrets.GITHUB_TOKEN }}

- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/master' }}
with:
sarif_file: 'trivy-results.sarif'
2 changes: 1 addition & 1 deletion .github/workflows/code-checks.yaml
@@ -4,7 +4,7 @@ on: push

jobs:
code-checks:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v4
4 changes: 2 additions & 2 deletions .github/workflows/integration-tests.yaml
@@ -23,9 +23,9 @@ jobs:
to_test:
- "mnist-keras numpyhelper"
- "mnist-pytorch numpyhelper"
python_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python_version: ["3.9", "3.10", "3.11", "3.12"]
os:
- ubuntu-22.04
- ubuntu-24.04
runs-on: ${{ matrix.os }}
steps:
- name: checkout
89 changes: 48 additions & 41 deletions Dockerfile
@@ -1,54 +1,61 @@
# Base image
ARG BASE_IMG=python:3.10-slim
FROM $BASE_IMG
# Stage 1: Builder
ARG BASE_IMG=python:3.12-slim
FROM $BASE_IMG as builder

ARG GRPC_HEALTH_PROBE_VERSION=""

# Requirements (use MNIST Keras as default)
ARG REQUIREMENTS=""

WORKDIR /build

# Install build dependencies
RUN apt-get update && apt-get upgrade -y && apt-get install -y --no-install-recommends python3-dev gcc wget \
&& rm -rf /var/lib/apt/lists/*

# Add FEDn and default configs
COPY . /app
COPY config/settings-client.yaml.template /app/config/settings-client.yaml
COPY config/settings-combiner.yaml.template /app/config/settings-combiner.yaml
COPY config/settings-hooks.yaml.template /app/config/settings-hooks.yaml
COPY config/settings-reducer.yaml.template /app/config/settings-reducer.yaml
COPY $REQUIREMENTS /app/config/requirements.txt
COPY . /build
COPY $REQUIREMENTS /build/requirements.txt

# Install developer tools (needed for psutil)
RUN apt-get update && apt-get install -y python3-dev gcc
# Install dependencies
RUN python -m venv /venv \
&& /venv/bin/pip install --upgrade pip \
&& /venv/bin/pip install --no-cache-dir 'setuptools>=65' \
&& /venv/bin/pip install --no-cache-dir . \
&& if [[ ! -z "$REQUIREMENTS" ]]; then \
/venv/bin/pip install --no-cache-dir -r /build/requirements.txt; \
fi \
&& rm -rf /build/requirements.txt

# Install grpc health probe checker

# Install grpc health probe
RUN if [ ! -z "$GRPC_HEALTH_PROBE_VERSION" ]; then \
apt-get install -y wget && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe && \
apt-get remove -y wget && apt autoremove -y; \
else \
echo "No grpc_health_probe version specified, skipping installation"; \
wget -qO /build/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /build/grpc_health_probe; \
fi

# Setup working directory
# Stage 2: Runtime
FROM $BASE_IMG

WORKDIR /app

# Create FEDn app directory
SHELL ["/bin/bash", "-c"]
RUN mkdir -p /app \
&& mkdir -p /app/client \
&& mkdir -p /app/certs \
&& mkdir -p /app/client/package \
&& mkdir -p /app/certs \
#
# Install FEDn and requirements
&& python -m venv /venv \
&& /venv/bin/pip install --upgrade pip \
&& /venv/bin/pip install --no-cache-dir 'setuptools>=65' \
&& /venv/bin/pip install --no-cache-dir -e . \
&& if [[ ! -z "$REQUIREMENTS" ]]; then \
/venv/bin/pip install --no-cache-dir -r /app/config/requirements.txt; \
fi \
#
# Clean up
&& rm -r /app/config/requirements.txt
# Copy application and venv from the builder stage
COPY --from=builder /venv /venv
COPY --from=builder /build /app

# Use a non-root user
RUN set -ex \
# Create a non-root user
&& addgroup --system --gid 1001 appgroup \
&& adduser --system --uid 1001 --gid 1001 --no-create-home appuser \
# Create application specific tmp directory, set ENV TMPDIR to /app/tmp
&& mkdir -p /app/tmp \
&& chown -R appuser:appgroup /venv /app \
# Upgrade the package index and install security upgrades
&& apt-get update \
&& apt-get upgrade -y \
&& apt-get autoremove -y \
&& apt-get clean -y \
&& rm -rf /var/lib/apt/lists/*
USER appuser

ENTRYPOINT [ "/venv/bin/fedn" ]

ENTRYPOINT [ "/venv/bin/fedn" ]
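
For local testing, the build arguments introduced in this Dockerfile (BASE_IMG, REQUIREMENTS, GRPC_HEALTH_PROBE_VERSION) can be supplied at build time. A minimal sketch using the docker Python SDK; the tag, probe version, and requirements path below are hypothetical placeholders, not values from this commit:

import docker

client = docker.from_env()
image, _ = client.images.build(
    path=".",                    # repository root containing the Dockerfile
    dockerfile="Dockerfile",
    tag="fedn:local",            # hypothetical tag
    buildargs={
        "BASE_IMG": "python:3.12-slim",
        "GRPC_HEALTH_PROBE_VERSION": "v0.4.35",  # hypothetical version; omit to skip the probe install
        # "REQUIREMENTS": "path/to/requirements.txt",  # hypothetical path to an optional requirements file
    },
)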