Skip to content

Commit

Permalink
Feature/SK-718 | Enable remote logging (#540)
Browse files Browse the repository at this point in the history
Enables remote logging to Studio (if configured, default is off).
  • Loading branch information
stefanhellander authored Apr 8, 2024
1 parent 2386abe commit 6550fb7
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 41 deletions.
6 changes: 4 additions & 2 deletions fedn/fedn/common/certificate/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from OpenSSL import crypto

from fedn.common.log_config import logger


class Certificate:
"""
Expand All @@ -20,9 +22,9 @@ def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", cre
try:
os.makedirs(cwd)
except OSError:
print("Directory exists, will store all cert and keys here.")
logger.info("Directory exists, will store all cert and keys here.")
else:
print(
logger.info(
"Successfully created the directory to store cert and keys in {}".format(cwd))

self.key_path = os.path.join(cwd, key_name)
Expand Down
61 changes: 60 additions & 1 deletion fedn/fedn/common/log_config.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,78 @@
import logging
import logging.config
import os

import requests
import urllib3

# Translation table from level names given as strings to the numeric
# level constants of the standard logging module.
log_levels = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL
}


# Suppress urllib3's InsecureRequestWarning and restrict the "urllib3"
# logger to records of level ERROR and above.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.getLogger("urllib3").setLevel(logging.ERROR)

# Stream handler attached to the package logger below.
handler = logging.StreamHandler()
# NOTE(review): this root-logger assignment is overwritten by the very next
# line and has no lasting effect; it looks like the removed side of the diff
# hunk -- confirm and drop.
logger = logging.getLogger()
# Shared "fedn" logger that other modules import from this file.
logger = logging.getLogger("fedn")
logger.addHandler(handler)
# The logger itself accepts everything from DEBUG upwards.
logger.setLevel(logging.DEBUG)
# Output format: "<timestamp> [<LEVEL>] <message>".
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
handler.setFormatter(formatter)


class StudioHTTPHandler(logging.handlers.HTTPHandler):
    """Logging handler that forwards log records to a remote HTTP endpoint.

    Each record is reduced to its formatted message and level name, tagged
    with the ``PROJECT_ID`` and ``APP_ID`` environment variables, and sent
    as a JSON body in a POST request to ``host + url``.
    """

    def __init__(self, host, url, method='POST', token=None, timeout=10):
        """Create the handler.

        :param host: Address of the log server; concatenated with ``url`` as-is.
        :param url: Path appended to ``host`` to form the request URL.
        :param method: HTTP method. Only POST results in a request being sent.
        :param token: Optional token sent in the ``Authorization`` header.
        :param timeout: Seconds to wait for the log server before giving up,
            so an unresponsive server cannot block the logging caller forever.
        """
        super().__init__(host, url, method)
        self.token = token
        self.timeout = timeout

    def _build_payload(self, record):
        """Return the JSON-serializable body for ``record``."""
        return {
            # getMessage() merges %-style args and stringifies non-string
            # messages (e.g. exception objects passed to logger.debug(e)),
            # which the raw ``msg`` attribute does not.
            "msg": record.getMessage(),
            "levelname": record.levelname,
            "project": os.environ.get("PROJECT_ID"),
            "appinstance": os.environ.get("APP_ID"),
        }

    def _build_headers(self):
        """Return the request headers, including authorization if a token is set."""
        headers = {
            'Content-type': 'application/json',
        }
        if self.token:
            remote_token_protocol = os.environ.get('FEDN_REMOTE_LOG_TOKEN_PROTOCOL', "Token")
            headers['Authorization'] = f"{remote_token_protocol} {self.token}"
        return headers

    def emit(self, record):
        """Send ``record`` to the remote log server.

        Failures (network errors, timeouts, bad configuration) are routed to
        ``handleError`` instead of being raised into the code that logged.

        :param record: The ``logging.LogRecord`` to forward.
        """
        if self.method.lower() != 'post':
            # No other methods implemented.
            return

        try:
            requests.post(
                self.host + self.url,
                json=self._build_payload(record),
                headers=self._build_headers(),
                timeout=self.timeout,
            )
        except Exception:
            # Standard logging-handler contract: never let a logging failure
            # crash the application.
            self.handleError(record)


# Remote logging can only be configured via environment variables for now.
# It is enabled only when FEDN_REMOTE_LOG_SERVER is set (default: off).
REMOTE_LOG_SERVER = os.environ.get('FEDN_REMOTE_LOG_SERVER', False)
# Default to an empty (still falsy) path rather than False: the handler
# concatenates host and path, and str + bool would raise on every log call.
REMOTE_LOG_PATH = os.environ.get('FEDN_REMOTE_LOG_PATH', '')
REMOTE_LOG_LEVEL = os.environ.get('FEDN_REMOTE_LOG_LEVEL', 'INFO')

if REMOTE_LOG_SERVER:
    # Accept level names in any case; unknown names fall back to INFO.
    rloglevel = log_levels.get(REMOTE_LOG_LEVEL.upper(), logging.INFO)
    remote_token = os.environ.get('FEDN_REMOTE_LOG_TOKEN', None)

    http_handler = StudioHTTPHandler(
        host=REMOTE_LOG_SERVER,
        url=REMOTE_LOG_PATH,
        method='POST',
        token=remote_token
    )
    http_handler.setLevel(rloglevel)
    logger.addHandler(http_handler)


def set_log_level_from_string(level_str):
"""
Set the log level based on a string input.
Expand Down
11 changes: 6 additions & 5 deletions fedn/fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from werkzeug.utils import secure_filename

from fedn.common.config import get_controller_config, get_network_config
from fedn.common.log_config import logger
from fedn.network.combiner.interfaces import (CombinerInterface,
CombinerUnavailableError)
from fedn.network.state import ReducerState, ReducerStateToString
Expand Down Expand Up @@ -274,7 +275,7 @@ def _get_compute_package_name(self):
name = package_objects["storage_file_name"]
except KeyError as e:
message = "No compute package found. Key error."
print(e)
logger.debug(e)
return None, message
return name, "success"

Expand Down Expand Up @@ -655,7 +656,7 @@ def add_client(self, client_id, preferred_combiner, remote_addr):
"certificate": cert,
"helper_type": self.control.statestore.get_helper(),
}
print("Seding payload: ", payload, flush=True)
logger.info(f"Sending payload: {payload}")

