diff --git a/.ci/tests/examples/wait_for.py b/.ci/tests/examples/wait_for.py index 991ae2e32..fcecea853 100644 --- a/.ci/tests/examples/wait_for.py +++ b/.ci/tests/examples/wait_for.py @@ -1,5 +1,4 @@ import json -import ssl import sys from time import sleep @@ -16,15 +15,14 @@ def _eprint(*args, **kwargs): def _retry(try_func, **func_args): - n = 0 for _ in range(RETRIES): is_success = try_func(**func_args) if is_success: - _eprint(f'Sucess.') + _eprint('Sucess.') return True _eprint(f'Sleeping for {SLEEP}.') sleep(SLEEP) - _eprint(f'Giving up.') + _eprint('Giving up.') return False diff --git a/.devcontainer/bin/init_venv.sh b/.devcontainer/bin/init_venv.sh index 24e4d84c2..90670539b 100755 --- a/.devcontainer/bin/init_venv.sh +++ b/.devcontainer/bin/init_venv.sh @@ -11,5 +11,6 @@ python -m venv .venv sphinx_press_theme==0.8.0 \ sphinx-autobuild==2021.3.14 \ autopep8==1.5.7 \ - isort==5.10.1 + isort==5.10.1 \ + flake8==4.0.1 .venv/bin/pip install -e fedn \ No newline at end of file diff --git a/.github/workflows/code-checks.yaml b/.github/workflows/code-checks.yaml index 3f32c87d0..311af03fa 100644 --- a/.github/workflows/code-checks.yaml +++ b/.github/workflows/code-checks.yaml @@ -27,4 +27,18 @@ jobs: --exclude .mnist-pytorch . + - name: run Python linter + run: > + .venv/bin/flake8 . + --exclude ".venv,.mnist-keras,.mnist-pytorch,fedn_pb2.py" + + - name: check for floating imports + run: > + ! grep -E -R + --exclude-dir='.venv' + --exclude-dir='.mnist-pytorch' + --exclude-dir='.mnist-keras' + --exclude-dir='docs' + '^[ \t]+(import|from) ' -I . + # TODO: add linting/formatting for all file types \ No newline at end of file diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml deleted file mode 100644 index 08f1a6f8b..000000000 --- a/.github/workflows/codeql-analysis.yml +++ /dev/null @@ -1,71 +0,0 @@ -# For most projects, this workflow file will not need changing; you simply need -# to commit it to your repository. -# -# You may wish to alter this file to override the set of languages analyzed, -# or to provide custom queries or build logic. -# -# ******** NOTE ******** -# We have attempted to detect the languages in your repository. Please check -# the `language` matrix defined below to confirm you have the correct set of -# supported CodeQL languages. -# -name: "CodeQL" - -on: - push: - branches: [ master, develop, release/* ] - pull_request: - # The branches below must be a subset of the branches above - branches: [ master ] - schedule: - - cron: '23 1 * * 6' - -jobs: - analyze: - name: Analyze - runs-on: ubuntu-latest - permissions: - actions: read - contents: read - security-events: write - - strategy: - fail-fast: false - matrix: - language: [ 'python' ] - # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ] - # Learn more: - # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed - - steps: - - name: Checkout repository - uses: actions/checkout@v2 - - # Initializes the CodeQL tools for scanning. - - name: Initialize CodeQL - uses: github/codeql-action/init@v1 - with: - languages: ${{ matrix.language }} - # If you wish to specify custom queries, you can do so here or in a config file. - # By default, queries listed here will override any specified in a config file. - # Prefix the list here with "+" to use these queries and those in the config file. - # queries: ./path/to/local/query, your-org/your-repo/queries@main - - # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). - # If this step fails, then you should remove it and run the build manually (see below) - - name: Autobuild - uses: github/codeql-action/autobuild@v1 - - # ℹī¸ Command-line programs to run using the OS shell. - # 📚 https://git.io/JvXDl - - # ✏ī¸ If the Autobuild fails above, remove it and uncomment the following three lines - # and modify them (or add more) to build your code if your project - # uses a compiled language - - #- run: | - # make bootstrap - # make release - - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 diff --git a/.vscode/settings.json b/.vscode/settings.json index 35fa1902d..07cfc57ae 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,4 +3,6 @@ "editor.codeActionsOnSave": { "source.organizeImports": true }, + "python.linting.enabled": true, + "python.linting.flake8Enabled": true, } \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py index cc3543318..1ca0ff82c 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -59,8 +59,8 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -#html_theme = 'furo' -#html_theme = 'sphinx_book_theme' +# html_theme = 'furo' +# html_theme = 'sphinx_book_theme' html_theme = 'press' diff --git a/fedn/cli/__init__.py b/fedn/cli/__init__.py index dbea41de3..13c9b1c51 100644 --- a/fedn/cli/__init__.py +++ b/fedn/cli/__init__.py @@ -1,3 +1,3 @@ -from .control_cmd import control_cmd -from .main import main -from .run_cmd import run_cmd +from .control_cmd import control_cmd # noqa: F401 +from .main import main # noqa: F401 +from .run_cmd import run_cmd # noqa: F401 diff --git a/fedn/cli/control_cmd.py b/fedn/cli/control_cmd.py index ecfa631f5..9305c7015 100644 --- a/fedn/cli/control_cmd.py +++ b/fedn/cli/control_cmd.py @@ -1,5 +1,10 @@ +import os +from datetime import datetime + import click +from fedn.common.control.package import Package, PackageRuntime + from .main import main @@ -37,20 +42,17 @@ def package_cmd(ctx, reducer, port, token, name, upload, validate, cwd): :param cwd: """ if not cwd: - import os cwd = os.getcwd() print("CONTROL: Bundling {} dir for distribution. Please wait for operation to complete..".format(cwd)) if not name: - from datetime import datetime name = str(os.path.basename(cwd)) + '-' + \ datetime.today().strftime('%Y-%m-%d-%H%M%S') config = {'host': reducer, 'port': port, 'token': token, 'name': name, 'cwd': cwd} - from fedn.common.control.package import Package package = Package(config) print("CONTROL: Bundling package..") @@ -85,14 +87,12 @@ def unpack_cmd(ctx, reducer, port, token, name, download, validate, cwd): :param validate: :param cwd: """ - import os if not cwd: cwd = os.getcwd() - config = {'host': reducer, 'port': port, 'token': token, 'name': name, - 'cwd': cwd} + # config = {'host': reducer, 'port': port, 'token': token, 'name': name, + # 'cwd': cwd} - from fedn.common.control.package import PackageRuntime package = PackageRuntime(cwd, os.path.join(cwd, 'client')) package.download(reducer, port, token) package.unpack() diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 0e6766325..b1299f609 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -4,8 +4,13 @@ import click import yaml +from fedn.client import Client from fedn.clients.reducer.restservice import (decode_auth_token, encode_auth_token) +from fedn.clients.reducer.statestore.mongoreducerstatestore import \ + MongoReducerStateStore +from fedn.combiner import Combiner +from fedn.reducer import Reducer from .main import main @@ -100,7 +105,7 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa with open(config['init'], 'r') as file: try: settings = dict(yaml.safe_load(file)) - except Exception as e: + except Exception: print('Failed to read config from settings file, exiting.', flush=True) return # raise(e) @@ -119,11 +124,10 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa print( "Missing required configuration: discover_host, discover_port", flush=True) return - except Exception as e: + except Exception: print("Could not load config appropriately. Check config", flush=True) return - from fedn.client import Client client = Client(config) client.run() @@ -160,8 +164,8 @@ def reducer_cmd(ctx, discoverhost, discoverport, secret_key, local_package, name print(e, flush=True) exit(-1) - if not remote: - helper = check_helper_config_file(fedn_config) + # if not remote: + # helper = check_helper_config_file(fedn_config) try: network_id = fedn_config['network_id'] @@ -171,8 +175,7 @@ def reducer_cmd(ctx, discoverhost, discoverport, secret_key, local_package, name statestore_config = fedn_config['statestore'] if statestore_config['type'] == 'MongoDB': - from fedn.clients.reducer.statestore.mongoreducerstatestore import \ - MongoReducerStateStore + statestore = MongoReducerStateStore( network_id, statestore_config['mongo_config'], defaults=config['init']) else: @@ -190,7 +193,7 @@ def reducer_cmd(ctx, discoverhost, discoverport, secret_key, local_package, name if status != 'Success': token = encode_auth_token(config['secret_key']) config['token'] = token - except: + except Exception: raise else: @@ -199,7 +202,7 @@ def reducer_cmd(ctx, discoverhost, discoverport, secret_key, local_package, name try: statestore.set_reducer(config) - except: + except Exception: print("Failed to set reducer config in statestore, exiting.", flush=True) exit(-1) @@ -208,7 +211,7 @@ def reducer_cmd(ctx, discoverhost, discoverport, secret_key, local_package, name except KeyError: print("storage configuration missing in statestore_config.", flush=True) exit(-1) - except: + except Exception: print("Failed to set storage config in statestore, exiting.", flush=True) exit(-1) @@ -216,11 +219,10 @@ def reducer_cmd(ctx, discoverhost, discoverport, secret_key, local_package, name control_config = fedn_config['control'] try: statestore.set_round_config(control_config) - except: + except Exception: print("Failed to set control config, exiting.", flush=True) exit(-1) - from fedn.reducer import Reducer reducer = Reducer(statestore) reducer.run() @@ -274,6 +276,5 @@ def combiner_cmd(ctx, discoverhost, discoverport, token, name, hostname, port, s config['myport'] = combiner_config['port'] config['max_clients'] = combiner_config['max_clients'] - from fedn.combiner import Combiner combiner = Combiner(config) combiner.run() diff --git a/fedn/cli/tests/tests.py b/fedn/cli/tests/tests.py index 4c71c3816..dffe90a2c 100644 --- a/fedn/cli/tests/tests.py +++ b/fedn/cli/tests/tests.py @@ -1,8 +1,5 @@ import unittest -from unittest.mock import MagicMock, patch -from uuid import UUID -import yaml from click.testing import CliRunner from run_cmd import check_helper_config_file @@ -68,7 +65,7 @@ def test_check_helper_config_file(self): del COPY_INIT_FILE["control"]["helper"] with self.assertRaises(SystemExit): - helper = check_helper_config_file(COPY_INIT_FILE) + check_helper_config_file(COPY_INIT_FILE) if __name__ == '__main__': diff --git a/fedn/fedn/aggregators/aggregator.py b/fedn/fedn/aggregators/aggregator.py index 3e7b3bd20..d03a1811f 100644 --- a/fedn/fedn/aggregators/aggregator.py +++ b/fedn/fedn/aggregators/aggregator.py @@ -1,6 +1,3 @@ -import collections -import os -import tempfile from abc import ABC, abstractmethod diff --git a/fedn/fedn/aggregators/fedavg.py b/fedn/fedn/aggregators/fedavg.py index 034e22a70..317ddc5eb 100644 --- a/fedn/fedn/aggregators/fedavg.py +++ b/fedn/fedn/aggregators/fedavg.py @@ -1,25 +1,20 @@ import json -import os import queue -import sys -import tempfile import time -import uuid import fedn.common.net.grpc.fedn_pb2 as fedn from fedn.aggregators.aggregator import AggregatorBase -from fedn.utils.helpers import get_helper class FedAvgAggregator(AggregatorBase): - """ Local SGD / Federated Averaging (FedAvg) aggregator. + """ Local SGD / Federated Averaging (FedAvg) aggregator. - :param id: A reference to id of :class: `fedn.combiner.Combiner` + :param id: A reference to id of :class: `fedn.combiner.Combiner` :type id: str - :param storage: Model repository for :class: `fedn.combiner.Combiner` + :param storage: Model repository for :class: `fedn.combiner.Combiner` :type storage: class: `fedn.common.storage.s3.s3repo.S3ModelRepository` :param server: A handle to the Combiner class :class: `fedn.combiner.Combiner` - :type server: class: `fedn.combiner.Combiner` + :type server: class: `fedn.combiner.Combiner` :param modelservice: A handle to the model service :class: `fedn.clients.combiner.modelservice.ModelService` :type modelservice: class: `fedn.clients.combiner.modelservice.ModelService` :param control: A handle to the :class: `fedn.clients.combiner.roundcontrol.RoundControl` @@ -40,7 +35,7 @@ def __init__(self, id, storage, server, modelservice, control): def on_model_update(self, model_id): """Callback when a new model update is recieved from a client. Performs (optional) pre-processing and the puts the update id - on the aggregation queue. + on the aggregation queue. :param model_id: ID of model update :type model_id: str @@ -57,9 +52,9 @@ def on_model_update(self, model_id): pass def on_model_validation(self, validation): - """ Callback when a new model validation is recieved from a client. + """ Callback when a new model validation is recieved from a client. - :param validation: Dict containing validation data sent by client. + :param validation: Dict containing validation data sent by client. Must be valid JSON. :type validation: dict """ diff --git a/fedn/fedn/client.py b/fedn/fedn/client.py index 50d6a885f..4583a2633 100644 --- a/fedn/fedn/client.py +++ b/fedn/fedn/client.py @@ -1,3 +1,4 @@ +import base64 import io import json import os @@ -8,16 +9,19 @@ import time import uuid from datetime import datetime +from distutils.dir_util import copy_tree +from io import BytesIO import grpc -from aiohttp import client +from flask import Flask +from google.protobuf.json_format import MessageToJson import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc from fedn.clients.client.state import ClientState, ClientStateToString from fedn.common.control.package import PackageRuntime from fedn.common.net.connect import ConnectorClient, Status -# TODO Remove from this level. Abstract to unified non implementation specific client. +from fedn.common.net.web.client import page, style from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers import get_helper from fedn.utils.logger import Logger @@ -33,7 +37,7 @@ class Client: --------- config: dict A configuration dictionary containing connection information for - the discovery service (controller) and settings governing e.g. + the discovery service (controller) and settings governing e.g. client-combiner assignment behavior. """ @@ -44,7 +48,7 @@ def __init__(self, config): ---------- config: dict A configuration dictionary containing connection information for - the discovery service (controller) and settings governing e.g. + the discovery service (controller) and settings governing e.g. client-combiner assignment behavior. """ @@ -120,7 +124,7 @@ def _initialize_helper(self, client_config): self.helper = get_helper(client_config['model_type']) def _subscribe_to_combiner(self, config): - """Listen to combiner message stream and start all processing threads. + """Listen to combiner message stream and start all processing threads. """ @@ -129,10 +133,10 @@ def _subscribe_to_combiner(self, config): 'update_frequency': config['heartbeat_interval']}, daemon=True).start() # Start listening for combiner training and validation messages - if config['trainer'] == True: + if config['trainer']: threading.Thread( target=self._listen_to_model_update_request_stream, daemon=True).start() - if config['validator'] == True: + if config['validator']: threading.Thread( target=self._listen_to_model_validation_request_stream, daemon=True).start() self._attached = True @@ -159,7 +163,7 @@ def _initialize_dispatcher(self, config): tries -= 1 if retval: - if not 'checksum' in config: + if 'checksum' not in config: print( "\nWARNING: Skipping security validation of local package!, make sure you trust the package source.\n", flush=True) @@ -185,10 +189,8 @@ def _initialize_dispatcher(self, config): {'predict': {'command': 'python3 predict.py'}, 'train': {'command': 'python3 train.py'}, 'validate': {'command': 'python3 validate.py'}}} - dispatch_dir = os.getcwd() from_path = os.path.join(os.getcwd(), 'client') - from distutils.dir_util import copy_tree copy_tree(from_path, self.run_path) self.dispatcher = Dispatcher(dispatch_config, self.run_path) @@ -218,19 +220,18 @@ def _assign(self): return client_config def _connect(self, client_config): - """Connect to assigned combiner. + """Connect to assigned combiner. Parameters ---------- client_config : dict A dictionary with connection information and settings - for the assigned combiner. + for the assigned combiner. """ # TODO use the client_config['certificate'] for setting up secure comms' if client_config['certificate']: - import base64 cert = base64.b64decode( client_config['certificate']) # .decode('utf-8') credentials = grpc.ssl_channel_credentials(root_certificates=cert) @@ -257,18 +258,16 @@ def _disconnect(self): self.channel.close() def get_model(self, id): - """Fetch a model from the assigned combiner. + """Fetch a model from the assigned combiner. - Downloads the model update object via a gRPC streaming channel, Dowload. + Downloads the model update object via a gRPC streaming channel, Dowload. Parameters ---------- id : str - The id of the model update object. + The id of the model update object. """ - - from io import BytesIO data = BytesIO() for part in self.models.Download(fedn.ModelRequest(id=id)): @@ -285,20 +284,17 @@ def get_model(self, id): return data def set_model(self, model, id): - """Send a model update to the assigned combiner. + """Send a model update to the assigned combiner. - Uploads the model updated object via a gRPC streaming channel, Upload. + Uploads the model updated object via a gRPC streaming channel, Upload. Parameters ---------- model : BytesIO, object - The model update object. + The model update object. id : str The id of the model update object. """ - - from io import BytesIO - if not isinstance(model, BytesIO): bt = BytesIO() @@ -314,7 +310,6 @@ def upload_request_generator(mdl): :param mdl: """ - i = 1 while True: b = mdl.read(CHUNK_SIZE) if b: @@ -339,7 +334,6 @@ def _listen_to_model_update_request_stream(self): r.sender.name = self.name r.sender.role = fedn.WORKER metadata = [('client', r.sender.name)] - _disconnect = False while True: try: @@ -354,14 +348,13 @@ def _listen_to_model_update_request_stream(self): if not self._attached: return - except grpc.RpcError as e: - status_code = e.code() + except grpc.RpcError: # TODO: make configurable timeout = 5 # print("CLIENT __listen_to_model_update_request_stream: GRPC ERROR {} retrying in {}..".format( # status_code.name, timeout), flush=True) time.sleep(timeout) - except: + except Exception: raise if not self._attached: @@ -377,19 +370,17 @@ def _listen_to_model_validation_request_stream(self): try: for request in self.orchestrator.ModelValidationRequestStream(r): # Process validation request - model_id = request.model_id self._send_status("Recieved model validation request.", log_level=fedn.Status.AUDIT, type=fedn.StatusType.MODEL_VALIDATION_REQUEST, request=request) self.inbox.put(('validate', request)) - except grpc.RpcError as e: - status_code = e.code() + except grpc.RpcError: # TODO: make configurable timeout = 5 # print("CLIENT __listen_to_model_validation_request_stream: GRPC ERROR {} retrying in {}..".format( # status_code.name, timeout), flush=True) time.sleep(timeout) - except: + except Exception: raise if not self._attached: @@ -413,7 +404,7 @@ def process_request(self): processing_time = time.time()-tic meta['processing_time'] = processing_time - if model_id != None: + if model_id is not None: # Notify the combiner that a model update is available update = fedn.ModelUpdate() update.sender.name = self.name @@ -426,7 +417,6 @@ def process_request(self): update.correlation_id = request.correlation_id update.meta = json.dumps(meta) # TODO: Check responses - response = self.orchestrator.SendModelUpdate(update) self._send_status("Model update completed.", log_level=fedn.Status.AUDIT, type=fedn.StatusType.MODEL_UPDATE, request=update) @@ -443,7 +433,7 @@ def process_request(self): metrics = self._process_validation_request( request.model_id) - if metrics != None: + if metrics is not None: # Send validation validation = fedn.ModelValidation() validation.sender.name = self.name @@ -455,8 +445,6 @@ def process_request(self): self.str = str(datetime.now()) validation.timestamp = self.str validation.correlation_id = request.correlation_id - response = self.orchestrator.SendModelValidation( - validation) self._send_status("Model validation completed.", log_level=fedn.Status.AUDIT, type=fedn.StatusType.MODEL_VALIDATION, request=validation) else: @@ -469,7 +457,7 @@ def process_request(self): pass def _process_training_request(self, model_id): - """Process a training (model update) request. + """Process a training (model update) request. Parameters ---------- @@ -551,7 +539,7 @@ def _process_validation_request(self, model_id): return validation def _handle_combiner_failure(self): - """ Register failed combiner connection. + """ Register failed combiner connection. """ self._missed_heartbeat += 1 @@ -559,7 +547,7 @@ def _handle_combiner_failure(self): self._detach() def _send_heartbeat(self, update_frequency=2.0): - """Send a heartbeat to the combiner. + """Send a heartbeat to the combiner. Parameters ---------- @@ -586,9 +574,6 @@ def _send_heartbeat(self, update_frequency=2.0): def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None): """Send status message. """ - - from google.protobuf.json_format import MessageToJson - status = fedn.Status() status.timestamp = str(datetime.now()) status.sender.name = self.name @@ -604,19 +589,15 @@ def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None) self.logs.append( "{} {} LOG LEVEL {} MESSAGE {}".format(str(datetime.now()), status.sender.name, status.log_level, status.status)) - response = self.connection.SendStatus(status) def run_web(self): - """Starts a local logging UI (Flask app) serving on port 8080. + """Starts a local logging UI (Flask app) serving on port 8080. - Currently not in use as default. + Currently not in use as default. """ - from flask import Flask app = Flask(__name__) - from fedn.common.net.web.client import page, style - @app.route('/') def index(): """ @@ -629,8 +610,6 @@ def index(): return page.format(client=self.name, state=ClientStateToString(self.state), style=style, logs=logs_fancy) - import os - import sys self._original_stdout = sys.stdout sys.stdout = open(os.devnull, 'w') app.run(host="0.0.0.0", port="8080") @@ -639,7 +618,6 @@ def index(): def run(self): """ Main run loop. """ - #threading.Thread(target=self.run_web, daemon=True).start() try: cnt = 0 old_state = self.state diff --git a/fedn/fedn/clients/combiner/modelservice.py b/fedn/fedn/clients/combiner/modelservice.py index b5105efef..02c74c3e0 100644 --- a/fedn/fedn/clients/combiner/modelservice.py +++ b/fedn/fedn/clients/combiner/modelservice.py @@ -1,3 +1,5 @@ +from io import BytesIO + import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc from fedn.common.storage.models.tempmodelstorage import TempModelStorage @@ -29,11 +31,9 @@ def get_model(self, id): :param id: :return: """ - from io import BytesIO + data = BytesIO() data.seek(0, 0) - import random - import time parts = self.Download(fedn.ModelRequest(id=id), self) for part in parts: @@ -51,7 +51,6 @@ def set_model(self, model, id): :param model: :param id: """ - from io import BytesIO if not isinstance(model, BytesIO): bt = BytesIO() @@ -83,7 +82,7 @@ def upload_request_generator(mdl): break # TODO: Check result - result = self.Upload(upload_request_generator(bt), self) + # result = self.Upload(upload_request_generator(bt), self) # Model Service def Upload(self, request_iterator, context): @@ -120,13 +119,12 @@ def Download(self, request, context): if self.models.get_meta(request.id) != fedn.ModelStatus.OK: print("Error file is not ready", flush=True) yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED) - except Exception as e: + except Exception: print("Error file does not exist", flush=True) yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED) try: obj = self.models.get(request.id) - import sys with obj as f: while True: piece = f.read(CHUNK_SIZE) diff --git a/fedn/fedn/clients/combiner/roundcontrol.py b/fedn/fedn/clients/combiner/roundcontrol.py index c6d3be9db..7e19065f7 100644 --- a/fedn/fedn/clients/combiner/roundcontrol.py +++ b/fedn/fedn/clients/combiner/roundcontrol.py @@ -1,29 +1,26 @@ -import json -import os import queue +import random import sys -import tempfile import time import uuid -from threading import Lock, Thread -import fedn.common.net.grpc.fedn_pb2 as fedn +from fedn.aggregators.fedavg import FedAvgAggregator from fedn.utils.helpers import get_helper class RoundControl: - """ Combiner level round controller. + """ Combiner level round controller. - The controller recieves round configurations from the global controller - and acts on them by soliciting model updates and model validations - from the connected clients. + The controller recieves round configurations from the global controller + and acts on them by soliciting model updates and model validations from + the connected clients. - :param id: A reference to id of :class: `fedn.combiner.Combiner` + :param id: A reference to id of :class: `fedn.combiner.Combiner` :type id: str - :param storage: Model repository for :class: `fedn.combiner.Combiner` + :param storage: Model repository for :class: `fedn.combiner.Combiner` :type storage: class: `fedn.common.storage.s3.s3repo.S3ModelRepository` :param server: A handle to the Combiner class :class: `fedn.combiner.Combiner` - :type server: class: `fedn.combiner.Combiner` + :type server: class: `fedn.combiner.Combiner` :param modelservice: A handle to the model service :class: `fedn.clients.combiner.modelservice.ModelService` :type modelservice: class: `fedn.clients.combiner.modelservice.ModelService` """ @@ -38,23 +35,23 @@ def __init__(self, id, storage, server, modelservice): self.config = {} # TODO, make runtime configurable - from fedn.aggregators.fedavg import FedAvgAggregator + self.aggregator = FedAvgAggregator( self.id, self.storage, self.server, self.modelservice, self) def push_round_config(self, round_config): - """ Recieve a round_config (job description) and push on the queue. + """ Recieve a round_config (job description) and push on the queue. :param round_config: A dict containing round configurations. :type round_config: dict - :return: A generated job id (universally unique identifier) for the round configuration + :return: A generated job id (universally unique identifier) for the round configuration :rtype: str """ try: - import uuid + round_config['_job_id'] = str(uuid.uuid4()) self.round_configs.put(round_config) - except: + except Exception: self.server.report_status( "ROUNDCONTROL: Failed to push round config.", flush=True) raise @@ -73,7 +70,7 @@ def load_model_fault_tolerant(self, model_id, retry=3): # Try reading model update from local disk/combiner memory model_str = self.modelservice.models.get(model_id) # And if we cannot access that, try downloading from the server - if model_str == None: + if model_str is None: model_str = self.modelservice.get_model(model_id) # TODO: use retrying library tries = 0 @@ -82,14 +79,14 @@ def load_model_fault_tolerant(self, model_id, retry=3): if not model_str or sys.getsizeof(model_str) == 80: self.server.report_status( "ROUNDCONTROL: Model download failed. retrying", flush=True) - import time + time.sleep(1) model_str = self.modelservice.get_model(model_id) return model_str def _training_round(self, config, clients): - """Send model update requests to clients and aggregate results. + """Send model update requests to clients and aggregate results. :param config: [description] :type config: [type] @@ -142,7 +139,7 @@ def _validation_round(self, config, clients, model_id): def stage_model(self, model_id, timeout_retry=3, retry=2): """Download model from persistent storage. - :param model_id: ID of the model update object to stage. + :param model_id: ID of the model update object to stage. :type model_id: str :param timeout_retry: Sleep before retrying download again (sec), defaults to 3 :type timeout_retry: int, optional @@ -161,7 +158,7 @@ def stage_model(self, model_id, timeout_retry=3, retry=2): model = self.storage.get_model_stream(model_id) if model: break - except Exception as e: + except Exception: self.server.report_status("ROUNDCONTROL: Could not fetch model from storage backend, retrying.", flush=True) time.sleep(timeout_retry) @@ -174,7 +171,7 @@ def stage_model(self, model_id, timeout_retry=3, retry=2): self.modelservice.set_model(model, model_id) def __assign_round_clients(self, n, type="trainers"): - """ Obtain a list of clients (trainers or validators) to talk to in a round. + """ Obtain a list of clients (trainers or validators) to talk to in a round. :param n: Size of a random set taken from active trainers (clients), if n > "active trainers" all is used :type n: int @@ -198,13 +195,13 @@ def __assign_round_clients(self, n, type="trainers"): n = len(clients) # If not, we pick a random subsample of all available clients. - import random + clients = random.sample(clients, n) return clients def __check_nr_round_clients(self, config, timeout=0.0): - """Check that the minimal number of required clients to start a round are connected. + """Check that the minimal number of required clients to start a round are connected. :param config: [description] :type config: [type] @@ -214,7 +211,6 @@ def __check_nr_round_clients(self, config, timeout=0.0): :rtype: [type] """ - import time ready = False t = 0.0 while not ready: @@ -238,7 +234,7 @@ def __check_nr_round_clients(self, config, timeout=0.0): return ready def execute_validation(self, round_config): - """ Coordinate validation rounds as specified in config. + """ Coordinate validation rounds as specified in config. :param round_config: [description] :type round_config: [type] @@ -290,7 +286,7 @@ def execute_training(self, config): return round_meta def run(self): - """ Main control loop. Sequentially execute rounds based on round config. + """ Main control loop. Sequentially execute rounds based on round config. """ try: diff --git a/fedn/fedn/clients/reducer/config.py b/fedn/fedn/clients/reducer/config.py index 95794c26f..0ccfddc66 100644 --- a/fedn/fedn/clients/reducer/config.py +++ b/fedn/fedn/clients/reducer/config.py @@ -1,4 +1,4 @@ -from abc import ABC, abstractmethod +from abc import ABC class Config(ABC): diff --git a/fedn/fedn/clients/reducer/control.py b/fedn/fedn/clients/reducer/control.py index 692dfcfca..c8d1e1598 100644 --- a/fedn/fedn/clients/reducer/control.py +++ b/fedn/fedn/clients/reducer/control.py @@ -1,12 +1,13 @@ import copy import os -import tempfile import time +import uuid from datetime import datetime import fedn.utils.helpers from fedn.clients.reducer.interfaces import CombinerUnavailableError from fedn.clients.reducer.network import Network +from fedn.common.storage.s3.s3repo import S3ModelRepository from fedn.common.tracer.mongotracer import MongoTracer from .state import ReducerState @@ -21,7 +22,7 @@ class MisconfiguredStorageBackend(Exception): class ReducerControl: - """ Main conroller for training round. + """ Main conroller for training round. """ @@ -34,7 +35,7 @@ def __init__(self, statestore): try: config = self.statestore.get_storage_backend() - except: + except Exception: print( "REDUCER CONTROL: Failed to retrive storage configuration, exiting.", flush=True) raise MisconfiguredStorageBackend() @@ -44,7 +45,7 @@ def __init__(self, statestore): raise MisconfiguredStorageBackend() if config['storage_type'] == 'S3': - from fedn.common.storage.s3.s3repo import S3ModelRepository + self.model_repository = S3ModelRepository(config['storage_config']) else: print("REDUCER CONTROL: Unsupported storage backend, exiting.", flush=True) @@ -279,7 +280,7 @@ def round(self, config, round_number): self._handle_unavailable_combiner(combiner) combiner_state = None - if combiner_state != None: + if combiner_state is not None: is_participating = self.check_round_participation_policy( compute_plan, combiner_state) if is_participating: @@ -306,12 +307,11 @@ def round(self, config, round_number): for combiner, compute_plan in combiners: try: self.sync_combiners([combiner], self.get_latest_model()) - response = combiner.start(compute_plan) except CombinerUnavailableError: # This is OK, handled by round accept policy self._handle_unavailable_combiner(combiner) pass - except: + except Exception: # Unknown error raise @@ -342,7 +342,7 @@ def round(self, config, round_number): print("Checking round validity policy...", flush=True) round_valid = self.check_round_validity_policy(updated) - if round_valid == False: + if not round_valid: # TODO: Should we reset combiner state here? print("REDUCER CONTROL: Round invalid!", flush=True) return None, round_meta @@ -365,7 +365,7 @@ def round(self, config, round_number): if model is not None: # Commit to model ledger tic = time.time() - import uuid + model_id = uuid.uuid4() self.commit(model_id, model) round_meta['time_commit'] = time.time() - tic @@ -406,9 +406,6 @@ def sync_combiners(self, combiners, model_id): print("GOT NO MODEL TO SET! Have you seeded the FedML model?", flush=True) return - for combiner in combiners: - response = combiner.set_model_id(model_id) - def instruct(self, config): """ Main entrypoint, executes the compute plan. """ @@ -427,7 +424,7 @@ def instruct(self, config): # self.set_config(config) # TODO: Refactor - from fedn.common.tracer.mongotracer import MongoTracer + statestore_config = self.statestore.get_config() self.tracer = MongoTracer( statestore_config['mongo_config'], statestore_config['network_id']) @@ -440,7 +437,6 @@ def instruct(self, config): else: current_round = round - from datetime import datetime start_time = datetime.now() # start round monitor self.tracer.start_monitor(round) @@ -488,7 +484,7 @@ def reduce(self, combiners): tic = time.time() data = combiner.get_model() meta['time_fetch_model'] += (time.time() - tic) - except: + except Exception: pass helper = self.get_helper() @@ -502,7 +498,7 @@ def reduce(self, combiners): tic = time.time() model = helper.increment_average(model, model_next, i) meta['time_aggregate_model'] += (time.time() - tic) - except: + except Exception: tic = time.time() model = helper.load_model_from_BytesIO(data.getbuffer()) meta['time_aggregate_model'] += (time.time() - tic) @@ -550,7 +546,7 @@ def client_allocation_policy_least_packed(self): elif nac < min_clients: min_clients = nac selected_combiner = combiner - except CombinerUnavailableError as err: + except CombinerUnavailableError: print("Combiner was not responding, continuing to next") return selected_combiner diff --git a/fedn/fedn/clients/reducer/interfaces.py b/fedn/fedn/clients/reducer/interfaces.py index 5cb855ca5..ff1deecd0 100644 --- a/fedn/fedn/clients/reducer/interfaces.py +++ b/fedn/fedn/clients/reducer/interfaces.py @@ -1,4 +1,7 @@ +import base64 +import copy import json +from io import BytesIO import grpc @@ -20,7 +23,7 @@ def __init__(self, address, port, certificate): self.port = port self.certificate = certificate if self.certificate: - import copy + credentials = grpc.ssl_channel_credentials( root_certificates=copy.deepcopy(certificate)) self.channel = grpc.secure_channel('{}:{}'.format( @@ -34,7 +37,6 @@ def get_channel(self): :return: """ - import copy return copy.copy(self.channel) @@ -76,7 +78,7 @@ def to_dict(self): :return: """ - import base64 + cert_b64 = base64.b64encode(self.certificate) key_b64 = base64.b64encode(self.key) @@ -144,7 +146,7 @@ def configure(self, config=None): p.value = str(value) try: - response = control.Configure(request) + control.Configure(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise CombinerUnavailableError @@ -192,7 +194,7 @@ def set_model_id(self, model_id): p.value = str(model_id) try: - response = control.Configure(request) + control.Configure(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise CombinerUnavailableError @@ -228,7 +230,6 @@ def get_model(self, id=None): if not id: id = self.get_model_id() - from io import BytesIO data = BytesIO() data.seek(0, 0) diff --git a/fedn/fedn/clients/reducer/network.py b/fedn/fedn/clients/reducer/network.py index 0bdd4f0ee..727438f50 100644 --- a/fedn/fedn/clients/reducer/network.py +++ b/fedn/fedn/clients/reducer/network.py @@ -1,12 +1,8 @@ import base64 -import copy -import time from fedn.clients.reducer.interfaces import (CombinerInterface, CombinerUnavailableError) -from .state import ReducerState - class Network: """ FEDn network. """ @@ -61,7 +57,7 @@ def add_combiner(self, combiner): self.statestore.set_combiner(combiner.to_dict()) def add_client(self, client): - """ Add a new client to the network. + """ Add a new client to the network. :param client: :return: diff --git a/fedn/fedn/clients/reducer/plots.py b/fedn/fedn/clients/reducer/plots.py index b3c6f22e3..d85923aa2 100644 --- a/fedn/fedn/clients/reducer/plots.py +++ b/fedn/fedn/clients/reducer/plots.py @@ -1,23 +1,19 @@ import json -import math -import os -from datetime import datetime, timedelta +from datetime import datetime -import geoip2.database import networkx import numpy import pandas as pd import plotly -import plotly.express as px import plotly.graph_objs as go -import pymongo from bokeh.models import (Circle, ColumnDataSource, Label, LabelSet, MultiLine, NodesAndLinkedEdges, Range1d) from bokeh.palettes import Spectral8 from bokeh.plotting import figure, from_networkx -from numpy.core.einsumfunc import _flop_count +from networkx.algorithms import community +from plotly.subplots import make_subplots -from fedn.common.storage.db.mongo import connect_to_mongodb, drop_mongodb +from fedn.common.storage.db.mongo import connect_to_mongodb class Plot: @@ -54,7 +50,7 @@ def _scalar_metrics(self, metrics): try: val = float(val) valid_metrics.append(metric) - except: + except Exception: pass return valid_metrics @@ -65,7 +61,7 @@ def create_table_plot(self): :return: """ metrics = self.status.find_one({'type': 'MODEL_VALIDATION'}) - if metrics == None: + if metrics is None: fig = go.Figure(data=[]) fig.update_layout( title_text='No data currently available for table mean metrics') @@ -228,7 +224,6 @@ def create_client_plot(self): training.append(meta['exec_training']) processing.append(meta['processing_time']) - from plotly.subplots import make_subplots fig = make_subplots(rows=1, cols=2, specs=[ [{"type": "pie"}, {"type": "histogram"}]]) @@ -270,7 +265,7 @@ def create_combiner_plot(self): waiting.append(stats['time_combination'] - ml - ag) model_load.append(ml) aggregation.append(ag) - except: + except Exception: pass labels = ['Waiting for updates', @@ -307,7 +302,7 @@ def create_box_plot(self, metric): :return: """ metrics = self.status.find_one({'type': 'MODEL_VALIDATION'}) - if metrics == None: + if metrics is None: fig = go.Figure(data=[]) fig.update_layout(title_text='No data currently available for metric distribution over ' 'participants') @@ -338,16 +333,14 @@ def create_box_plot(self, metric): for model_id in model_trail_ids: try: validations_sorted.append(validations[model_id]) - except: + except Exception: pass validations = validations_sorted box = go.Figure() - x = [] y = [] - box_trace = [] for j, acc in enumerate(validations): # x.append(j) y.append(numpy.mean([float(i) for i in acc])) @@ -380,11 +373,10 @@ def create_round_plot(self): """ trace_data = [] metrics = self.round_time.find_one({'key': 'round_time'}) - if metrics == None: + if metrics is None: fig = go.Figure(data=[]) fig.update_layout( title_text='No data currently available for round time') - ml = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder) return False for post in self.round_time.find({'key': 'round_time'}): @@ -421,7 +413,7 @@ def create_cpu_plot(self): :return: """ metrics = self.psutil_usage.find_one({'key': 'cpu_mem_usage'}) - if metrics == None: + if metrics is None: fig = go.Figure(data=[]) fig.update_layout( title_text='No data currently available for MEM and CPU usage') @@ -435,7 +427,6 @@ def create_cpu_plot(self): round = post['round'] # Create figure with secondary y-axis - from plotly.subplots import make_subplots fig = make_subplots(specs=[[{"secondary_y": True}]]) fig.add_trace(go.Scatter( x=ps_time, @@ -544,15 +535,15 @@ def make_netgraph_plot(self, df, df_nodes): G, name='adjusted_node_size', values=adjusted_node_size) # community - from networkx.algorithms import community + communities = community.greedy_modularity_communities(G) # Create empty dictionaries modularity_class = {} modularity_color = {} # Loop through each community in the network - for community_number, community in enumerate(communities): + for community_number, community_names in enumerate(communities): # For each member of the community, add their community number and a distinct color - for name in community: + for name in community_names: modularity_class[name] = community_number modularity_color[name] = Spectral8[community_number] diff --git a/fedn/fedn/clients/reducer/restservice.py b/fedn/fedn/clients/reducer/restservice.py index 5dc69d25e..3420e0b96 100644 --- a/fedn/fedn/clients/reducer/restservice.py +++ b/fedn/fedn/clients/reducer/restservice.py @@ -1,27 +1,27 @@ +import base64 +import copy import datetime import json -import math import os import re -import uuid +import threading +from io import BytesIO from threading import Lock -from urllib import response -import geoip2.database import jwt -import numpy import pandas as pd -import plotly -import plotly.express as px +from bokeh.embed import json_item +from bson import json_util from flask import (Flask, abort, flash, jsonify, make_response, redirect, - render_template, request, url_for) -from idna import check_initial_combiner -from tenacity import retry + render_template, request, send_file, send_from_directory, + url_for) from werkzeug.utils import secure_filename from fedn.clients.reducer.interfaces import CombinerInterface from fedn.clients.reducer.plots import Plot from fedn.clients.reducer.state import ReducerState, ReducerStateToString +from fedn.common.exceptions import ModelError +from fedn.common.tracer.mongotracer import MongoTracer from fedn.utils.checksum import sha UPLOAD_FOLDER = '/app/client/package/' @@ -183,7 +183,7 @@ def check_configured_response(self): def check_configured(self): """Check if compute package has been configured and that and that the - state of the ReducerControl is not in setup otherwise render setup template. + state of the ReducerControl is not in setup otherwise render setup template. Check if initial model has been configured, otherwise render setup_model template. :return: Rendered html template or None """ @@ -359,14 +359,14 @@ def netgraph(): "target": node['id'], } ) - except Exception as e: + except Exception: pass count = count + 1 return result @app.route('/networkgraph') def network_graph(): - from bokeh.embed import json_item + try: plot = Plot(self.control.statestore) result = netgraph() @@ -374,7 +374,7 @@ def network_graph(): df_edges = pd.DataFrame(result['edges']) graph = plot.make_netgraph_plot(df_edges, df_nodes) return json.dumps(json_item(graph, "myplot")) - except: + except Exception: return '' @app.route('/events') @@ -383,9 +383,6 @@ def events(): :return: """ - import json - - from bson import json_util json_docs = [] for doc in self.control.get_events(): @@ -417,14 +414,11 @@ def add(): combiner = self.control.network.get_combiner(name) if not combiner: # Create a new combiner - import base64 certificate, key = self.certificate_manager.get_or_create( address).get_keypair_raw() - cert_b64 = base64.b64encode(certificate) - key_b64 = base64.b64encode(key) # TODO append and redirect to index. - import copy + combiner = CombinerInterface(self, name, address, port, copy.deepcopy(certificate), copy.deepcopy(key), request.remote_addr) self.control.network.add_combiner(combiner) @@ -466,7 +460,7 @@ def models(): # upload seed file uploaded_seed = request.files['seed'] if uploaded_seed: - from io import BytesIO + a = BytesIO() a.seek(0, 0) uploaded_seed.seek(0) @@ -494,13 +488,13 @@ def delete_model_trail(): :return: """ if request.method == 'POST': - from fedn.common.tracer.mongotracer import MongoTracer + statestore_config = self.control.statestore.get_config() self.tracer = MongoTracer( statestore_config['mongo_config'], statestore_config['network_id']) try: self.control.drop_models() - except: + except Exception: pass # drop objects in minio @@ -539,7 +533,7 @@ def control(): if self.remote_compute_context: try: self.current_compute_context = self.control.get_compute_context() - except: + except Exception: self.current_compute_context = None else: self.current_compute_context = "None:Local" @@ -561,7 +555,7 @@ def control(): combiner_state = combiner.report() nac = combiner_state['nr_active_clients'] clients_available = clients_available + int(nac) - except Exception as e: + except Exception: pass if clients_available < clients_required: @@ -583,7 +577,6 @@ def control(): 'clients_requested': clients_requested, 'task': task, 'validate': validate, 'helper_type': helper_type} - import threading threading.Thread(target=self.control.instruct, args=(config,)).start() # self.control.instruct(config) @@ -596,7 +589,7 @@ def control(): try: seed_model_id = self.control.get_first_model()[0] latest_model_id = self.control.get_latest_model() - except Exception as e: + except Exception: pass return render_template('index.html', latest_model_id=latest_model_id, @@ -647,7 +640,7 @@ def assign(): self.control.network.add_client(client) # Return connection information to client - import base64 + cert_b64 = base64.b64encode(combiner.certificate) response = { 'status': 'assigned', @@ -672,13 +665,13 @@ def infer(): result = "" try: self.control.set_model_id() - except fedn.exceptions.ModelError: + except ModelError: print("Failed to seed control.") return result def combiner_status(): - """ Get current status reports from all combiners registered in the network. + """ Get current status reports from all combiners registered in the network. :return: """ @@ -687,7 +680,7 @@ def combiner_status(): try: report = combiner.report() combiner_info.append(report) - except: + except Exception: pass return combiner_info @@ -706,9 +699,9 @@ def client_status(): active_trainers_str = client['active_trainers'] active_validators_str = client['active_validators'] active_trainers_str = re.sub( - '[^a-zA-Z0-9-:\n\.]', '', active_trainers_str).replace('name:', ' ') + '[^a-zA-Z0-9-:\n\.]', '', active_trainers_str).replace('name:', ' ') # noqa: W605 active_validators_str = re.sub( - '[^a-zA-Z0-9-:\n\.]', '', active_validators_str).replace('name:', ' ') + '[^a-zA-Z0-9-:\n\.]', '', active_validators_str).replace('name:', ' ') # noqa: W605 all_active_trainers.extend( ' '.join(active_trainers_str.split(" ")).split()) all_active_validators.extend( @@ -744,7 +737,7 @@ def client_status(): 'active_trainers': active_trainers_list, 'active_validators': active_validators_list } - except: + except Exception: pass return {'active_clients': [], @@ -844,7 +837,7 @@ def config_download(): try: sum = str(sha(file_path)) - except FileNotFoundError as e: + except FileNotFoundError: sum = '' chk_string = "checksum: {}".format(sum) @@ -860,9 +853,6 @@ def config_download(): discover_port=discover_port, chk_string=chk_string) - from io import BytesIO - - from flask import send_file obj = BytesIO() obj.write(ctx.encode('UTF-8')) obj.seek(0) @@ -915,12 +905,11 @@ def context(): self.control.statestore.set_framework(helper_type) return redirect(url_for('control')) - from flask import send_from_directory name = request.args.get('name', '') if name == '': name = self.control.get_compute_context() - if name == None or name == '': + if name is None or name == '': return render_template('context.html') # There is a potential race condition here, if one client requests a package and at @@ -929,14 +918,14 @@ def context(): mutex = Lock() mutex.acquire() return send_from_directory(app.config['UPLOAD_FOLDER'], name, as_attachment=True) - except: + except Exception: try: data = self.control.get_compute_package(name) file_path = os.path.join(app.config['UPLOAD_FOLDER'], name) with open(file_path, 'wb') as fh: fh.write(data) return send_from_directory(app.config['UPLOAD_FOLDER'], name, as_attachment=True) - except: + except Exception: raise finally: mutex.release() @@ -953,20 +942,19 @@ def checksum(): name = request.args.get('name', None) if name == '' or name is None: name = self.control.get_compute_context() - if name == None or name == '': + if name is None or name == '': return jsonify({}) file_path = os.path.join(UPLOAD_FOLDER, name) print("trying to get {}".format(file_path)) - from fedn.utils.checksum import sha try: sum = str(sha(file_path)) - except FileNotFoundError as e: + except FileNotFoundError: sum = '' data = {'checksum': sum} - from flask import jsonify + return jsonify(data) if self.certificate: diff --git a/fedn/fedn/clients/reducer/statestore/mongoreducerstatestore.py b/fedn/fedn/clients/reducer/statestore/mongoreducerstatestore.py index 1a1030853..f60ae0bad 100644 --- a/fedn/fedn/clients/reducer/statestore/mongoreducerstatestore.py +++ b/fedn/fedn/clients/reducer/statestore/mongoreducerstatestore.py @@ -1,3 +1,9 @@ +import copy +from datetime import datetime + +import pymongo +import yaml + from fedn.clients.reducer.state import (ReducerStateToString, StringToReducerState) from fedn.common.storage.db.mongo import connect_to_mongodb @@ -48,7 +54,6 @@ def __init__(self, network_id, config, defaults=None): self.clients = None raise - import yaml if defaults: with open(defaults, 'r') as file: try: @@ -85,12 +90,12 @@ def __init__(self, network_id, config, defaults=None): round_config = {'timeout': 180, 'validate': True} try: round_config['timeout'] = control['timeout'] - except: + except Exception: pass try: round_config['validate'] = control['validate'] - except: + except Exception: pass # Storage settings @@ -144,18 +149,18 @@ def set_latest(self, model_id): :param model_id: """ - from datetime import datetime - x = self.model.update_one({'key': 'current_model'}, { - '$set': {'model': model_id}}, True) + + self.model.update_one({'key': 'current_model'}, { + '$set': {'model': model_id}}, True) self.model.update_one({'key': 'model_trail'}, {'$push': {'model': model_id, 'committed_at': str(datetime.now())}}, True) def get_first(self): """ Return model_id for the latest model in the model_trail """ - import pymongo + ret = self.model.find_one({'key': 'model_trail'}, sort=[ ("committed_at", pymongo.ASCENDING)]) - if ret == None: + if ret is None: return None try: @@ -169,7 +174,7 @@ def get_first(self): def get_latest(self): """ Return model_id for the latest model in the model_trail """ ret = self.model.find_one({'key': 'current_model'}) - if ret == None: + if ret is None: return None try: @@ -185,8 +190,7 @@ def set_round_config(self, config): :param config: """ - from datetime import datetime - x = self.control.config.update_one( + self.control.config.update_one( {'key': 'round_config'}, {'$set': config}, True) def get_round_config(self): @@ -197,7 +201,7 @@ def get_round_config(self): ret = self.control.config.find({'key': 'round_config'}) try: retcheck = ret[0] - if retcheck == None or retcheck == '' or retcheck == ' ': # ugly check for empty string + if retcheck is None or retcheck == '' or retcheck == ' ': # ugly check for empty string return None return retcheck except (KeyError, IndexError): @@ -208,8 +212,7 @@ def set_compute_context(self, filename): :param filename: """ - from datetime import datetime - x = self.control.config.update_one( + self.control.config.update_one( {'key': 'package'}, {'$set': {'filename': filename}}, True) self.control.config.update_one({'key': 'package_trail'}, {'$push': {'filename': filename, 'committed_at': str(datetime.now())}}, True) @@ -222,7 +225,7 @@ def get_compute_context(self): ret = self.control.config.find({'key': 'package'}) try: retcheck = ret[0] - if retcheck == None or retcheck == '' or retcheck == ' ': # ugly check for empty string + if retcheck is None or retcheck == '' or retcheck == ' ': # ugly check for empty string return None return retcheck except (KeyError, IndexError): @@ -291,27 +294,24 @@ def get_storage_backend(self): def set_storage_backend(self, config): """ """ - import copy - from datetime import datetime config = copy.deepcopy(config) config['updated_at'] = str(datetime.now()) config['status'] = 'enabled' - ret = self.storage.update_one( + self.storage.update_one( {'storage_type': config['storage_type']}, {'$set': config}, True) def set_reducer(self, reducer_data): """ """ - from datetime import datetime reducer_data['updated_at'] = str(datetime.now()) - ret = self.reducer.update_one({'name': reducer_data['name']}, { - '$set': reducer_data}, True) + self.reducer.update_one({'name': reducer_data['name']}, { + '$set': reducer_data}, True) def get_reducer(self): """ """ try: ret = self.reducer.find_one() return ret - except: + except Exception: return None def list_combiners(self): @@ -319,7 +319,7 @@ def list_combiners(self): try: ret = self.combiners.find() return list(ret) - except: + except Exception: return None def get_combiner(self, name): @@ -327,7 +327,7 @@ def get_combiner(self, name): try: ret = self.combiners.find_one({'name': name}) return ret - except: + except Exception: return None def get_combiners(self): @@ -335,36 +335,35 @@ def get_combiners(self): try: ret = self.combiners.find() return list(ret) - except: + except Exception: return None def set_combiner(self, combiner_data): - """ - Set or update combiner record. + """ + Set or update combiner record. combiner_data: dictionary, output of combiner.to_dict()) """ - from datetime import datetime + combiner_data['updated_at'] = str(datetime.now()) - ret = self.combiners.update_one({'name': combiner_data['name']}, { - '$set': combiner_data}, True) + self.combiners.update_one({'name': combiner_data['name']}, { + '$set': combiner_data}, True) def delete_combiner(self, combiner): """ """ try: self.combiners.delete_one({'name': combiner}) - except: + except Exception: print("WARNING, failed to delete combiner: {}".format( combiner), flush=True) def set_client(self, client_data): - """ - Set or update client record. + """ + Set or update client record. client_data: dictionarys """ - from datetime import datetime client_data['updated_at'] = str(datetime.now()) - ret = self.clients.update_one({'name': client_data['name']}, { - '$set': client_data}, True) + self.clients.update_one({'name': client_data['name']}, { + '$set': client_data}, True) def get_client(self, name): """ """ @@ -374,7 +373,7 @@ def get_client(self, name): return None else: return ret - except: + except Exception: return None def list_clients(self): @@ -382,7 +381,7 @@ def list_clients(self): try: ret = self.clients.find() return list(ret) - except: + except Exception: return None def drop_control(self): diff --git a/fedn/fedn/combiner.py b/fedn/fedn/combiner.py index 8b38a6d37..b2eca7ded 100644 --- a/fedn/fedn/combiner.py +++ b/fedn/fedn/combiner.py @@ -1,24 +1,21 @@ import base64 -import io -import json -import os import queue +import signal import sys import threading import time import uuid -from collections import defaultdict from datetime import datetime, timedelta from enum import Enum -import requests - import fedn.common.net.grpc.fedn_pb2 as fedn import fedn.common.net.grpc.fedn_pb2_grpc as rpc from fedn.clients.combiner.modelservice import ModelService +from fedn.clients.combiner.roundcontrol import RoundControl from fedn.common.net.connect import ConnectorCombiner, Status from fedn.common.net.grpc.server import Server from fedn.common.storage.s3.s3repo import S3ModelRepository +from fedn.common.tracer.mongotracer import MongoTracer class Role(Enum): @@ -98,11 +95,9 @@ def __init__(self, connect_config): config['storage']['storage_config']) self.server = Server(self, self.modelservice, grpc_config) - from fedn.common.tracer.mongotracer import MongoTracer self.tracer = MongoTracer( config['statestore']['mongo_config'], config['statestore']['network_id']) - from fedn.clients.combiner.roundcontrol import RoundControl self.control = RoundControl( self.id, self.repository, self, self.modelservice) threading.Thread(target=self.control.run, daemon=True).start() @@ -154,11 +149,11 @@ def request_model_update(self, model_id, clients=[]): Parameters ---------- model_id : str - The id of the model to be updated. - clients : list - List of clients to submit a model update request to. - An empty list (default) results in a broadcast to - all connected trainig clients. + The id of the model to be updated. + clients : list + List of clients to submit a model update request to. + An empty list (default) results in a broadcast to + all connected trainig clients. """ @@ -180,16 +175,16 @@ def request_model_update(self, model_id, clients=[]): model_id, clients), flush=True) def request_model_validation(self, model_id, clients=[]): - """ Ask clients to validate the current global model. + """ Ask clients to validate the current global model. Parameters ---------- model_id : str - The id of the model to be updated. - clients : list - List of clients to submit a model update request to. - An empty list (default) results in a broadcast to - all connected trainig clients. + The id of the model to be updated. + clients : list + List of clients to submit a model update request to. + An empty list (default) results in a broadcast to + all connected trainig clients. """ @@ -251,12 +246,12 @@ def nr_active_validators(self): def __join_client(self, client): """ Add a client to the combiner. """ - if not client.name in self.clients.keys(): + if client.name not in self.clients.keys(): self.clients[client.name] = {"lastseen": datetime.now()} def _subscribe_client_to_queue(self, client, queue_name): self.__join_client(client) - if not queue_name in self.clients[client.name].keys(): + if queue_name not in self.clients[client.name].keys(): self.clients[client.name][queue_name] = queue.Queue() def __get_queue(self, client, queue_name): @@ -281,7 +276,7 @@ def __route_request_to_client(self, request, client, queue_name): try: q = self.__get_queue(client, queue_name) q.put(request) - except: + except Exception: print("Failed to route request to client: {} {}", request.receiver, queue_name) raise @@ -307,7 +302,7 @@ def __register_heartbeat(self, client): # Control Service def Start(self, control: fedn.ControlRequest, context): - """ Push a round config to RoundControl. + """ Push a round config to RoundControl. :param control: :param context: @@ -322,7 +317,6 @@ def Start(self, control: fedn.ControlRequest, context): print("\n\nSTARTING ROUND AT COMBINER WITH ROUND CONFIG: {}\n\n".format( config), flush=True) - job_id = self.control.push_round_config(config) return response def Configure(self, control: fedn.ControlRequest, context): @@ -387,7 +381,7 @@ def Report(self, control: fedn.ControlRequest, context): p = response.parameter.add() p.key = "model_id" model_id = self.get_active_model() - if model_id == None: + if model_id is None: model_id = "" p.value = str(model_id) @@ -664,7 +658,7 @@ def run(self): """ """ - import signal + print("COMBINER: {} started, ready for requests. ".format( self.id), flush=True) try: diff --git a/fedn/fedn/common/control/package.py b/fedn/fedn/common/control/package.py index d44afa395..23971d789 100644 --- a/fedn/fedn/common/control/package.py +++ b/fedn/fedn/common/control/package.py @@ -1,4 +1,11 @@ +import cgi +import hashlib import os +import tarfile +from distutils.dir_util import copy_tree + +import requests +import yaml from fedn.utils.checksum import sha from fedn.utils.dispatcher import Dispatcher @@ -34,14 +41,11 @@ def package(self, validate=False): package_file = '{name}.tar.gz'.format(name=self.name) # package the file - import os cwd = os.getcwd() self.file_path = os.getcwd() if self.config['cwd'] == '': self.file_path = os.getcwd() os.chdir(self.file_path) - - import tarfile with tarfile.open(os.path.join(os.path.dirname(self.file_path), package_file), 'w:gz') as tf: # for walking the current dir with absolute path (in archive) # for root, dirs, files in os.walk(self.file_path): @@ -52,7 +56,6 @@ def package(self, validate=False): tf.add(file) tf.close() - import hashlib hsh = hashlib.sha256() with open(os.path.join(os.path.dirname(self.file_path), package_file), 'rb') as f: for byte_block in iter(lambda: f.read(4096), b""): @@ -69,10 +72,6 @@ def upload(self): """ if self.package_file: - import os - - import requests - # data = {'name': self.package_file, 'hash': str(self.package_hash)} # print("going to send {}".format(data),flush=True) f = open(os.path.join(os.path.dirname( @@ -81,10 +80,10 @@ def upload(self): f.seek(0, 0) files = {'file': f} try: - retval = requests.post('https://{}:{}/context'.format(self.reducer_host, self.reducer_port), - verify=False, files=files, - # data=data, - headers={'Authorization': 'Token {}'.format(self.reducer_token)}) + requests.post('https://{}:{}/context'.format(self.reducer_host, self.reducer_port), + verify=False, files=files, + # data=data, + headers={'Authorization': 'Token {}'.format(self.reducer_token)}) except Exception as e: print("failed to put execution context to reducer. {}".format( e), flush=True) @@ -121,15 +120,13 @@ def download(self, host, port, token, name=None): :param name: :return: """ - import requests - path = "https://{}:{}/context".format(host, port) if name: path = path + "?name={}".format(name) with requests.get(path, stream=True, verify=False, headers={'Authorization': 'Token {}'.format(token)}) as r: if 200 <= r.status_code < 204: - import cgi + params = cgi.parse_header( r.headers.get('Content-Disposition', ''))[-1] try: @@ -152,7 +149,7 @@ def download(self, host, port, token, name=None): data = r.json() try: self.checksum = data['checksum'] - except Exception as e: + except Exception: print("Could not extract checksum!") return True @@ -184,9 +181,6 @@ def unpack(self): """ """ - import os - import tarfile - if self.pkg_name: f = None if self.pkg_name.endswith('tar.gz'): @@ -202,8 +196,7 @@ def unpack(self): print( "Failed to unpack compute package, no pkg_name set. Has the reducer been configured with a compute package?") - import os - cwd = os.getcwd() + os.getcwd() try: os.chdir(self.dir) @@ -211,7 +204,7 @@ def unpack(self): f.extractall() print("Successfully extracted compute package content in {}".format( self.dir), flush=True) - except: + except Exception: print("Error extracting files!") def dispatcher(self, run_path): @@ -222,19 +215,17 @@ def dispatcher(self, run_path): """ from_path = os.path.join(os.getcwd(), 'client') - from distutils.dir_util import copy_tree - # preserve_times=False ensures compatibility with Gramine LibOS copy_tree(from_path, run_path, preserve_times=False) try: cfg = None with open(os.path.join(run_path, 'fedn.yaml'), 'rb') as config_file: - import yaml + cfg = yaml.safe_load(config_file.read()) self.dispatch_config = cfg - except Exception as e: + except Exception: print( "Error trying to load and unpack dispatcher config - trying default", flush=True) diff --git a/fedn/fedn/common/net/connect.py b/fedn/fedn/common/net/connect.py index ebfd15bc5..37b822e2d 100644 --- a/fedn/fedn/common/net/connect.py +++ b/fedn/fedn/common/net/connect.py @@ -1,7 +1,8 @@ import enum -from http.client import UNAUTHORIZED +import os import requests as r +import urllib3 from fedn.common.security.certificate import Certificate @@ -29,7 +30,6 @@ def __init__(self, host, port, token, name, remote_package, combiner=None, id=No verify_cert=False): if not verify_cert: - import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) self.host = host @@ -48,7 +48,6 @@ def __init__(self, host, port, token, name, remote_package, combiner=None, id=No else: prefix = "https://" if secure and preshared_cert: - import os self.certificate = Certificate(os.getcwd() + "/certs/", name="client", key_name="client-key.pem", cert_name="client-cert.pem").cert_path else: @@ -122,7 +121,6 @@ class ConnectorCombiner: def __init__(self, host, port, myhost, myport, token, name, secure=True, preshared_cert=True, verify_cert=False): if not verify_cert: - import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) self.host = host @@ -139,7 +137,6 @@ def __init__(self, host, port, myhost, myport, token, name, secure=True, preshar else: prefix = "https://" if secure and preshared_cert: - import os self.certificate = Certificate(os.getcwd() + "/certs/", name="client", key_name="client-key.pem", cert_name="client-cert.pem", ).cert_path @@ -173,7 +170,7 @@ def announce(self): self.myport), verify=cert, headers={'Authorization': 'Token {}'.format(self.token)}) - except Exception as e: + except Exception: # self.state = State.Disconnected return Status.Unassigned, {} diff --git a/fedn/fedn/common/net/grpc/server.py b/fedn/fedn/common/net/grpc/server.py index 79a251de1..d9998364b 100644 --- a/fedn/fedn/common/net/grpc/server.py +++ b/fedn/fedn/common/net/grpc/server.py @@ -27,10 +27,6 @@ def __init__(self, servicer, modelservicer, config): rpc.add_ControlServicer_to_server(servicer, self.server) if config['secure']: - import os - - from fedn.common.security.certificate import Certificate - # self.certificate = Certificate(os.getcwd() + '/certs/', cert_name='combiner-cert.pem', key_name='combiner-key.pem') # self.certificate.set_keypair_raw(config['certificate'], config['key']) diff --git a/fedn/fedn/common/net/web/client.py b/fedn/fedn/common/net/web/client.py index 8aa4e1657..ba3a31723 100644 --- a/fedn/fedn/common/net/web/client.py +++ b/fedn/fedn/common/net/web/client.py @@ -146,4 +146,4 @@ -""" +""" # noqa E501 diff --git a/fedn/fedn/common/security/certificate.py b/fedn/fedn/common/security/certificate.py index bf90645f7..b3181178e 100644 --- a/fedn/fedn/common/security/certificate.py +++ b/fedn/fedn/common/security/certificate.py @@ -1,4 +1,7 @@ +import copy import os +import random +import uuid from OpenSSL import crypto @@ -22,7 +25,6 @@ def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", cre "Successfully created the directory to store cert and keys in {}".format(cwd)) self.key_path = os.path.join(cwd, key_name) self.cert_path = os.path.join(cwd, cert_name) - import uuid if name: self.name = name else: @@ -37,11 +39,10 @@ def gen_keypair(self, ): cert = crypto.X509() cert.get_subject().C = "SE" cert.get_subject().ST = "Stockholm" - cert.get_subject().O = "Development Key" + cert.get_subject().O = "Development Key" # noqa: E741 cert.get_subject().OU = "Development Key" cert.get_subject().CN = self.name # gethostname() - import random cert.set_serial_number(int(random.randint(1000, 100000))) cert.gmtime_adj_notBefore(0) @@ -79,7 +80,6 @@ def get_keypair_raw(self): key_buf = keyfile.read() with open(self.cert_path, 'rb') as certfile: cert_buf = certfile.read() - import copy return copy.deepcopy(cert_buf), copy.deepcopy(key_buf) def get_key(self): diff --git a/fedn/fedn/common/security/certificatemanager.py b/fedn/fedn/common/security/certificatemanager.py index 5637d7344..7171d26b4 100644 --- a/fedn/fedn/common/security/certificatemanager.py +++ b/fedn/fedn/common/security/certificatemanager.py @@ -1,3 +1,5 @@ +import os + from .certificate import Certificate @@ -44,7 +46,6 @@ def load_all(self): """ """ - import os for filename in sorted(os.listdir(self.directory)): if filename.endswith('cert.pem'): name = filename.split('-')[0] diff --git a/fedn/fedn/common/storage/db/mongo.py b/fedn/fedn/common/storage/db/mongo.py index 593afc6a9..e7cbf5b8f 100644 --- a/fedn/fedn/common/storage/db/mongo.py +++ b/fedn/fedn/common/storage/db/mongo.py @@ -1,5 +1,3 @@ -import os - import pymongo diff --git a/fedn/fedn/common/storage/models/memorymodelstorage.py b/fedn/fedn/common/storage/models/memorymodelstorage.py index 27ad9c8be..9ef5559cd 100644 --- a/fedn/fedn/common/storage/models/memorymodelstorage.py +++ b/fedn/fedn/common/storage/models/memorymodelstorage.py @@ -1,5 +1,6 @@ import io from collections import defaultdict +from io import BytesIO from fedn.common.storage.models.modelstorage import ModelStorage @@ -12,7 +13,6 @@ class MemoryModelStorage(ModelStorage): """ def __init__(self): - import tempfile # self.dir = tempfile.TemporaryDirectory() self.models = defaultdict(io.BytesIO) @@ -34,7 +34,6 @@ def get(self, model_id): :param model_id: :return: """ - from io import BytesIO obj = self.models[model_id] obj.seek(0, 0) # Have to copy object to not mix up the file pointers when sending... fix in better way. diff --git a/fedn/fedn/common/storage/models/tempmodelstorage.py b/fedn/fedn/common/storage/models/tempmodelstorage.py index 681573b94..f87c516b0 100644 --- a/fedn/fedn/common/storage/models/tempmodelstorage.py +++ b/fedn/fedn/common/storage/models/tempmodelstorage.py @@ -1,4 +1,5 @@ import os +from io import BytesIO import fedn.common.net.grpc.fedn_pb2 as fedn from fedn.common.storage.models.modelstorage import ModelStorage @@ -48,7 +49,6 @@ def get(self, model_id): print("No such model have been made available yet!", flush=True) return None - from io import BytesIO obj = BytesIO() with open(os.path.join(self.default_dir, str(model_id)), 'rb') as f: obj.write(f.read()) diff --git a/fedn/fedn/common/storage/s3/miniorepo.py b/fedn/fedn/common/storage/s3/miniorepo.py index cb489de89..9341704e6 100644 --- a/fedn/fedn/common/storage/s3/miniorepo.py +++ b/fedn/fedn/common/storage/s3/miniorepo.py @@ -1,13 +1,9 @@ import io -import json import logging -import os -import uuid -from urllib.parse import urlparse -import requests from minio import Minio from minio.error import InvalidResponseError +from urllib3.poolmanager import PoolManager from .base import Repository @@ -48,7 +44,6 @@ def __init__(self, config): "\n\n\nWARNING : S3/MINIO RUNNING IN **INSECURE** MODE! THIS IS NOT FOR PRODUCTION!\n\n\n") if self.secure_mode: - from urllib3.poolmanager import PoolManager manager = PoolManager( num_pools=100, cert_reqs='CERT_NONE', assert_hostname=False) self.client = Minio("{0}:{1}".format(config['storage_hostname'], config['storage_port']), @@ -74,15 +69,15 @@ def create_bucket(self, bucket_name): found = self.client.bucket_exists(bucket_name) if not found: try: - response = self.client.make_bucket(bucket_name) - except InvalidResponseError as err: + self.client.make_bucket(bucket_name) + except InvalidResponseError: raise def set_artifact(self, instance_name, instance, is_file=False, bucket=''): """ Instance must be a byte-like object. """ if bucket == '': bucket = self.bucket - if is_file == True: + if is_file: self.client.fput_object(bucket, instance_name, instance) else: try: @@ -132,7 +127,7 @@ def list_artifacts(self): for obj in objs: print(obj.object_name) objects_to_delete.append(obj.object_name) - except Exception as e: + except Exception: raise Exception( "Could not list models in bucket {}".format(self.bucket)) return objects_to_delete @@ -164,5 +159,5 @@ def delete_objects(self): ) for del_err in errors: print("Deletion Error: {}".format(del_err)) - except: + except Exception: print('Could not delete objects: {}'.format(objects_to_delete)) diff --git a/fedn/fedn/common/storage/s3/s3repo.py b/fedn/fedn/common/storage/s3/s3repo.py index d572be4ba..e660e9124 100644 --- a/fedn/fedn/common/storage/s3/s3repo.py +++ b/fedn/fedn/common/storage/s3/s3repo.py @@ -1,3 +1,5 @@ +import uuid + from .miniorepo import MINIORepository @@ -36,13 +38,12 @@ def set_model(self, model, is_file=True): :param is_file: :return: """ - import uuid model_id = uuid.uuid4() # TODO: Check that this call succeeds try: self.set_artifact(str(model_id), model, bucket=self.bucket, is_file=is_file) - except Exception as e: + except Exception: print("Failed to write model with ID {} to repository.".format(model_id)) raise return str(model_id) @@ -57,7 +58,7 @@ def set_compute_context(self, name, compute_package, is_file=True): try: self.set_artifact(str(name), compute_package, bucket="fedn-context", is_file=is_file) - except Exception as e: + except Exception: print("Failed to write compute_package to repository.") raise @@ -69,7 +70,7 @@ def get_compute_package(self, compute_package): """ try: data = self.get_artifact(compute_package, bucket="fedn-context") - except Exception as e: + except Exception: print("Failed to get compute_package from repository.") raise return data @@ -81,6 +82,6 @@ def delete_compute_context(self, compute_package): """ try: self.delete_artifact(compute_package, bucket=['fedn-context']) - except Exception as e: + except Exception: print("Failed to delete compute_package from repository.") raise diff --git a/fedn/fedn/common/tracer/mongotracer.py b/fedn/fedn/common/tracer/mongotracer.py index 0af957e79..5cf6a93a8 100644 --- a/fedn/fedn/common/tracer/mongotracer.py +++ b/fedn/fedn/common/tracer/mongotracer.py @@ -1,8 +1,8 @@ import threading -import time from datetime import datetime import psutil +from google.protobuf.json_format import MessageToDict from fedn.common.storage.db.mongo import connect_to_mongodb from fedn.common.tracer.tracer import Tracer @@ -34,12 +34,11 @@ def report(self, msg): :param msg: """ - from google.protobuf.json_format import MessageToDict data = MessageToDict(msg, including_default_value_fields=True) print("LOG: \n {} \n".format(data), flush=True) - if self.status != None: + if self.status is not None: self.status.insert_one(data) def drop_round_time(self): diff --git a/fedn/fedn/reducer.py b/fedn/fedn/reducer.py index 1394c5904..39781d51a 100644 --- a/fedn/fedn/reducer.py +++ b/fedn/fedn/reducer.py @@ -1,12 +1,12 @@ import os import threading +import time +from datetime import datetime from fedn.clients.reducer.control import ReducerControl from fedn.clients.reducer.interfaces import ReducerInferenceInterface from fedn.clients.reducer.restservice import ReducerRestService from fedn.clients.reducer.state import ReducerStateToString -from fedn.clients.reducer.statestore.mongoreducerstatestore import \ - MongoReducerStateStore from fedn.common.security.certificatemanager import CertificateManager @@ -33,11 +33,6 @@ def __init__(self, statestore): self.name = config['name'] - try: - path = config['path'] - except KeyError: - path = os.getcwd() - self.certificate_manager = CertificateManager(os.getcwd() + "/certs/") self.control = ReducerControl(self.statestore) @@ -58,8 +53,6 @@ def control_loop(self): """ """ - import time - from datetime import datetime try: old_state = self.control.state() diff --git a/fedn/fedn/tests/test_reducer_service.py b/fedn/fedn/tests/test_reducer_service.py index 93cf761b1..fc5ca8d9b 100644 --- a/fedn/fedn/tests/test_reducer_service.py +++ b/fedn/fedn/tests/test_reducer_service.py @@ -1,8 +1,7 @@ import unittest -from unittest.mock import MagicMock, patch +from unittest.mock import patch from fedn.clients.reducer.restservice import ReducerRestService -from fedn.clients.reducer.state import ReducerState class TestInit(unittest.TestCase): diff --git a/fedn/fedn/utils/dispatcher.py b/fedn/fedn/utils/dispatcher.py index 228c4f4bd..63743e6a6 100644 --- a/fedn/fedn/utils/dispatcher.py +++ b/fedn/fedn/utils/dispatcher.py @@ -1,5 +1,4 @@ import logging -import re from fedn.utils.process import run_process diff --git a/fedn/fedn/utils/helpers.py b/fedn/fedn/utils/helpers.py index 6e4d990a3..3de075d4c 100644 --- a/fedn/fedn/utils/helpers.py +++ b/fedn/fedn/utils/helpers.py @@ -1,6 +1,3 @@ -import collections -import os -import tempfile from abc import ABC, abstractmethod @@ -12,8 +9,8 @@ def __init__(self): @abstractmethod def increment_average(self, model, model_next, n): - """ Compute one increment of incremental averaging. - n: the iteration index 1...N in the sequence. + """ Compute one increment of incremental averaging. + n: the iteration index 1...N in the sequence. """ pass @@ -47,19 +44,19 @@ def get_tmp_path(self): def get_helper(helper_type): - """ Return an instance of the helper class. + """ Return an instance of the helper class. :param helper_type (str): The helper type ('keras','pytorch') :return: """ if helper_type == 'numpyarray': - from fedn.utils.numpyarrayhelper import NumpyArrayHelper - return NumpyArrayHelper() + __import__('NumpyArrayHelper', fromlist=["fedn.utils.numpyarrayhelper"]) + return NumpyArrayHelper() # noqa: F821 elif helper_type == 'keras': - from fedn.utils.kerashelper import KerasHelper - return KerasHelper() + __import__('KerasHelper', fromlist=["fedn.utils.kerashelper"]) + return KerasHelper() # noqa: F821 elif helper_type == 'pytorch': - from fedn.utils.pytorchhelper import PytorchHelper - return PytorchHelper() + __import__('PytorchHelper', fromlist=["fedn.utils.pytorchhelper"]) + return PytorchHelper() # noqa: F821 else: return None diff --git a/fedn/fedn/utils/kerashelper.py b/fedn/fedn/utils/kerashelper.py index 5d5b2a164..be081e45e 100644 --- a/fedn/fedn/utils/kerashelper.py +++ b/fedn/fedn/utils/kerashelper.py @@ -1,6 +1,6 @@ -import collections import os import tempfile +from io import BytesIO import numpy as np @@ -14,8 +14,8 @@ def average_weights(self, weights): """ Average weights of Keras Sequential models. """ avg_w = [] - for l in range(len(weights[0])): - lay_l = np.array([w[l] for w in weights]) + for i in range(len(weights[0])): + lay_l = np.array([w[i] for w in weights]) weight_l_avg = np.mean(lay_l, 0) avg_w.append(weight_l_avg) @@ -34,7 +34,7 @@ def set_weights(self, weights_, weights): :param weights_: :param weights: """ - weights_ = weights + weights_ = weights # noqa F841 def get_weights(self, weights): """ @@ -98,7 +98,6 @@ def serialize_model_to_BytesIO(self, model): """ outfile_name = self.save_model(model) - from io import BytesIO a = BytesIO() a.seek(0, 0) with open(outfile_name, 'rb') as f: diff --git a/fedn/fedn/utils/logger.py b/fedn/fedn/utils/logger.py index 22e50ebc1..08c2c94e1 100644 --- a/fedn/fedn/utils/logger.py +++ b/fedn/fedn/utils/logger.py @@ -8,8 +8,6 @@ class Logger: """ def __init__(self, log_level=logging.DEBUG, to_file='', file_path=os.getcwd()): - import sys - root = logging.getLogger() root.setLevel(log_level) diff --git a/fedn/fedn/utils/numpyarrayhelper.py b/fedn/fedn/utils/numpyarrayhelper.py index 1f481a309..fee1ac42b 100644 --- a/fedn/fedn/utils/numpyarrayhelper.py +++ b/fedn/fedn/utils/numpyarrayhelper.py @@ -1,7 +1,6 @@ -import collections import os -import pickle import tempfile +from io import BytesIO import numpy as np @@ -44,7 +43,6 @@ def serialize_model_to_BytesIO(self, model): """ outfile_name = self.save_model(model) - from io import BytesIO a = BytesIO() a.seek(0, 0) with open(outfile_name, 'rb') as f: diff --git a/fedn/fedn/utils/pytorchhelper.py b/fedn/fedn/utils/pytorchhelper.py index 7d816bc8c..fe1330b17 100644 --- a/fedn/fedn/utils/pytorchhelper.py +++ b/fedn/fedn/utils/pytorchhelper.py @@ -1,7 +1,7 @@ import os import tempfile from collections import OrderedDict -from functools import reduce +from io import BytesIO import numpy as np @@ -69,7 +69,6 @@ def serialize_model_to_BytesIO(self, model): """ outfile_name = self.save_model(model) - from io import BytesIO a = BytesIO() a.seek(0, 0) with open(outfile_name, 'rb') as f: diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 000000000..b98843069 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[flake8] +max-line-length = 160 + +[pep8] +max-line-length = 160 \ No newline at end of file