Skip to content

Commit

Permalink
Merge branch 'master' into feature/cross-device-simulation
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Hellander committed Nov 21, 2023
2 parents 6c2a1e1 + 1e51843 commit 423de0d
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 25 deletions.
1 change: 1 addition & 0 deletions .readthedocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ python:
install:
- method: pip
path: ./fedn
- requirements: docs/requirements.txt
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Base image
ARG BASE_IMG=python:3.9-slim
ARG BASE_IMG=python:3.10-slim
FROM $BASE_IMG

# Requirements (use MNIST Keras as default)
Expand Down
8 changes: 4 additions & 4 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ services:
build:
context: .
args:
BASE_IMG: ${BASE_IMG:-python:3.9-slim}
BASE_IMG: ${BASE_IMG:-python:3.10-slim}
working_dir: /app
volumes:
- ${HOST_REPO_DIR:-.}/fedn:/app/fedn
Expand All @@ -89,7 +89,7 @@ services:
build:
context: .
args:
BASE_IMG: ${BASE_IMG:-python:3.9-slim}
BASE_IMG: ${BASE_IMG:-python:3.10-slim}
working_dir: /app
volumes:
- ${HOST_REPO_DIR:-.}/fedn:/app/fedn
Expand All @@ -110,7 +110,7 @@ services:
build:
context: .
args:
BASE_IMG: ${BASE_IMG:-python:3.9-slim}
BASE_IMG: ${BASE_IMG:-python:3.10-slim}
working_dir: /app
volumes:
- ${HOST_REPO_DIR:-.}/fedn:/app/fedn
Expand All @@ -127,7 +127,7 @@ services:
build:
context: .
args:
BASE_IMG: ${BASE_IMG:-python:3.9-slim}
BASE_IMG: ${BASE_IMG:-python:3.10-slim}
working_dir: /app
volumes:
- ${HOST_REPO_DIR:-.}/fedn:/app/fedn
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
author = 'Scaleout Systems AB'

# The full version, including alpha/beta/rc tags
release = '0.4.1'
release = '0.6.0'

