Skip to content

Commit

Permalink
Fixed some stuff again after merging in develop branch.
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanhellander committed Oct 20, 2023
1 parent 037c9bd commit a2fec35
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 79 deletions.
5 changes: 0 additions & 5 deletions fedn/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import logging

import click

logging.basicConfig(format='%(asctime)s [%(filename)s:%(lineno)d] %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p') # , level=logging.DEBUG)

CONTEXT_SETTINGS = dict(
# Support -h as a shortcut for --help
help_option_names=['-h', '--help'],
Expand Down
53 changes: 23 additions & 30 deletions fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import fedn.common.net.grpc.fedn_pb2 as fedn
import fedn.common.net.grpc.fedn_pb2_grpc as rpc
from fedn.common.log_config import logger, set_log_level_from_string, set_theme_from_string
from fedn.network.clients.connect import ConnectorClient, Status
from fedn.network.clients.package import PackageRuntime
from fedn.network.clients.state import ClientState, ClientStateToString
Expand Down Expand Up @@ -49,7 +50,13 @@ class Client:

def __init__(self, config):
"""Initialize the client."""

print(""" _____ _ _ ______ ______ _____
/ ____| | | | | | ____| ____| __ \
| (___ ___ __ _| | ___ ___ _ _| |_ | |__ | |__ | | | |_ __
\___ \ / __/ _` | |/ _ \/ _ \| | | | __| | __| | __| | | | | '_ \
____) | (_| (_| | | __/ (_) | |_| | |_ | | | |____| |__| | | | |
|_____/ \___\__,_|_|\___|\___/ \__,_|\__| |_| |______|_____/|_| |_|
""")
self.state = None
self.error_state = False
self._attached = False
Expand Down Expand Up @@ -107,27 +114,26 @@ def _assign(self):
:rtype: dict
"""

print("Asking for assignment!", flush=True)
logger.info("Initiating assignment request.")
while True:
status, response = self.connector.assign()
if status == Status.TryAgain:
print(response, flush=True)
logger.info(response)
time.sleep(5)
continue
if status == Status.Assigned:
client_config = response
break
if status == Status.UnAuthorized:
print(response, flush=True)
logger.warning(response)
sys.exit("Exiting: Unauthorized")
if status == Status.UnMatchedConfig:
print(response, flush=True)
logger.warning(response)
sys.exit("Exiting: UnMatchedConfig")
time.sleep(5)
print(".", end=' ', flush=True)

print("Got assigned!", flush=True)
print("Received combiner config: {}".format(client_config), flush=True)
logger.info("Assignment successfully received.")
logger.info("Received combiner configuration: {}".format(client_config))
return client_config

def _connect(self, client_config):
Expand All @@ -146,24 +152,24 @@ def _connect(self, client_config):
host = client_config['fqdn']
# assuming https if fqdn is used
port = 443
print(f"CLIENT: Connecting to combiner host: {host}:{port}", flush=True)
logger.info(f"Initiating connection to combiner host at: {host}:{port}", flush=True)

if client_config['certificate']:
print("CLIENT: using certificate from Reducer for GRPC channel")
logger.info("Utilizing CA certificate for GRPC channel authentication.")
secure = True
cert = base64.b64decode(
client_config['certificate']) # .decode('utf-8')
credentials = grpc.ssl_channel_credentials(root_certificates=cert)
channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials)
elif os.getenv("FEDN_GRPC_ROOT_CERT_PATH"):
secure = True
print("CLIENT: using root certificate from environment variable for GRPC channel")
logger.info("Using root certificate from environment variable for GRPC channel.")
with open(os.environ["FEDN_GRPC_ROOT_CERT_PATH"], 'rb') as f:
credentials = grpc.ssl_channel_credentials(f.read())
channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials)
elif self.config['secure']:
secure = True
print("CLIENT: using CA certificate for GRPC channel")
logger.info("Using CA certificate for GRPC channel.")
cert = ssl.get_server_certificate((host, port))

credentials = grpc.ssl_channel_credentials(cert.encode('utf-8'))
Expand All @@ -174,7 +180,7 @@ def _connect(self, client_config):
else:
channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials)
else:
print("CLIENT: using insecure GRPC channel")
logger.info("Using insecure GRPC channel.")
if port == 443:
port = 80
channel = grpc.insecure_channel("{}:{}".format(
Expand All @@ -187,13 +193,13 @@ def _connect(self, client_config):
self.combinerStub = rpc.CombinerStub(channel)
self.modelStub = rpc.ModelServiceStub(channel)

print("Client: {} connected {} to {}:{}".format(self.name,
"SECURED" if secure else "INSECURE",
logger.info("Successfully established {} connection to {}:{}".format(self.name,
"secure" if secure else "insecure",
host,
port),
flush=True)

print("Client: Using {} compute package.".format(
logger.info("Using {} compute package.".format(
client_config["package"]))

def _disconnect(self):
Expand Down Expand Up @@ -310,7 +316,7 @@ def _initialize_dispatcher(self, config):
except KeyError:
pass
except Exception as e:
print(f"Caught exception: {type(e).__name__}")
logger.error(f"Caught exception: {type(e).__name__}")
else:
# TODO: Deprecate
dispatch_config = {'entry_points':
Expand Down Expand Up @@ -530,16 +536,6 @@ def _process_training_request(self, model_id):

os.unlink(inpath)
os.unlink(outpath)
# except FileNotFoundError as e:
# print("File not found.")
# except Exception as e:
# print("ERROR could not process training request due to error: {}".format(
# e))
# print(type(e).__name__)
# print(inpath)
# print(outpath)
# updated_model_id = None
# meta = {'status': 'failed', 'error': str(e)}

self.state = ClientState.idle

Expand Down Expand Up @@ -740,9 +736,6 @@ def run(self):
if self.state != old_state:
logger.info("Client in {} state.".format(ClientStateToString(self.state)))
if cnt > 5:
# logger.info("CLIENT active.")
# print("{}:CLIENT active".format(
# datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
cnt = 0
if not self._attached:
logger.info("Detached from combiner.")
Expand Down
4 changes: 2 additions & 2 deletions fedn/fedn/network/clients/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import requests

from fedn.common.log_config import logger

class Status(enum.Enum):
""" Enum for representing the status of a client assignment."""
Expand Down Expand Up @@ -63,8 +64,7 @@ def __init__(self, host, port, token, name, remote_package, force_ssl=False, ver
self.connect_string = "{}{}".format(
self.prefix, self.host)

print("\n\nsetting the connection string to {}\n\n".format(
self.connect_string), flush=True)
logger.info("Setting connection string to {}.".format(self.connect_string))

def assign(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions fedn/fedn/network/combiner/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import requests

from fedn.common.log_config import logger

class Status(enum.Enum):
""" Enum for representing the status of a combiner announcement."""
Expand Down Expand Up @@ -83,8 +84,7 @@ def __init__(self, host, port, myhost, fqdn, myport, token, name, secure=False,
self.connect_string = "{}{}".format(
self.prefix, self.host)

print("\n\nsetting the connection string to {}\n\n".format(
self.connect_string), flush=True)
logger.info("Setting connection string to {}".format(self.connect_string))

def announce(self):
"""
Expand Down
5 changes: 1 addition & 4 deletions fedn/fedn/utils/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import platform
import logging

from fedn.utils.process import run_process

logger = logging.getLogger(__name__)

from fedn.common.log_config import logger

class Dispatcher:
""" Dispatcher class for compute packages.
Expand Down
33 changes: 0 additions & 33 deletions fedn/fedn/utils/logger.py

This file was deleted.

5 changes: 2 additions & 3 deletions fedn/fedn/utils/process.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import subprocess

logger = logging.getLogger()
from fedn.common.log_config import logger


def run_process(args, cwd):
Expand All @@ -25,7 +24,7 @@ def check_io():
while True:
output = status.stdout.readline().decode()
if output:
logger.log(logging.INFO, output)
logger.info(output)
else:
break

Expand Down

0 comments on commit a2fec35

Please sign in to comment.