return jsonify(payload)

Expand Down Expand Up @@ -687,7 +688,7 @@ def set_initial_model(self, file):
model = helper.load(object)
self.control.commit(file.filename, model)
except Exception as e:
print(e, flush=True)
logger.debug(e)
return jsonify({"success": False, "message": e})

return jsonify(
Expand Down Expand Up @@ -967,7 +968,7 @@ def get_plot_data(self, feature=None):
except Exception as e:
valid_metrics = None
box_plot = None
print(e, flush=True)
logger.debug(e)

result = {
"valid_metrics": valid_metrics,
Expand Down Expand Up @@ -1081,7 +1082,7 @@ def start_session(
clients_available = clients_available + int(nr_active_clients)
except CombinerUnavailableError as e:
# TODO: Handle unavailable combiner, stop session or continue?
print("COMBINER UNAVAILABLE: {}".format(e), flush=True)
logger.error("COMBINER UNAVAILABLE: {}".format(e))
continue

if clients_available < min_clients:
Expand Down
13 changes: 7 additions & 6 deletions fedn/fedn/network/api/network.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64

from fedn.common.log_config import logger
from fedn.network.combiner.interfaces import CombinerInterface
from fedn.network.loadbalancer.leastpacked import LeastPacked

Expand Down Expand Up @@ -67,13 +68,13 @@ def add_combiner(self, combiner):
:return: None
"""
if not self.control.idle():
print("Reducer is not idle, cannot add additional combiner.")
logger.warning("Reducer is not idle, cannot add additional combiner.")
return

if self.get_combiner(combiner.name):
return

print("adding combiner {}".format(combiner.name), flush=True)
logger.info("adding combiner {}".format(combiner.name))
self.statestore.set_combiner(combiner.to_dict())

def remove_combiner(self, combiner):
Expand All @@ -84,7 +85,7 @@ def remove_combiner(self, combiner):
:return: None
"""
if not self.control.idle():
print("Reducer is not idle, cannot remove combiner.")
logger.warning("Reducer is not idle, cannot remove combiner.")
return
self.statestore.delete_combiner(combiner.name)

Expand All @@ -105,8 +106,8 @@ def handle_unavailable_combiner(self, combiner):
:return: None
"""
# TODO: Implement strategy to handle an unavailable combiner.
print("REDUCER CONTROL: Combiner {} unavailable.".format(
combiner.name), flush=True)
logger.warning("REDUCER CONTROL: Combiner {} unavailable.".format(
combiner.name))

def add_client(self, client):
""" Add a new client to the network.
Expand All @@ -119,7 +120,7 @@ def add_client(self, client):
if self.get_client(client['name']):
return

print("adding client {}".format(client['name']), flush=True)
logger.info("adding client {}".format(client['name']))
self.statestore.set_client(client)

def get_client(self, name):
Expand Down
2 changes: 1 addition & 1 deletion fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(self, config):
self._connected = False
self._missed_heartbeat = 0
self.config = config

self.trace_attribs = False
set_log_level_from_string(config.get('verbosity', "INFO"))
set_log_stream(config.get('logfile', None))

Expand Down
2 changes: 1 addition & 1 deletion fedn/fedn/network/clients/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def assign(self):
allow_redirects=True,
headers={'Authorization': f"{FEDN_AUTH_SCHEME} {self.token}"})
except Exception as e:
print('***** {}'.format(e), flush=True)
logger.debug('***** {}'.format(e))
return Status.Unassigned, {}

if retval.status_code == 400:
Expand Down
2 changes: 1 addition & 1 deletion fedn/fedn/network/combiner/aggregators/fedopt.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def combine_models(self, helper=None, delete_models=True):

logger.info(
"AGGREGATOR({}): Processing model update {}, metadata: {} ".format(self.name, model_update.model_update_id, metadata))
print("***** ", model_update, flush=True)
logger.info("***** {}".format(model_update))

# Increment total number of examples
total_examples += metadata['num_examples']
Expand Down
4 changes: 1 addition & 3 deletions fedn/fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ def __init__(self, config):
'certificate': cert,
'key': key}

print(announce_config, flush=True)

# Set up model repository
self.repository = Repository(
announce_config['storage']['storage_config'])
Expand Down Expand Up @@ -564,7 +562,7 @@ def AcceptingClients(self, request: fedn.ConnectionRequest, context):
return response

except Exception as e:
logger.error("Combiner not properly configured! {}".format(e), flush=True)
logger.error("Combiner not properly configured! {}".format(e))
raise

response.status = fedn.ConnectionStatus.TRY_AGAIN_LATER
Expand Down
2 changes: 1 addition & 1 deletion fedn/fedn/network/controller/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def round(self, session_config, round_id):
self.create_round({'round_id': round_id, 'status': "Pending"})

if len(self.network.get_combiners()) < 1:
logger.warning("Round cannot start, no combiners connected!", flush=True)
logger.warning("Round cannot start, no combiners connected!")
self.set_round_status(round_id, 'Failed')
return None, self.statestore.get_round(round_id)

Expand Down
13 changes: 7 additions & 6 deletions fedn/fedn/network/storage/models/tempmodelstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from io import BytesIO

import fedn.network.grpc.fedn_pb2 as fedn
from fedn.common.log_config import logger
from fedn.network.storage.models.modelstorage import ModelStorage

CHUNK_SIZE = 1024 * 1024
Expand Down Expand Up @@ -30,10 +31,10 @@ def get(self, model_id):

try:
if self.models_metadata[model_id] != fedn.ModelStatus.OK:
print("File not ready! Try again", flush=True)
logger.warning("File not ready! Try again")
return None
except KeyError:
print("No such model have been made available yet!", flush=True)
logger.error("No such model has been made available yet!")
return None

obj = BytesIO()
Expand Down Expand Up @@ -74,12 +75,12 @@ def delete(self, model_id):

try:
os.remove(os.path.join(self.default_dir, str(model_id)))
print("TEMPMODELSTORAGE: Deleted model with id: {}".format(model_id), flush=True)
logger.info("TEMPMODELSTORAGE: Deleted model with id: {}".format(model_id))
# Delete id from metadata and models dict
del self.models_metadata[model_id]
del self.models[model_id]
except FileNotFoundError:
print("Could not delete model from disk. File not found!", flush=True)
logger.error("Could not delete model from disk. File not found!")
return False
return True

Expand All @@ -90,11 +91,11 @@ def delete_all(self):
for model_id in self.models.keys():
try:
os.remove(os.path.join(self.default_dir, str(model_id)))
print("TEMPMODELSTORAGE: Deleted model with id: {}".format(model_id), flush=True)
logger.info("TEMPMODELSTORAGE: Deleted model with id: {}".format(model_id))
# Add id to list of ids to pop/delete from metadata and models dict
ids_pop.append(model_id)
except FileNotFoundError:
print("TEMPMODELSTORAGE: Could not delete model {} from disk. File not found!".format(model_id), flush=True)
logger.error("TEMPMODELSTORAGE: Could not delete model {} from disk. File not found!".format(model_id))
# Remove id from metadata and models dict
for model_id in ids_pop:
del self.models_metadata[model_id]
Expand Down
Loading

0 comments on commit 6550fb7

Please sign in to comment.