diff --git a/fedn/fedn/common/certificate/certificate.py b/fedn/fedn/common/certificate/certificate.py
index f3e76d950..a2c059748 100644
--- a/fedn/fedn/common/certificate/certificate.py
+++ b/fedn/fedn/common/certificate/certificate.py
@@ -5,6 +5,8 @@
 from OpenSSL import crypto
 
+from fedn.common.log_config import logger
+
 
 class Certificate:
     """
 
@@ -20,9 +22,9 @@ def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", cre
         try:
             os.makedirs(cwd)
         except OSError:
-            print("Directory exists, will store all cert and keys here.")
+            logger.info("Directory exists, will store all cert and keys here.")
         else:
-            print(
+            logger.info(
                 "Successfully created the directory to store cert and keys in {}".format(cwd))
 
         self.key_path = os.path.join(cwd, key_name)
diff --git a/fedn/fedn/common/log_config.py b/fedn/fedn/common/log_config.py
index 47579cb82..0e61a6a83 100644
--- a/fedn/fedn/common/log_config.py
+++ b/fedn/fedn/common/log_config.py
@@ -1,19 +1,78 @@
 import logging
 import logging.config
+import os
 
+import requests
 import urllib3
 
+log_levels = {
+    "DEBUG": logging.DEBUG,
+    "INFO": logging.INFO,
+    "WARNING": logging.WARNING,
+    "ERROR": logging.ERROR,
+    "CRITICAL": logging.CRITICAL
+}
+
+
 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 logging.getLogger("urllib3").setLevel(logging.ERROR)
 
 handler = logging.StreamHandler()
-logger = logging.getLogger()
+logger = logging.getLogger("fedn")
 logger.addHandler(handler)
 logger.setLevel(logging.DEBUG)
 
 formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
 handler.setFormatter(formatter)
 
 
+class StudioHTTPHandler(logging.handlers.HTTPHandler):
+    def __init__(self, host, url, method='POST', token=None):
+        super().__init__(host, url, method)
+        self.token = token
+
+    def emit(self, record):
+        log_entry = self.mapLogRecord(record)
+
+        log_entry = {
+            "msg": log_entry['msg'],
+            "levelname": log_entry['levelname'],
+            "project": os.environ.get("PROJECT_ID"),
+            "appinstance": os.environ.get("APP_ID")
+
+        }
+        # Setup headers
+        headers = {
+            'Content-type': 'application/json',
+        }
+        if self.token:
+            remote_token_protocol = os.environ.get('FEDN_REMOTE_LOG_TOKEN_PROTOCOL', "Token")
+            headers['Authorization'] = f"{remote_token_protocol} {self.token}"
+        if self.method.lower() == 'post':
+            requests.post(self.host+self.url, json=log_entry, headers=headers)
+        else:
+            # No other methods implemented.
+            return
+
+
+# Remote logging can only be configured via environment variables for now.
+REMOTE_LOG_SERVER = os.environ.get('FEDN_REMOTE_LOG_SERVER', False)
+REMOTE_LOG_PATH = os.environ.get('FEDN_REMOTE_LOG_PATH', False)
+REMOTE_LOG_LEVEL = os.environ.get('FEDN_REMOTE_LOG_LEVEL', 'INFO')
+
+if REMOTE_LOG_SERVER:
+    rloglevel = log_levels.get(REMOTE_LOG_LEVEL, logging.INFO)
+    remote_token = os.environ.get('FEDN_REMOTE_LOG_TOKEN', None)
+
+    http_handler = StudioHTTPHandler(
+        host=REMOTE_LOG_SERVER,
+        url=REMOTE_LOG_PATH,
+        method='POST',
+        token=remote_token
+    )
+    http_handler.setLevel(rloglevel)
+    logger.addHandler(http_handler)
+
+
 def set_log_level_from_string(level_str):
     """ Set the log level based on a string input.
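Note on the new remote logging in fedn/common/log_config.py: the HTTP handler is attached once at import time and is driven entirely by environment variables. A minimal sketch of enabling it is shown below; the server, path, and token values are illustrative placeholders, not defaults shipped with this change, while the variable names match the ones read above.

```python
# Illustrative configuration only -- the URL, path, and token below are made-up
# placeholders. The environment variable names are the ones read in
# fedn/common/log_config.py above.
import os

os.environ["FEDN_REMOTE_LOG_SERVER"] = "https://logs.example.com"  # prefix used in requests.post()
os.environ["FEDN_REMOTE_LOG_PATH"] = "/api/logs/"                  # path appended to the server value
os.environ["FEDN_REMOTE_LOG_LEVEL"] = "INFO"                       # DEBUG/INFO/WARNING/ERROR/CRITICAL
os.environ["FEDN_REMOTE_LOG_TOKEN"] = "placeholder-token"          # sent as "Token <value>" unless a
os.environ["FEDN_REMOTE_LOG_TOKEN_PROTOCOL"] = "Bearer"            # ...different scheme is set here

# The handler is registered when the module is imported, so the variables must
# be set before this import. Every record at or above the configured level is
# then POSTed as JSON with the keys msg, levelname, project (PROJECT_ID) and
# appinstance (APP_ID).
from fedn.common.log_config import logger

logger.info("This line goes to the stream handler and to the remote log server.")
```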
diff --git a/fedn/fedn/network/api/interface.py b/fedn/fedn/network/api/interface.py
index a4b1947a1..e788e1c37 100644
--- a/fedn/fedn/network/api/interface.py
+++ b/fedn/fedn/network/api/interface.py
@@ -9,6 +9,7 @@
 from werkzeug.utils import secure_filename
 
 from fedn.common.config import get_controller_config, get_network_config
+from fedn.common.log_config import logger
 from fedn.network.combiner.interfaces import (CombinerInterface,
                                               CombinerUnavailableError)
 from fedn.network.state import ReducerState, ReducerStateToString
@@ -274,7 +275,7 @@ def _get_compute_package_name(self):
             name = package_objects["storage_file_name"]
         except KeyError as e:
             message = "No compute package found. Key error."
-            print(e)
+            logger.debug(e)
             return None, message
         return name, "success"
 
@@ -655,7 +656,7 @@ def add_client(self, client_id, preferred_combiner, remote_addr):
             "certificate": cert,
             "helper_type": self.control.statestore.get_helper(),
         }
-        print("Seding payload: ", payload, flush=True)
+        logger.info(f"Sending payload: {payload}")
 
         return jsonify(payload)
 
@@ -687,7 +688,7 @@ def set_initial_model(self, file):
             model = helper.load(object)
             self.control.commit(file.filename, model)
         except Exception as e:
-            print(e, flush=True)
+            logger.debug(e)
             return jsonify({"success": False, "message": e})
 
         return jsonify(
@@ -967,7 +968,7 @@ def get_plot_data(self, feature=None):
         except Exception as e:
             valid_metrics = None
             box_plot = None
-            print(e, flush=True)
+            logger.debug(e)
 
         result = {
             "valid_metrics": valid_metrics,
@@ -1081,7 +1082,7 @@ def start_session(
                 clients_available = clients_available + int(nr_active_clients)
             except CombinerUnavailableError as e:
                 # TODO: Handle unavailable combiner, stop session or continue?
-                print("COMBINER UNAVAILABLE: {}".format(e), flush=True)
+                logger.error("COMBINER UNAVAILABLE: {}".format(e))
                 continue
 
         if clients_available < min_clients:
diff --git a/fedn/fedn/network/api/network.py b/fedn/fedn/network/api/network.py
index a4d4c7299..c6a3c3838 100644
--- a/fedn/fedn/network/api/network.py
+++ b/fedn/fedn/network/api/network.py
@@ -1,5 +1,6 @@
 import base64
 
+from fedn.common.log_config import logger
 from fedn.network.combiner.interfaces import CombinerInterface
 from fedn.network.loadbalancer.leastpacked import LeastPacked
 
@@ -67,13 +68,13 @@ def add_combiner(self, combiner):
         :return: None
         """
         if not self.control.idle():
-            print("Reducer is not idle, cannot add additional combiner.")
+            logger.warning("Reducer is not idle, cannot add additional combiner.")
             return
         if self.get_combiner(combiner.name):
             return
 
-        print("adding combiner {}".format(combiner.name), flush=True)
+        logger.info("adding combiner {}".format(combiner.name))
         self.statestore.set_combiner(combiner.to_dict())
 
     def remove_combiner(self, combiner):
         """
@@ -84,7 +85,7 @@ def remove_combiner(self, combiner):
         :return: None
         """
         if not self.control.idle():
-            print("Reducer is not idle, cannot remove combiner.")
+            logger.warning("Reducer is not idle, cannot remove combiner.")
             return
 
         self.statestore.delete_combiner(combiner.name)
@@ -105,8 +106,8 @@ def handle_unavailable_combiner(self, combiner):
         :return: None
         """
         # TODO: Implement strategy to handle an unavailable combiner.
-        print("REDUCER CONTROL: Combiner {} unavailable.".format(
-            combiner.name), flush=True)
+        logger.warning("REDUCER CONTROL: Combiner {} unavailable.".format(
+            combiner.name))
 
     def add_client(self, client):
         """ Add a new client to the network.
@@ -119,7 +120,7 @@ def add_client(self, client):
         if self.get_client(client['name']):
             return
 
-        print("adding client {}".format(client['name']), flush=True)
+        logger.info("adding client {}".format(client['name']))
         self.statestore.set_client(client)
 
     def get_client(self, name):
diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py
index 54c06ca51..691b592d5 100644
--- a/fedn/fedn/network/clients/client.py
+++ b/fedn/fedn/network/clients/client.py
@@ -60,7 +60,7 @@ def __init__(self, config):
         self._connected = False
         self._missed_heartbeat = 0
         self.config = config
-
+        self.trace_attribs = False
         set_log_level_from_string(config.get('verbosity', "INFO"))
         set_log_stream(config.get('logfile', None))
 
diff --git a/fedn/fedn/network/clients/connect.py b/fedn/fedn/network/clients/connect.py
index 2e9345ebb..bbe2599b8 100644
--- a/fedn/fedn/network/clients/connect.py
+++ b/fedn/fedn/network/clients/connect.py
@@ -86,7 +86,7 @@ def assign(self):
                 allow_redirects=True,
                 headers={'Authorization': f"{FEDN_AUTH_SCHEME} {self.token}"})
         except Exception as e:
-            print('***** {}'.format(e), flush=True)
+            logger.debug('***** {}'.format(e))
             return Status.Unassigned, {}
 
         if retval.status_code == 400:
diff --git a/fedn/fedn/network/combiner/aggregators/fedopt.py b/fedn/fedn/network/combiner/aggregators/fedopt.py
index ccabb2789..b147bfe0a 100644
--- a/fedn/fedn/network/combiner/aggregators/fedopt.py
+++ b/fedn/fedn/network/combiner/aggregators/fedopt.py
@@ -75,7 +75,7 @@ def combine_models(self, helper=None, delete_models=True):
                 logger.info(
                     "AGGREGATOR({}): Processing model update {}, metadata: {} ".format(self.name, model_update.model_update_id, metadata))
 
-                print("***** ", model_update, flush=True)
+                logger.info("***** {}".format(model_update))
 
                 # Increment total number of examples
                 total_examples += metadata['num_examples']
diff --git a/fedn/fedn/network/combiner/combiner.py b/fedn/fedn/network/combiner/combiner.py
index 5adf2d412..dfe109f9e 100644
--- a/fedn/fedn/network/combiner/combiner.py
+++ b/fedn/fedn/network/combiner/combiner.py
@@ -120,8 +120,6 @@ def __init__(self, config):
                            'certificate': cert,
                            'key': key}
 
-        print(announce_config, flush=True)
-
         # Set up model repository
         self.repository = Repository(
             announce_config['storage']['storage_config'])
@@ -564,7 +562,7 @@ def AcceptingClients(self, request: fedn.ConnectionRequest, context):
                 return response
 
         except Exception as e:
-            logger.error("Combiner not properly configured! {}".format(e), flush=True)
+            logger.error("Combiner not properly configured! {}".format(e))
             raise
 
         response.status = fedn.ConnectionStatus.TRY_AGAIN_LATER
diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py
index 77004108b..2b64098cf 100644
--- a/fedn/fedn/network/controller/control.py
+++ b/fedn/fedn/network/controller/control.py
@@ -145,7 +145,7 @@ def round(self, session_config, round_id):
         self.create_round({'round_id': round_id, 'status': "Pending"})
 
         if len(self.network.get_combiners()) < 1:
-            logger.warning("Round cannot start, no combiners connected!", flush=True)
+            logger.warning("Round cannot start, no combiners connected!")
             self.set_round_status(round_id, 'Failed')
             return None, self.statestore.get_round(round_id)
 
diff --git a/fedn/fedn/network/storage/models/tempmodelstorage.py b/fedn/fedn/network/storage/models/tempmodelstorage.py
index 7e32ea866..492cee5ab 100644
--- a/fedn/fedn/network/storage/models/tempmodelstorage.py
+++ b/fedn/fedn/network/storage/models/tempmodelstorage.py
@@ -2,6 +2,7 @@
 from io import BytesIO
 
 import fedn.network.grpc.fedn_pb2 as fedn
+from fedn.common.log_config import logger
 from fedn.network.storage.models.modelstorage import ModelStorage
 
 CHUNK_SIZE = 1024 * 1024
@@ -30,10 +31,10 @@ def get(self, model_id):
 
         try:
             if self.models_metadata[model_id] != fedn.ModelStatus.OK:
-                print("File not ready! Try again", flush=True)
+                logger.warning("File not ready! Try again")
                 return None
         except KeyError:
-            print("No such model have been made available yet!", flush=True)
+            logger.error("No such model has been made available yet!")
             return None
 
         obj = BytesIO()
@@ -74,12 +75,12 @@ def delete(self, model_id):
 
         try:
             os.remove(os.path.join(self.default_dir, str(model_id)))
-            print("TEMPMODELSTORAGE: Deleted model with id: {}".format(model_id), flush=True)
+            logger.info("TEMPMODELSTORAGE: Deleted model with id: {}".format(model_id))
             # Delete id from metadata and models dict
             del self.models_metadata[model_id]
             del self.models[model_id]
         except FileNotFoundError:
-            print("Could not delete model from disk. File not found!", flush=True)
+            logger.error("Could not delete model from disk. File not found!")
             return False
         return True
 
@@ -90,11 +91,11 @@ def delete_all(self):
         for model_id in self.models.keys():
             try:
                 os.remove(os.path.join(self.default_dir, str(model_id)))
-                print("TEMPMODELSTORAGE: Deleted model with id: {}".format(model_id), flush=True)
+                logger.info("TEMPMODELSTORAGE: Deleted model with id: {}".format(model_id))
                 # Add id to list of ids to pop/delete from metadata and models dict
                 ids_pop.append(model_id)
             except FileNotFoundError:
-                print("TEMPMODELSTORAGE: Could not delete model {} from disk. File not found!".format(model_id), flush=True)
+                logger.error("TEMPMODELSTORAGE: Could not delete model {} from disk. File not found!".format(model_id))
         # Remove id from metadata and models dict
         for model_id in ids_pop:
             del self.models_metadata[model_id]
diff --git a/fedn/fedn/network/storage/statestore/mongostatestore.py b/fedn/fedn/network/storage/statestore/mongostatestore.py
index a88e2a0e7..c4fdf3fe7 100644
--- a/fedn/fedn/network/storage/statestore/mongostatestore.py
+++ b/fedn/fedn/network/storage/statestore/mongostatestore.py
@@ -5,6 +5,7 @@
 import pymongo
 from google.protobuf.json_format import MessageToDict
 
+from fedn.common.log_config import logger
 from fedn.network.state import ReducerStateToString, StringToReducerState
 
 
@@ -48,7 +49,7 @@ def __init__(self, network_id, config):
             self.__inited = True
 
         except Exception as e:
-            print("FAILED TO CONNECT TO MONGODB, {}".format(e), flush=True)
+            logger.error("FAILED TO CONNECT TO MONGODB, {}".format(e))
             self.state = None
             self.model = None
             self.control = None
@@ -124,7 +125,7 @@ def transition(self, state):
                 True,
             )
         else:
-            print(
+            logger.info(
                 "Not updating state, already in {}".format(
                     ReducerStateToString(state)
                 )
@@ -278,7 +279,7 @@ def set_current_model(self, model_id: str):
             return True
 
         except Exception as e:
-            print("ERROR: {}".format(e), flush=True)
+            logger.error("ERROR: {}".format(e))
             return False
 
 
@@ -349,7 +350,7 @@ def set_active_compute_package(self, id: str):
             )
         except Exception as e:
-            print("ERROR: {}".format(e), flush=True)
+            logger.error("ERROR: {}".format(e))
             return False
 
         return True
@@ -400,7 +401,7 @@ def get_compute_package(self):
             ret = self.control.package.find_one(find, projection)
             return ret
         except Exception as e:
-            print("ERROR: {}".format(e), flush=True)
+            logger.error("ERROR: {}".format(e))
             return None
 
     def list_compute_packages(self, limit: int = None, skip: int = None, sort_key="committed_at", sort_order=pymongo.DESCENDING):
@@ -433,7 +434,7 @@ def list_compute_packages(self, limit: int = None, skip: int = None, sort_key="c
             count = self.control.package.count_documents(find_option)
 
         except Exception as e:
-            print("ERROR: {}".format(e), flush=True)
+            logger.error("ERROR: {}".format(e))
             return None
 
         return {
@@ -780,9 +781,8 @@ def delete_combiner(self, combiner):
         try:
             self.combiners.delete_one({"name": combiner})
         except Exception:
-            print(
-                "WARNING, failed to delete combiner: {}".format(combiner),
-                flush=True,
+            logger.error(
+                "Failed to delete combiner: {}".format(combiner),
             )
 
     def set_client(self, client_data):
@@ -843,7 +843,7 @@ def list_clients(self, limit=None, skip=None, status=None, sort_key="last_seen",
             count = self.clients.count_documents(find)
 
         except Exception as e:
-            print("ERROR: {}".format(e), flush=True)
+            logger.error("{}".format(e))
 
         return {
             "result": result,
@@ -879,7 +879,7 @@ def list_combiners_data(self, combiners, sort_key="count", sort_order=pymongo.DE
             result = self.clients.aggregate(pipeline)
 
         except Exception as e:
-            print("ERROR: {}".format(e), flush=True)
+            logger.error(e)
 
         return result
 
diff --git a/fedn/fedn/utils/helpers/plugins/androidhelper.py b/fedn/fedn/utils/helpers/plugins/androidhelper.py
index a5492d0c6..6a9fc7f9d 100644
--- a/fedn/fedn/utils/helpers/plugins/androidhelper.py
+++ b/fedn/fedn/utils/helpers/plugins/androidhelper.py
@@ -4,6 +4,7 @@
 
 import numpy as np
 
+from fedn.common.log_config import logger
 from fedn.utils.helpers.helperbase import HelperBase
 
 
@@ -86,7 +87,7 @@ def load(self, fh):
         :param fh: file path, filehandle, filelike.
         :return: List of weights in json format.
""" - print("in android helper load") + logger.debug("in android helper load") if isinstance(fh, str): with open(fh, "rb") as file: byte_data = file.read() diff --git a/fedn/fedn/utils/plots.py b/fedn/fedn/utils/plots.py index dedb27e87..8ec81aca2 100644 --- a/fedn/fedn/utils/plots.py +++ b/fedn/fedn/utils/plots.py @@ -6,6 +6,7 @@ import plotly.graph_objs as go from plotly.subplots import make_subplots +from fedn.common.log_config import logger from fedn.network.storage.statestore.mongostatestore import MongoStateStore @@ -27,7 +28,7 @@ def __init__(self, statestore): self.network_clients = self.mdb["network.clients"] except Exception as e: - print("FAILED TO CONNECT TO MONGO, {}".format(e), flush=True) + logger.error("FAILED TO CONNECT TO MONGO, {}".format(e)) self.collection = None raise diff --git a/fedn/fedn/utils/process.py b/fedn/fedn/utils/process.py index 95e6eaa63..a201eb386 100644 --- a/fedn/fedn/utils/process.py +++ b/fedn/fedn/utils/process.py @@ -15,7 +15,6 @@ def run_process(args, cwd): status = subprocess.Popen( args, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - # print(status) def check_io(): """ Check stdout/stderr of the child process.