From b99e593d35fe2eff107de6f75499aeda08efa50a Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 20 Oct 2023 15:51:52 +0200 Subject: [PATCH] pagination added to get events --- fedn/fedn/network/api/interface.py | 20 +- fedn/fedn/network/api/server.py | 2 - fedn/fedn/network/dashboard/restservice.py | 871 +++++++++++------- .../network/dashboard/templates/events.html | 65 +- .../network/statestore/mongostatestore.py | 26 +- 5 files changed, 588 insertions(+), 396 deletions(-) diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py index 5fa1cc1dd..e45d15919 100644 --- a/fedn/fedn/network/api/interface.py +++ b/fedn/fedn/network/api/interface.py @@ -1,11 +1,10 @@ import base64 import copy -import json + import os import threading from io import BytesIO -from bson import json_util from flask import jsonify, send_from_directory from werkzeug.utils import secure_filename @@ -380,19 +379,20 @@ def get_events(self, **kwargs): :return: The events as a json object. :rtype: :py:class:`flask.Response` """ - event_objects = self.statestore.get_events(**kwargs) - if event_objects is None: + response = self.statestore.get_events(**kwargs) + + result = response["result"] + if result is None: return ( jsonify({"success": False, "message": "No events found."}), 404, ) - json_docs = [] - for doc in self.statestore.get_events(**kwargs): - json_doc = json.dumps(doc, default=json_util.default) - json_docs.append(json_doc) - json_docs.reverse() - return jsonify({"events": json_docs}) + events = [] + for evt in result: + events.append(evt) + + return jsonify({"result": events, "count": response["count"]}) def get_all_validations(self, **kwargs): """Get all validations from the statestore. diff --git a/fedn/fedn/network/api/server.py b/fedn/fedn/network/api/server.py index 0c49343f8..43cf34024 100644 --- a/fedn/fedn/network/api/server.py +++ b/fedn/fedn/network/api/server.py @@ -298,8 +298,6 @@ def get_events(): # TODO: except filter with request.get_json() kwargs = request.args.to_dict() - print("get_events") - print(kwargs) return api.get_events(**kwargs) diff --git a/fedn/fedn/network/dashboard/restservice.py b/fedn/fedn/network/dashboard/restservice.py index 3c272349e..5dfde03c0 100644 --- a/fedn/fedn/network/dashboard/restservice.py +++ b/fedn/fedn/network/dashboard/restservice.py @@ -12,9 +12,19 @@ import pandas as pd from bokeh.embed import json_item from bson import json_util -from flask import (Flask, abort, flash, jsonify, make_response, redirect, - render_template, request, send_file, send_from_directory, - url_for) +from flask import ( + Flask, + abort, + flash, + jsonify, + make_response, + redirect, + render_template, + request, + send_file, + send_from_directory, + url_for, +) from werkzeug.utils import secure_filename from fedn.common.tracer.mongotracer import MongoTracer @@ -23,8 +33,8 @@ from fedn.network.state import ReducerState, ReducerStateToString from fedn.utils.checksum import sha -UPLOAD_FOLDER = '/app/client/package/' -ALLOWED_EXTENSIONS = {'gz', 'bz2', 'tar', 'zip', 'tgz'} +UPLOAD_FOLDER = "/app/client/package/" +ALLOWED_EXTENSIONS = {"gz", "bz2", "tar", "zip", "tgz"} def allowed_file(filename): @@ -33,8 +43,10 @@ def allowed_file(filename): :param filename: :return: """ - return '.' in filename and \ - filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS + return ( + "." in filename + and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS + ) def encode_auth_token(secret_key): @@ -43,16 +55,17 @@ def encode_auth_token(secret_key): """ try: payload = { - 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=90, seconds=0), - 'iat': datetime.datetime.utcnow(), - 'status': 'Success' + "exp": datetime.datetime.utcnow() + + datetime.timedelta(days=90, seconds=0), + "iat": datetime.datetime.utcnow(), + "status": "Success", } - token = jwt.encode( - payload, - secret_key, - algorithm='HS256' + token = jwt.encode(payload, secret_key, algorithm="HS256") + print( + "\n\n\nSECURE MODE ENABLED, USE TOKEN TO ACCESS REDUCER: **** {} ****\n\n\n".format( + token + ) ) - print('\n\n\nSECURE MODE ENABLED, USE TOKEN TO ACCESS REDUCER: **** {} ****\n\n\n'.format(token)) return token except Exception as e: return e @@ -64,56 +77,49 @@ def decode_auth_token(auth_token, secret): :return: string """ try: - payload = jwt.decode( - auth_token, - secret, - algorithms=['HS256'] - ) + payload = jwt.decode(auth_token, secret, algorithms=["HS256"]) return payload["status"] except jwt.ExpiredSignatureError as e: print(e) - return 'Token has expired.' + return "Token has expired." except jwt.InvalidTokenError as e: print(e) - return 'Invalid token.' + return "Invalid token." class ReducerRestService: - """ - - """ + """ """ def __init__(self, config, control, statestore, certificate_manager): - print("config object!: \n\n\n\n{}".format(config)) - if config['host']: - self.host = config['host'] + if config["host"]: + self.host = config["host"] else: self.host = None - self.name = config['name'] + self.name = config["name"] - self.port = config['port'] - self.network_id = config['name'] + '-network' + self.port = config["port"] + self.network_id = config["name"] + "-network" - if 'token' in config.keys(): + if "token" in config.keys(): self.token_auth_enabled = True else: self.token_auth_enabled = False - if 'secret_key' in config.keys(): - self.SECRET_KEY = config['secret_key'] + if "secret_key" in config.keys(): + self.SECRET_KEY = config["secret_key"] else: self.SECRET_KEY = None - if 'use_ssl' in config.keys(): - self.use_ssl = config['use_ssl'] + if "use_ssl" in config.keys(): + self.use_ssl = config["use_ssl"] self.remote_compute_package = config["remote_compute_package"] if self.remote_compute_package: - self.package = 'remote' + self.package = "remote" else: - self.package = 'local' + self.package = "local" self.control = control self.statestore = statestore @@ -125,9 +131,7 @@ def to_dict(self): :return: """ - data = { - 'name': self.name - } + data = {"name": self.name} return data def check_compute_package(self): @@ -165,24 +169,40 @@ def check_configured_response(self): :rtype: json """ if self.control.state() == ReducerState.setup: - return jsonify({'status': 'retry', - 'package': self.package, - 'msg': "Controller is not configured."}) + return jsonify( + { + "status": "retry", + "package": self.package, + "msg": "Controller is not configured.", + } + ) if not self.check_compute_package(): - return jsonify({'status': 'retry', - 'package': self.package, - 'msg': "Compute package is not configured. Please upload the compute package."}) + return jsonify( + { + "status": "retry", + "package": self.package, + "msg": "Compute package is not configured. Please upload the compute package.", + } + ) if not self.check_initial_model(): - return jsonify({'status': 'retry', - 'package': self.package, - 'msg': "Initial model is not configured. Please upload the model."}) + return jsonify( + { + "status": "retry", + "package": self.package, + "msg": "Initial model is not configured. Please upload the model.", + } + ) if not self.control.idle(): - return jsonify({'status': 'retry', - 'package': self.package, - 'msg': "Conroller is not in idle state, try again later. "}) + return jsonify( + { + "status": "retry", + "package": self.package, + "msg": "Conroller is not in idle state, try again later. ", + } + ) return None def check_configured(self): @@ -192,17 +212,29 @@ def check_configured(self): :return: Rendered html template or None """ if not self.check_compute_package(): - return render_template('setup.html', client=self.name, state=ReducerStateToString(self.control.state()), - logs=None, refresh=False, - message='Please set the compute package') + return render_template( + "setup.html", + client=self.name, + state=ReducerStateToString(self.control.state()), + logs=None, + refresh=False, + message="Please set the compute package", + ) if self.control.state() == ReducerState.setup: - return render_template('setup.html', client=self.name, state=ReducerStateToString(self.control.state()), - logs=None, refresh=True, - message='Warning. Reducer is not base-configured. please do so with config file.') + return render_template( + "setup.html", + client=self.name, + state=ReducerStateToString(self.control.state()), + logs=None, + refresh=True, + message="Warning. Reducer is not base-configured. please do so with config file.", + ) if not self.check_initial_model(): - return render_template('setup_model.html', message="Please set the initial model.") + return render_template( + "setup_model.html", message="Please set the initial model." + ) return None @@ -216,31 +248,37 @@ def authorize(self, r, secret): """ try: # Get token - if 'Authorization' in r.headers: # header auth - request_token = r.headers.get('Authorization').split()[1] - elif 'token' in r.args: # args auth - request_token = str(r.args.get('token')) - elif 'fedn_token' in r.cookies: - request_token = r.cookies.get('fedn_token') + if "Authorization" in r.headers: # header auth + request_token = r.headers.get("Authorization").split()[1] + elif "token" in r.args: # args auth + request_token = str(r.args.get("token")) + elif "fedn_token" in r.cookies: + request_token = r.cookies.get("fedn_token") else: # no token provided - print('Authorization failed. No token provided.', flush=True) + print("Authorization failed. No token provided.", flush=True) abort(401) # Log token and secret print( - f'Secret: {secret}. Request token: {request_token}.', flush=True) + f"Secret: {secret}. Request token: {request_token}.", + flush=True, + ) # Authenticate status = decode_auth_token(request_token, secret) - if status == 'Success': + if status == "Success": return True else: - print('Authorization failed. Status: "{}"'.format( - status), flush=True) + print( + 'Authorization failed. Status: "{}"'.format(status), + flush=True, + ) abort(401) except Exception as e: - print('Authorization failed. Expection encountered: "{}".'.format( - e), flush=True) + print( + 'Authorization failed. Expection encountered: "{}".'.format(e), + flush=True, + ) abort(401) def run(self): @@ -250,10 +288,10 @@ def run(self): """ app = Flask(__name__) - app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER - app.config['SECRET_KEY'] = self.SECRET_KEY + app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER + app.config["SECRET_KEY"] = self.SECRET_KEY - @app.route('/') + @app.route("/") def index(): """ @@ -261,7 +299,7 @@ def index(): """ # Token auth if self.token_auth_enabled: - self.authorize(request, app.config.get('SECRET_KEY')) + self.authorize(request, app.config.get("SECRET_KEY")) # Render template not_configured_template = self.check_configured() @@ -269,29 +307,37 @@ def index(): template = not_configured_template else: events = self.control.get_events() - message = request.args.get('message', None) - message_type = request.args.get('message_type', None) - template = render_template('events.html', client=self.name, state=ReducerStateToString(self.control.state()), - events=events, - logs=None, refresh=True, configured=True, message=message, message_type=message_type) + message = request.args.get("message", None) + message_type = request.args.get("message_type", None) + template = render_template( + "events.html", + client=self.name, + state=ReducerStateToString(self.control.state()), + events=events, + logs=None, + refresh=True, + configured=True, + message=message, + message_type=message_type, + ) # Set token cookie in response if needed response = make_response(template) - if 'token' in request.args: # args auth - response.set_cookie('fedn_token', str(request.args['token'])) + if "token" in request.args: # args auth + response.set_cookie("fedn_token", str(request.args["token"])) # Return response return response - @app.route('/status') + @app.route("/status") def status(): """ :return: """ - return {'state': ReducerStateToString(self.control.state())} + return {"state": ReducerStateToString(self.control.state())} - @app.route('/netgraph') + @app.route("/netgraph") def netgraph(): """ Creates nodes and edges for network graph @@ -299,16 +345,18 @@ def netgraph(): :return: nodes and edges as keys :rtype: dict """ - result = {'nodes': [], 'edges': []} - - result['nodes'].append({ - "id": "reducer", - "label": "Reducer", - "role": 'reducer', - "status": 'active', - "name": 'reducer', # TODO: get real host name - "type": 'reducer', - }) + result = {"nodes": [], "edges": []} + + result["nodes"].append( + { + "id": "reducer", + "label": "Reducer", + "role": "reducer", + "status": "active", + "name": "reducer", # TODO: get real host name + "type": "reducer", + } + ) combiner_info = combiner_status() client_info = client_status() @@ -319,49 +367,55 @@ def netgraph(): for combiner in combiner_info: print("combiner info {}".format(combiner_info), flush=True) try: - result['nodes'].append({ - "id": combiner['name'], # "n{}".format(count), - "label": "Combiner ({} clients)".format(combiner['nr_active_clients']), - "role": 'combiner', - "status": 'active', # TODO: Hard-coded, combiner_info does not contain status - "name": combiner['name'], - "type": 'combiner', - }) + result["nodes"].append( + { + "id": combiner["name"], # "n{}".format(count), + "label": "Combiner ({} clients)".format( + combiner["nr_active_clients"] + ), + "role": "combiner", + "status": "active", # TODO: Hard-coded, combiner_info does not contain status + "name": combiner["name"], + "type": "combiner", + } + ) except Exception as err: print(err) - for client in client_info['active_clients']: + for client in client_info["active_clients"]: try: - if client['status'] != 'offline': - result['nodes'].append({ - "id": str(client['_id']), - "label": "Client", - "role": client['role'], - "status": client['status'], - "name": client['name'], - "combiner": client['combiner'], - "type": 'client', - }) + if client["status"] != "offline": + result["nodes"].append( + { + "id": str(client["_id"]), + "label": "Client", + "role": client["role"], + "status": client["status"], + "name": client["name"], + "combiner": client["combiner"], + "type": "client", + } + ) except Exception as err: print(err) count = 0 - for node in result['nodes']: + for node in result["nodes"]: try: - if node['type'] == 'combiner': - result['edges'].append( + if node["type"] == "combiner": + result["edges"].append( { "id": "e{}".format(count), - "source": node['id'], - "target": 'reducer', + "source": node["id"], + "target": "reducer", } ) - elif node['type'] == 'client': - result['edges'].append( + elif node["type"] == "client": + result["edges"].append( { "id": "e{}".format(count), - "source": node['combiner'], - "target": node['id'], + "source": node["combiner"], + "target": node["id"], } ) except Exception: @@ -369,59 +423,75 @@ def netgraph(): count = count + 1 return result - @app.route('/networkgraph') + @app.route("/networkgraph") def network_graph(): - try: plot = Plot(self.control.statestore) result = netgraph() - df_nodes = pd.DataFrame(result['nodes']) - df_edges = pd.DataFrame(result['edges']) + df_nodes = pd.DataFrame(result["nodes"]) + df_edges = pd.DataFrame(result["edges"]) graph = plot.make_netgraph_plot(df_edges, df_nodes) return json.dumps(json_item(graph, "myplot")) except Exception: raise # return '' - @app.route('/events') + @app.route("/events") def events(): """ :return: """ + response = self.control.get_events() + events = [] + + result = response["result"] + + for evt in result: + events.append(evt) + + return jsonify({"result": events, "count": response["count"]}) + json_docs = [] for doc in self.control.get_events(): json_doc = json.dumps(doc, default=json_util.default) json_docs.append(json_doc) json_docs.reverse() - return {'events': json_docs} - @app.route('/add') + return {"events": json_docs} + + @app.route("/add") def add(): - """ Add a combiner to the network. """ + """Add a combiner to the network.""" print("Adding combiner to network:", flush=True) if self.token_auth_enabled: - self.authorize(request, app.config.get('SECRET_KEY')) + self.authorize(request, app.config.get("SECRET_KEY")) if self.control.state() == ReducerState.setup: - return jsonify({'status': 'retry'}) - - name = request.args.get('name', None) - address = str(request.args.get('address', None)) - fqdn = str(request.args.get('fqdn', None)) - port = request.args.get('port', None) - secure_grpc = request.args.get('secure', None) - - if port is None or address is None or name is None or secure_grpc is None: + return jsonify({"status": "retry"}) + + name = request.args.get("name", None) + address = str(request.args.get("address", None)) + fqdn = str(request.args.get("fqdn", None)) + port = request.args.get("port", None) + secure_grpc = request.args.get("secure", None) + + if ( + port is None + or address is None + or name is None + or secure_grpc is None + ): return "Please specify correct parameters." # Try to retrieve combiner from db combiner = self.control.network.get_combiner(name) if not combiner: - if secure_grpc == 'True': + if secure_grpc == "True": certificate, key = self.certificate_manager.get_or_create( - address).get_keypair_raw() + address + ).get_keypair_raw() _ = base64.b64encode(certificate) _ = base64.b64encode(key) @@ -437,23 +507,24 @@ def add(): port=port, certificate=copy.deepcopy(certificate), key=copy.deepcopy(key), - ip=request.remote_addr) + ip=request.remote_addr, + ) self.control.network.add_combiner(combiner) combiner = self.control.network.get_combiner(name) ret = { - 'status': 'added', - 'storage': self.control.statestore.get_storage_backend(), - 'statestore': self.control.statestore.get_config(), - 'certificate': combiner.get_certificate(), - 'key': combiner.get_key() + "status": "added", + "storage": self.control.statestore.get_storage_backend(), + "statestore": self.control.statestore.get_config(), + "certificate": combiner.get_certificate(), + "key": combiner.get_key(), } return jsonify(ret) - @app.route('/eula', methods=['GET', 'POST']) + @app.route("/eula", methods=["GET", "POST"]) def eula(): """ @@ -462,9 +533,9 @@ def eula(): for r in request.headers: print("header contains: {}".format(r), flush=True) - return render_template('eula.html', configured=True) + return render_template("eula.html", configured=True) - @app.route('/models', methods=['GET', 'POST']) + @app.route("/models", methods=["GET", "POST"]) def models(): """ @@ -472,13 +543,12 @@ def models(): """ # Token auth if self.token_auth_enabled: - self.authorize(request, app.config.get('SECRET_KEY')) + self.authorize(request, app.config.get("SECRET_KEY")) - if request.method == 'POST': + if request.method == "POST": # upload seed file - uploaded_seed = request.files['seed'] + uploaded_seed = request.files["seed"] if uploaded_seed: - a = BytesIO() a.seek(0, 0) uploaded_seed.seek(0) @@ -504,23 +574,31 @@ def models(): h_latest_model_id = self.statestore.get_latest_model() model_info = self.control.get_model_info() - return render_template('models.html', box_plot=box_plot, metrics=valid_metrics, h_latest_model_id=h_latest_model_id, seed=True, - model_info=model_info, configured=True) + return render_template( + "models.html", + box_plot=box_plot, + metrics=valid_metrics, + h_latest_model_id=h_latest_model_id, + seed=True, + model_info=model_info, + configured=True, + ) seed = True - return redirect(url_for('models', seed=seed)) + return redirect(url_for("models", seed=seed)) - @app.route('/delete_model_trail', methods=['GET', 'POST']) + @app.route("/delete_model_trail", methods=["GET", "POST"]) def delete_model_trail(): """ :return: """ - if request.method == 'POST': - + if request.method == "POST": statestore_config = self.control.statestore.get_config() self.tracer = MongoTracer( - statestore_config['mongo_config'], statestore_config['network_id']) + statestore_config["mongo_config"], + statestore_config["network_id"], + ) try: self.control.drop_models() except Exception: @@ -528,28 +606,28 @@ def delete_model_trail(): # drop objects in minio self.control.delete_bucket_objects() - return redirect(url_for('models')) + return redirect(url_for("models")) seed = True - return redirect(url_for('models', seed=seed)) + return redirect(url_for("models", seed=seed)) - @app.route('/drop_control', methods=['GET', 'POST']) + @app.route("/drop_control", methods=["GET", "POST"]) def drop_control(): """ :return: """ - if request.method == 'POST': + if request.method == "POST": self.control.statestore.drop_control() - return redirect(url_for('control')) - return redirect(url_for('control')) + return redirect(url_for("control")) + return redirect(url_for("control")) # http://localhost:8090/control?rounds=4&model_id=879fa112-c861-4cb1-a25d-775153e5b548 - @app.route('/control', methods=['GET', 'POST']) + @app.route("/control", methods=["GET", "POST"]) def control(): - """ Main page for round control. Configure, start and stop training sessions. """ + """Main page for round control. Configure, start and stop training sessions.""" # Token auth if self.token_auth_enabled: - self.authorize(request, app.config.get('SECRET_KEY')) + self.authorize(request, app.config.get("SECRET_KEY")) not_configured = self.check_configured() if not_configured: @@ -560,60 +638,88 @@ def control(): if self.remote_compute_package: try: - self.current_compute_context = self.control.get_compute_package_name() + self.current_compute_context = ( + self.control.get_compute_package_name() + ) except Exception: self.current_compute_context = None else: self.current_compute_context = "None:Local" if self.control.state() == ReducerState.monitoring: return redirect( - url_for('index', state=state, refresh=refresh, message="Reducer is in monitoring state")) - - if request.method == 'POST': + url_for( + "index", + state=state, + refresh=refresh, + message="Reducer is in monitoring state", + ) + ) + + if request.method == "POST": # Get session configuration - round_timeout = float(request.form.get('timeout', 180)) - buffer_size = int(request.form.get('buffer_size', -1)) - rounds = int(request.form.get('rounds', 1)) - delete_models = request.form.get('delete_models', True) - task = (request.form.get('task', '')) - clients_required = request.form.get('clients_required', 1) - clients_requested = request.form.get('clients_requested', 8) + round_timeout = float(request.form.get("timeout", 180)) + buffer_size = int(request.form.get("buffer_size", -1)) + rounds = int(request.form.get("rounds", 1)) + delete_models = request.form.get("delete_models", True) + task = request.form.get("task", "") + clients_required = request.form.get("clients_required", 1) + clients_requested = request.form.get("clients_requested", 8) # checking if there are enough clients connected to start! clients_available = 0 for combiner in self.control.network.get_combiners(): try: combiner_state = combiner.report() - nac = combiner_state['nr_active_clients'] + nac = combiner_state["nr_active_clients"] clients_available = clients_available + int(nac) except Exception: pass if clients_available < clients_required: - return redirect(url_for('index', state=state, - message="Not enough clients available to start rounds! " - "check combiner client capacity", - message_type='warning')) + return redirect( + url_for( + "index", + state=state, + message="Not enough clients available to start rounds! " + "check combiner client capacity", + message_type="warning", + ) + ) - validate = request.form.get('validate', False) - if validate == 'False': + validate = request.form.get("validate", False) + if validate == "False": validate = False - helper_type = request.form.get('helper', 'keras') + helper_type = request.form.get("helper", "keras") # self.control.statestore.set_framework(helper_type) latest_model_id = self.statestore.get_latest_model() - config = {'round_timeout': round_timeout, 'buffer_size': buffer_size, - 'model_id': latest_model_id, 'rounds': rounds, 'delete_models_storage': delete_models, - 'clients_required': clients_required, - 'clients_requested': clients_requested, 'task': task, - 'validate': validate, 'helper_type': helper_type} - - threading.Thread(target=self.control.session, - args=(config,)).start() + config = { + "round_timeout": round_timeout, + "buffer_size": buffer_size, + "model_id": latest_model_id, + "rounds": rounds, + "delete_models_storage": delete_models, + "clients_required": clients_required, + "clients_requested": clients_requested, + "task": task, + "validate": validate, + "helper_type": helper_type, + } + + threading.Thread( + target=self.control.session, args=(config,) + ).start() - return redirect(url_for('index', state=state, refresh=refresh, message="Sent execution plan.", - message_type='SUCCESS')) + return redirect( + url_for( + "index", + state=state, + refresh=refresh, + message="Sent execution plan.", + message_type="SUCCESS", + ) + ) else: seed_model_id = None @@ -624,42 +730,53 @@ def control(): except Exception: pass - return render_template('index.html', latest_model_id=latest_model_id, - compute_package=self.current_compute_context, - seed_model_id=seed_model_id, - helper=self.control.statestore.get_helper(), validate=True, configured=True) - - @app.route('/assign') + return render_template( + "index.html", + latest_model_id=latest_model_id, + compute_package=self.current_compute_context, + seed_model_id=seed_model_id, + helper=self.control.statestore.get_helper(), + validate=True, + configured=True, + ) + + @app.route("/assign") def assign(): - """Handle client assignment requests. """ + """Handle client assignment requests.""" if self.token_auth_enabled: - self.authorize(request, app.config.get('SECRET_KEY')) + self.authorize(request, app.config.get("SECRET_KEY")) response = self.check_configured_response() if response: return response - name = request.args.get('name', None) - combiner_preferred = request.args.get('combiner', None) + name = request.args.get("name", None) + combiner_preferred = request.args.get("combiner", None) if combiner_preferred: - combiner = self.control.network.get_combiner(combiner_preferred) + combiner = self.control.network.get_combiner( + combiner_preferred + ) else: combiner = self.control.network.find_available_combiner() if combiner is None: - return jsonify({'status': 'retry', - 'package': self.package, - 'msg': "Failed to assign to a combiner, try again later."}) + return jsonify( + { + "status": "retry", + "package": self.package, + "msg": "Failed to assign to a combiner, try again later.", + } + ) client = { - 'name': name, - 'combiner_preferred': combiner_preferred, - 'combiner': combiner.name, - 'ip': request.remote_addr, - 'status': 'available' + "name": name, + "combiner_preferred": combiner_preferred, + "combiner": combiner.name, + "ip": request.remote_addr, + "status": "available", } # Add client to database @@ -668,25 +785,25 @@ def assign(): # Return connection information to client if combiner.certificate: cert_b64 = base64.b64encode(combiner.certificate) - cert = str(cert_b64).split('\'')[1] + cert = str(cert_b64).split("'")[1] else: cert = None response = { - 'status': 'assigned', - 'host': combiner.address, - 'fqdn': combiner.fqdn, - 'package': self.package, - 'ip': combiner.ip, - 'port': combiner.port, - 'certificate': cert, - 'model_type': self.control.statestore.get_helper() + "status": "assigned", + "host": combiner.address, + "fqdn": combiner.fqdn, + "package": self.package, + "ip": combiner.ip, + "port": combiner.port, + "certificate": cert, + "model_type": self.control.statestore.get_helper(), } return jsonify(response) def combiner_status(): - """ Get current status reports from all combiners registered in the network. + """Get current status reports from all combiners registered in the network. :return: """ @@ -711,67 +828,90 @@ def client_status(): all_active_validators = [] for client in combiner_info: - active_trainers_str = client['active_trainers'] - active_validators_str = client['active_validators'] + active_trainers_str = client["active_trainers"] + active_validators_str = client["active_validators"] active_trainers_str = re.sub( - '[^a-zA-Z0-9-:\n\.]', '', active_trainers_str).replace('name:', ' ') # noqa: W605 + "[^a-zA-Z0-9-:\n\.]", "", active_trainers_str + ).replace( + "name:", " " + ) # noqa: W605 active_validators_str = re.sub( - '[^a-zA-Z0-9-:\n\.]', '', active_validators_str).replace('name:', ' ') # noqa: W605 + "[^a-zA-Z0-9-:\n\.]", "", active_validators_str + ).replace( + "name:", " " + ) # noqa: W605 all_active_trainers.extend( - ' '.join(active_trainers_str.split(" ")).split()) + " ".join(active_trainers_str.split(" ")).split() + ) all_active_validators.extend( - ' '.join(active_validators_str.split(" ")).split()) + " ".join(active_validators_str.split(" ")).split() + ) active_trainers_list = [ - client for client in client_info if client['name'] in all_active_trainers] + client + for client in client_info + if client["name"] in all_active_trainers + ] active_validators_list = [ - cl for cl in client_info if cl['name'] in all_active_validators] + cl + for cl in client_info + if cl["name"] in all_active_validators + ] all_clients = [cl for cl in client_info] for client in all_clients: - status = 'offline' - role = 'None' + status = "offline" + role = "None" self.control.network.update_client_data( - client, status, role) + client, status, role + ) - all_active_clients = active_validators_list + active_trainers_list + all_active_clients = ( + active_validators_list + active_trainers_list + ) for client in all_active_clients: - status = 'active' - if client in active_trainers_list and client in active_validators_list: - role = 'trainer-validator' + status = "active" + if ( + client in active_trainers_list + and client in active_validators_list + ): + role = "trainer-validator" elif client in active_trainers_list: - role = 'trainer' + role = "trainer" elif client in active_validators_list: - role = 'validator' + role = "validator" else: - role = 'unknown' + role = "unknown" self.control.network.update_client_data( - client, status, role) - - return {'active_clients': all_clients, - 'active_trainers': active_trainers_list, - 'active_validators': active_validators_list - } + client, status, role + ) + + return { + "active_clients": all_clients, + "active_trainers": active_trainers_list, + "active_validators": active_validators_list, + } except Exception: pass - return {'active_clients': [], - 'active_trainers': [], - 'active_validators': [] - } + return { + "active_clients": [], + "active_trainers": [], + "active_validators": [], + } - @app.route('/metric_type', methods=['GET', 'POST']) + @app.route("/metric_type", methods=["GET", "POST"]) def change_features(): """ :return: """ - feature = request.args['selected'] + feature = request.args["selected"] plot = Plot(self.control.statestore) graphJSON = plot.create_box_plot(feature) return graphJSON - @app.route('/dashboard') + @app.route("/dashboard") def dashboard(): """ @@ -779,7 +919,7 @@ def dashboard(): """ # Token auth if self.token_auth_enabled: - self.authorize(request, app.config.get('SECRET_KEY')) + self.authorize(request, app.config.get("SECRET_KEY")) not_configured = self.check_configured() if not_configured: @@ -793,16 +933,18 @@ def dashboard(): clients_plot = plot.create_client_plot() client_histogram_plot = plot.create_client_histogram_plot() - return render_template('dashboard.html', show_plot=True, - table_plot=table_plot, - timeline_plot=timeline_plot, - clients_plot=clients_plot, - client_histogram_plot=client_histogram_plot, - combiners_plot=combiners_plot, - configured=True - ) - - @app.route('/network') + return render_template( + "dashboard.html", + show_plot=True, + table_plot=table_plot, + timeline_plot=timeline_plot, + clients_plot=clients_plot, + client_histogram_plot=client_histogram_plot, + combiners_plot=combiners_plot, + configured=True, + ) + + @app.route("/network") def network(): """ @@ -810,7 +952,7 @@ def network(): """ # Token auth if self.token_auth_enabled: - self.authorize(request, app.config.get('SECRET_KEY')) + self.authorize(request, app.config.get("SECRET_KEY")) not_configured = self.check_configured() if not_configured: @@ -821,17 +963,19 @@ def network(): combiner_info = combiner_status() active_clients = client_status() # print(combiner_info, flush=True) - return render_template('network.html', network_plot=True, - round_time_plot=round_time_plot, - mem_cpu_plot=mem_cpu_plot, - combiner_info=combiner_info, - active_clients=active_clients['active_clients'], - active_trainers=active_clients['active_trainers'], - active_validators=active_clients['active_validators'], - configured=True - ) - - @app.route('/config/download', methods=['GET']) + return render_template( + "network.html", + network_plot=True, + round_time_plot=round_time_plot, + mem_cpu_plot=mem_cpu_plot, + combiner_info=combiner_info, + active_clients=active_clients["active_clients"], + active_trainers=active_clients["active_trainers"], + active_validators=active_clients["active_validators"], + configured=True, + ) + + @app.route("/config/download", methods=["GET"]) def config_download(): """ @@ -839,8 +983,8 @@ def config_download(): """ chk_string = "" name = self.control.get_compute_package_name() - if name is None or name == '': - chk_string = '' + if name is None or name == "": + chk_string = "" else: file_path = os.path.join(UPLOAD_FOLDER, name) print("trying to get {}".format(file_path)) @@ -848,7 +992,7 @@ def config_download(): try: sum = str(sha(file_path)) except FileNotFoundError: - sum = '' + sum = "" chk_string = "checksum: {}".format(sum) network_id = self.network_id @@ -857,20 +1001,24 @@ def config_download(): ctx = """network_id: {network_id} discover_host: {discover_host} discover_port: {discover_port} -{chk_string}""".format(network_id=network_id, - discover_host=discover_host, - discover_port=discover_port, - chk_string=chk_string) +{chk_string}""".format( + network_id=network_id, + discover_host=discover_host, + discover_port=discover_port, + chk_string=chk_string, + ) obj = BytesIO() - obj.write(ctx.encode('UTF-8')) + obj.write(ctx.encode("UTF-8")) obj.seek(0) - return send_file(obj, - as_attachment=True, - download_name='client.yaml', - mimetype='application/x-yaml') - - @app.route('/context', methods=['GET', 'POST']) + return send_file( + obj, + as_attachment=True, + download_name="client.yaml", + mimetype="application/x-yaml", + ) + + @app.route("/context", methods=["GET", "POST"]) def context(): """ @@ -878,78 +1026,85 @@ def context(): """ # Token auth if self.token_auth_enabled: - self.authorize(request, app.config.get('SECRET_KEY')) + self.authorize(request, app.config.get("SECRET_KEY")) # if reset is not empty then allow context re-set - reset = request.args.get('reset', None) + reset = request.args.get("reset", None) if reset: - return render_template('context.html') + return render_template("context.html") - if request.method == 'POST': + if request.method == "POST": + if "file" not in request.files: + flash("No file part") + return redirect(url_for("context")) - if 'file' not in request.files: - flash('No file part') - return redirect(url_for('context')) - - file = request.files['file'] - helper_type = request.form.get('helper', 'kerashelper') + file = request.files["file"] + helper_type = request.form.get("helper", "kerashelper") # if user does not select file, browser also # submit an empty part without filename - if file.filename == '': - flash('No selected file') - return redirect(url_for('context')) + if file.filename == "": + flash("No selected file") + return redirect(url_for("context")) if file and allowed_file(file.filename): filename = secure_filename(file.filename) file_path = os.path.join( - app.config['UPLOAD_FOLDER'], filename) + app.config["UPLOAD_FOLDER"], filename + ) file.save(file_path) - if self.control.state() == ReducerState.instructing or self.control.state() == ReducerState.monitoring: + if ( + self.control.state() == ReducerState.instructing + or self.control.state() == ReducerState.monitoring + ): return "Not allowed to change context while execution is ongoing." self.control.set_compute_package(filename, file_path) self.control.statestore.set_helper(helper_type) - return redirect(url_for('control')) + return redirect(url_for("control")) - name = request.args.get('name', '') + name = request.args.get("name", "") - if name == '': + if name == "": name = self.control.get_compute_package_name() - if name is None or name == '': - return render_template('context.html') + if name is None or name == "": + return render_template("context.html") # There is a potential race condition here, if one client requests a package and at # the same time another one triggers a fetch from Minio and writes to disk. try: mutex = Lock() mutex.acquire() - return send_from_directory(app.config['UPLOAD_FOLDER'], name, as_attachment=True) + return send_from_directory( + app.config["UPLOAD_FOLDER"], name, as_attachment=True + ) except Exception: try: data = self.control.get_compute_package(name) - file_path = os.path.join(app.config['UPLOAD_FOLDER'], name) - with open(file_path, 'wb') as fh: + file_path = os.path.join(app.config["UPLOAD_FOLDER"], name) + with open(file_path, "wb") as fh: fh.write(data) - return send_from_directory(app.config['UPLOAD_FOLDER'], name, as_attachment=True) + return send_from_directory( + app.config["UPLOAD_FOLDER"], name, as_attachment=True + ) except Exception: raise finally: mutex.release() - return render_template('context.html') + return render_template("context.html") - @app.route('/checksum', methods=['GET', 'POST']) + @app.route("/checksum", methods=["GET", "POST"]) def checksum(): """ :return: """ # sum = '' - name = request.args.get('name', None) - if name == '' or name is None: + name = request.args.get("name", None) + if name == "" or name is None: name = self.control.get_compute_package_name() - if name is None or name == '': + if name is None or name == "": return jsonify({}) file_path = os.path.join(UPLOAD_FOLDER, name) @@ -958,13 +1113,13 @@ def checksum(): try: sum = str(sha(file_path)) except FileNotFoundError: - sum = '' + sum = "" - data = {'checksum': sum} + data = {"checksum": sum} return jsonify(data) - @app.route('/infer', methods=['POST']) + @app.route("/infer", methods=["POST"]) def infer(): """ @@ -972,7 +1127,7 @@ def infer(): """ # Token auth if self.token_auth_enabled: - self.authorize(request, app.config.get('SECRET_KEY')) + self.authorize(request, app.config.get("SECRET_KEY")) # Check configured not_configured = self.check_configured() @@ -982,7 +1137,9 @@ def infer(): # Check compute context if self.remote_compute_context: try: - self.current_compute_context = self.control.get_compute_package() + self.current_compute_context = ( + self.control.get_compute_package() + ) except Exception as e: print(e, flush=True) self.current_compute_context = None @@ -992,27 +1149,43 @@ def infer(): # Redirect if in monitoring state if self.control.state() == ReducerState.monitoring: return redirect( - url_for('index', state=ReducerStateToString(self.control.state()), refresh=True, message="Reducer is in monitoring state")) + url_for( + "index", + state=ReducerStateToString(self.control.state()), + refresh=True, + message="Reducer is in monitoring state", + ) + ) # POST params - timeout = int(request.form.get('timeout', 180)) - helper_type = request.form.get('helper', 'keras') - clients_required = request.form.get('clients_required', 1) - clients_requested = request.form.get('clients_requested', 8) + timeout = int(request.form.get("timeout", 180)) + helper_type = request.form.get("helper", "keras") + clients_required = request.form.get("clients_required", 1) + clients_requested = request.form.get("clients_requested", 8) # Start inference request - config = {'round_timeout': timeout, - 'model_id': self.statestore.get_latest_model(), - 'clients_required': clients_required, - 'clients_requested': clients_requested, - 'task': 'inference', - 'helper_type': helper_type} - threading.Thread(target=self.control.infer_instruct, - args=(config,)).start() + config = { + "round_timeout": timeout, + "model_id": self.statestore.get_latest_model(), + "clients_required": clients_required, + "clients_requested": clients_requested, + "task": "inference", + "helper_type": helper_type, + } + threading.Thread( + target=self.control.infer_instruct, args=(config,) + ).start() # Redirect - return redirect(url_for('index', state=ReducerStateToString(self.control.state()), refresh=True, message="Sent execution plan (inference).", - message_type='SUCCESS')) + return redirect( + url_for( + "index", + state=ReducerStateToString(self.control.state()), + refresh=True, + message="Sent execution plan (inference).", + message_type="SUCCESS", + ) + ) if not self.host: bind = "0.0.0.0" diff --git a/fedn/fedn/network/dashboard/templates/events.html b/fedn/fedn/network/dashboard/templates/events.html index d3c34beb5..1fb5fac74 100644 --- a/fedn/fedn/network/dashboard/templates/events.html +++ b/fedn/fedn/network/dashboard/templates/events.html @@ -3,41 +3,44 @@ {% block content %} -
-
-
Events
-
-
- - - - + + + -
- -
+ }); + +
+
+ -{% endblock %} +{% endblock %} \ No newline at end of file diff --git a/fedn/fedn/network/statestore/mongostatestore.py b/fedn/fedn/network/statestore/mongostatestore.py index 980801569..ae30a64c3 100644 --- a/fedn/fedn/network/statestore/mongostatestore.py +++ b/fedn/fedn/network/statestore/mongostatestore.py @@ -416,8 +416,16 @@ def get_events(self, **kwargs): :rtype: ObjectId """ # check if kwargs is empty + + result = None + count = None + projection = {"_id": False} + if not kwargs: - result = self.control.status.find() + result = self.control.status.find({}, projection).sort( + "timestamp", pymongo.DESCENDING + ) + count = self.control.status.count_documents({}) else: limit = kwargs.pop("limit", None) skip = kwargs.pop("skip", None) @@ -426,12 +434,22 @@ def get_events(self, **kwargs): limit = int(limit) skip = int(skip) result = ( - self.control.status.find(kwargs).limit(limit).skip(skip) + self.control.status.find(kwargs, projection) + .sort("timestamp", pymongo.DESCENDING) + .limit(limit) + .skip(skip) ) else: - result = self.control.status.find(kwargs) + result = self.control.status.find(kwargs, projection).sort( + "timestamp", pymongo.DESCENDING + ) - return result + count = self.control.status.count_documents(kwargs) + + return { + "result": result, + "count": count, + } def get_storage_backend(self): """Get the storage backend.