# Add any Sphinx extension module names here, as strings
extensions = [
Expand Down
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sphinx-rtd-theme
5 changes: 0 additions & 5 deletions fedn/fedn/network/api/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import uuid

import requests

__all__ = ['APIClient']
Expand Down Expand Up @@ -137,9 +135,6 @@ def start_session(self, session_id=None, round_timeout=180, rounds=5, round_buff
:return: A dict with success or failure message and session config.
:rtype: dict
"""
# If session id is None, generate a random session id.
if session_id is None:
session_id = str(uuid.uuid4())
response = requests.post(self._get_url('start_session'), json={
'session_id': session_id,
'round_timeout': round_timeout,
Expand Down
5 changes: 3 additions & 2 deletions fedn/fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import os
import threading
import uuid
from io import BytesIO

from flask import jsonify, send_from_directory
Expand Down Expand Up @@ -81,7 +82,7 @@ def get_clients(self, limit=None, skip=None, status=False):
"combiner_preferred": element["combiner_preferred"],
"ip": element["ip"],
"status": element["status"],
"last_seen": element["last_seen"],
"last_seen": element["last_seen"] if "last_seen" in element else "",
}

arr.append(obj)
Expand Down Expand Up @@ -863,7 +864,7 @@ def start_session(

# Setup session config
session_config = {
"session_id": session_id,
"session_id": session_id if session_id else str(uuid.uuid4()),
"round_timeout": round_timeout,
"buffer_size": round_buffer_size,
"model_id": model_id,
Expand Down
64 changes: 53 additions & 11 deletions fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import queue
import re
import ssl
import socket
import sys
import tempfile
import threading
Expand All @@ -15,7 +15,9 @@
from io import BytesIO

import grpc
from cryptography.hazmat.primitives.serialization import Encoding
from google.protobuf.json_format import MessageToJson
from OpenSSL import SSL

import fedn.common.net.grpc.fedn_pb2 as fedn
import fedn.common.net.grpc.fedn_pb2_grpc as rpc
Expand Down Expand Up @@ -127,6 +129,42 @@ def _assign(self):
print("Received combiner config: {}".format(client_config), flush=True)
return client_config

def _add_grpc_metadata(self, key, value):
"""Add metadata for gRPC calls.
:param key: The key of the metadata.
:type key: str
:param value: The value of the metadata.
:type value: str
"""
# Check if metadata exists and add if not
if not hasattr(self, 'metadata'):
self.metadata = ()

# Check if metadata key already exists and replace value if so
for i, (k, v) in enumerate(self.metadata):
if k == key:
# Replace value
self.metadata = self.metadata[:i] + ((key, value),) + self.metadata[i + 1:]
return

# Set metadata using tuple concatenation
self.metadata += ((key, value),)

def _get_ssl_certificate(self, domain, port=443):
context = SSL.Context(SSL.SSLv23_METHOD)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((domain, port))
ssl_sock = SSL.Connection(context, sock)
ssl_sock.set_tlsext_host_name(domain.encode())
ssl_sock.set_connect_state()
ssl_sock.do_handshake()
cert = ssl_sock.get_peer_certificate()
ssl_sock.close()
sock.close()
cert = cert.to_cryptography().public_bytes(Encoding.PEM).decode()
return cert

def _connect(self, client_config):
"""Connect to assigned combiner.
Expand All @@ -137,6 +175,9 @@ def _connect(self, client_config):

# TODO use the client_config['certificate'] for setting up secure comms'
host = client_config['host']
# Add host to gRPC metadata
self._add_grpc_metadata('grpc-server', host)
print("CLIENT: Using metadata: {}".format(self.metadata), flush=True)
port = client_config['port']
secure = False
if client_config['fqdn'] is not None:
Expand All @@ -161,7 +202,7 @@ def _connect(self, client_config):
elif self.config['secure']:
secure = True
print("CLIENT: using CA certificate for GRPC channel")
cert = ssl.get_server_certificate((host, port))
cert = self._get_ssl_certificate(host, port=port)

credentials = grpc.ssl_channel_credentials(cert.encode('utf-8'))
if self.config['token']:
Expand Down Expand Up @@ -331,7 +372,7 @@ def get_model(self, id):
"""
data = BytesIO()

for part in self.modelStub.Download(fedn.ModelRequest(id=id)):
for part in self.modelStub.Download(fedn.ModelRequest(id=id), metadata=self.metadata):

if part.status == fedn.ModelStatus.IN_PROGRESS:
data.write(part.data)
Expand Down Expand Up @@ -386,7 +427,7 @@ def upload_request_generator(mdl):
if not b:
break

result = self.modelStub.Upload(upload_request_generator(bt))
result = self.modelStub.Upload(upload_request_generator(bt), metadata=self.metadata)

return result

Expand All @@ -400,11 +441,12 @@ def _listen_to_model_update_request_stream(self):
r = fedn.ClientAvailableMessage()
r.sender.name = self.name
r.sender.role = fedn.WORKER
metadata = [('client', r.sender.name)]
# Add client to metadata
self._add_grpc_metadata('client', self.name)

while True:
try:
for request in self.combinerStub.ModelUpdateRequestStream(r, metadata=metadata):
for request in self.combinerStub.ModelUpdateRequestStream(r, metadata=self.metadata):
if request.sender.role == fedn.COMBINER:
# Process training request
self._send_status("Received model update request.", log_level=fedn.Status.AUDIT,
Expand Down Expand Up @@ -438,7 +480,7 @@ def _listen_to_model_validation_request_stream(self):
r.sender.role = fedn.WORKER
while True:
try:
for request in self.combinerStub.ModelValidationRequestStream(r):
for request in self.combinerStub.ModelValidationRequestStream(r, metadata=self.metadata):
# Process validation request
_ = request.model_id
self._send_status("Recieved model validation request.", log_level=fedn.Status.AUDIT,
Expand Down Expand Up @@ -589,7 +631,7 @@ def process_request(self):
update.correlation_id = request.correlation_id
update.meta = json.dumps(meta)
# TODO: Check responses
_ = self.combinerStub.SendModelUpdate(update)
_ = self.combinerStub.SendModelUpdate(update, metadata=self.metadata)
self._send_status("Model update completed.", log_level=fedn.Status.AUDIT,
type=fedn.StatusType.MODEL_UPDATE, request=update)

Expand Down Expand Up @@ -618,7 +660,7 @@ def process_request(self):
validation.timestamp = self.str
validation.correlation_id = request.correlation_id
_ = self.combinerStub.SendModelValidation(
validation)
validation, metadata=self.metadata)

# Set status type
if request.is_inference:
Expand Down Expand Up @@ -655,7 +697,7 @@ def _send_heartbeat(self, update_frequency=2.0):
heartbeat = fedn.Heartbeat(sender=fedn.Client(
name=self.name, role=fedn.WORKER))
try:
self.connectorStub.SendHeartbeat(heartbeat)
self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata)
self._missed_heartbeat = 0
except grpc.RpcError as e:
status_code = e.code()
Expand Down Expand Up @@ -694,7 +736,7 @@ def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None)
self.logs.append(
"{} {} LOG LEVEL {} MESSAGE {}".format(str(datetime.now()), status.sender.name, status.log_level,
status.status))
_ = self.connectorStub.SendStatus(status)
_ = self.connectorStub.SendStatus(status, metadata=self.metadata)

def run(self):
""" Run the client. """
Expand Down
45 changes: 45 additions & 0 deletions fedn/fedn/network/clients/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import unittest

from fedn.network.clients.client import Client


class TestClient(unittest.TestCase):
"""Test the Client class."""

def setUp(self):
self.client = Client()

def test_add_grpc_metadata(self):
"""Test the _add_grpc_metadata method."""

# Test adding metadata when it doesn't exist
self.client._add_grpc_metadata('key1', 'value1')
self.assertEqual(self.client.metadata, (('key1', 'value1'),))

# Test adding metadata when it already exists
self.client._add_grpc_metadata('key1', 'value2')
self.assertEqual(self.client.metadata, (('key1', 'value2'),))

# Test adding multiple metadata
self.client._add_grpc_metadata('key2', 'value3')
self.assertEqual(self.client.metadata, (('key1', 'value2'), ('key2', 'value3')))

# Test adding metadata with special characters
self.client._add_grpc_metadata('key3', 'value4!@#$%^&*()')
self.assertEqual(self.client.metadata, (('key1', 'value2'), ('key2', 'value3'), ('key3', 'value4!@#$%^&*()')))

# Test adding metadata with empty key
with self.assertRaises(ValueError):
self.client._add_grpc_metadata('', 'value5')

# Test adding metadata with empty value
with self.assertRaises(ValueError):
self.client._add_grpc_metadata('key4', '')

# Test adding metadata with None value
with self.assertRaises(ValueError):
self.client._add_grpc_metadata('key5', None)


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit 423de0d

Please sign in to comment.