Refactored and documented s3 storage
Andreas Hellander committed Dec 4, 2023
1 parent bf7ea80 commit 53745cb
Showing 5 changed files with 90 additions and 112 deletions.
22 changes: 20 additions & 2 deletions fedn/fedn/common/storage/s3/base.py
@@ -5,18 +5,36 @@ class Repository(object):
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
def set_artifact(self, instance_name, instance):
def set_artifact(self, instance_name, instance, bucket):
""" Set object with name object_name
:param instance_name: The name of the object
:type instance_name: str
:param instance: the object
:param bucket: The bucket name
:type bucket: str
"""
raise NotImplementedError("Must be implemented by subclass")

@abc.abstractmethod
def get_artifact(self, instance_name):
def get_artifact(self, instance_name, bucket):
""" Retrive object with name instance_name.
:param instance_name: The name of the object to retrieve
:type instance_name: str
:param bucket: The bucket name
:type bucket: str
"""
raise NotImplementedError("Must be implemented by subclass")

@abc.abstractmethod
def get_artifact_stream(self, instance_name, bucket):
""" Return a stream handler for object with name instance_name.
:param instance_name: The name of the object
:type instance_name: str
:param bucket: The bucket name
:type bucket: str
:return: stream handler for object instance name
"""
raise NotImplementedError("Must be implemented by subclass")
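
The bucket-aware interface above is small enough to illustrate with a toy backend. The sketch below is not part of the commit: it only shows what a concrete Repository subclass is expected to provide, using an in-memory dict in place of S3 and assuming the fedn package is importable.

# Illustrative only (not part of this commit): a toy in-memory backend that
# satisfies the bucket-aware interface above.
import io

from fedn.common.storage.s3.base import Repository


class InMemoryRepository(Repository):
    """Toy backend keeping objects in nested dicts keyed by bucket name."""

    def __init__(self):
        self.buckets = {}

    def set_artifact(self, instance_name, instance, bucket):
        # Store the raw bytes under (bucket, instance_name).
        self.buckets.setdefault(bucket, {})[instance_name] = instance
        return True

    def get_artifact(self, instance_name, bucket):
        # Return the stored bytes for the named object.
        return self.buckets[bucket][instance_name]

    def get_artifact_stream(self, instance_name, bucket):
        # Hand back a file-like stream, as the real MinIO backend does.
        return io.BytesIO(self.buckets[bucket][instance_name])


repo = InMemoryRepository()
repo.set_artifact("model-1", b"weights", bucket="fedn-models")
assert repo.get_artifact("model-1", bucket="fedn-models") == b"weights"
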
94 changes: 21 additions & 73 deletions fedn/fedn/common/storage/s3/miniorepo.py
@@ -18,58 +18,30 @@ class MINIORepository(Repository):
def __init__(self, config):
""" Initialize object.
:param config: Configuration including connection credential and bucket names.
:param config: Dictionary containing configuration for credentials and bucket names.
:type config: dict
"""

super().__init__()
try:
access_key = config['storage_access_key']
except Exception:
access_key = 'minio'
try:
secret_key = config['storage_secret_key']
except Exception:
secret_key = 'minio123'
try:
self.bucket = config['storage_bucket']
except Exception:
self.bucket = 'fedn-models'
try:
self.context_bucket = config['context_bucket']
except Exception:
self.bucket = 'fedn-context'
try:
self.secure_mode = bool(config['storage_secure_mode'])
except Exception:
self.secure_mode = False

if not self.secure_mode:
logger.warning(
"S3/MINIO RUNNING IN **INSECURE** MODE!")
self.name = "MINIORepository"

if self.secure_mode:
if config['storage_secure_mode']:
manager = PoolManager(
num_pools=100, cert_reqs='CERT_NONE', assert_hostname=False)
self.client = Minio("{0}:{1}".format(config['storage_hostname'], config['storage_port']),
access_key=access_key,
secret_key=secret_key,
secure=self.secure_mode, http_client=manager)
access_key=config['storage_access_key'],
secret_key=config['storage_secret_key'],
secure=config['storage_secure_mode'], http_client=manager)
else:
self.client = Minio("{0}:{1}".format(config['storage_hostname'], config['storage_port']),
access_key=access_key,
secret_key=secret_key,
secure=self.secure_mode)

# TODO: generalize
self.context_bucket = 'fedn-context'
self.create_bucket(self.context_bucket)
self.create_bucket(self.bucket)
access_key=config['storage_access_key'],
secret_key=config['storage_secret_key'],
secure=config['storage_secure_mode'])
logger.warning(
"S3/MINIO RUNNING IN **INSECURE** MODE!")

def set_artifact(self, instance_name, instance, is_file=False, bucket=''):
def set_artifact(self, instance_name, instance, bucket, is_file=False):

if bucket == '':
bucket = self.bucket
if is_file:
self.client.fput_object(bucket, instance_name, instance)
else:
@@ -81,43 +53,37 @@ def set_artifact(self, instance_name, instance, is_file=False, bucket=''):

return True

def get_artifact(self, instance_name, bucket=''):

if bucket == '':
bucket = self.bucket
def get_artifact(self, instance_name, bucket):

try:
data = self.client.get_object(bucket, instance_name)
return data.read()
except Exception as e:
raise Exception("Could not fetch data from bucket, {}".format(e))

def get_artifact_stream(self, instance_name):
""" Return a stream handler for object with name instance_name.
def get_artifact_stream(self, instance_name, bucket):

:param instance_name: The name if the object
:type instance_name: str
:return: stream handler for object instance name
"""
try:
data = self.client.get_object(self.bucket, instance_name)
data = self.client.get_object(bucket, instance_name)
return data
except Exception as e:
raise Exception("Could not fetch data from bucket, {}".format(e))

def list_artifacts(self):
""" List all objects.
def list_artifacts(self, bucket):
""" List all objects in bucket.
:param bucket: Name of the bucket
:type bucket: str
:return: A list of object names
"""
objects = []
try:
objs = self.client.list_objects(self.bucket)
objs = self.client.list_objects(bucket)
for obj in objs:
objects.append(obj.object_name)
except Exception:
raise Exception(
"Could not list models in bucket {}".format(self.bucket))
"Could not list models in bucket {}".format(bucket))
return objects

def delete_artifact(self, instance_name, bucket=[]):
@@ -127,8 +93,6 @@ def delete_artifact(self, instance_name, bucket=[]):
:param bucket: List of buckets to delete from
:type bucket: list
"""
if not bucket:
bucket = self.bucket

try:
self.client.remove_object(bucket, instance_name)
@@ -149,19 +113,3 @@ def create_bucket(self, bucket_name):
self.client.make_bucket(bucket_name)
except InvalidResponseError:
raise

def delete_objects(self):
""" Delete all objects.
"""
objects_to_delete = self.list_artifacts()
try:
# Remove list of objects.
errors = self.client.remove_objects(
self.bucket, objects_to_delete
)
for del_err in errors:
logger.error("Deletion Error: {}".format(del_err))
except Exception:
logger.error('Could not delete objects: {}'.format(objects_to_delete))
pass
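
With the credential fallbacks removed, MINIORepository now requires every connection setting to be present in the config dict, and it no longer creates any buckets in __init__. A usage sketch follows; the endpoint, credentials and bucket name are placeholders rather than FEDn defaults.

# Usage sketch (not part of this commit); endpoint, credentials and bucket
# name are placeholders, not FEDn defaults.
import tempfile

from fedn.common.storage.s3.miniorepo import MINIORepository

config = {
    "storage_hostname": "localhost",    # placeholder MinIO endpoint
    "storage_port": 9000,
    "storage_access_key": "minio",      # placeholder credentials
    "storage_secret_key": "minio123",
    "storage_secure_mode": False,       # plain HTTP; triggers the insecure-mode warning
}

repo = MINIORepository(config)

# Buckets are no longer created in __init__, so the caller creates them explicitly.
repo.create_bucket("fedn-models")

# Upload a small file and read it back; every call now names its bucket explicitly.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"example payload")
    path = f.name

repo.set_artifact("example-object", path, bucket="fedn-models", is_file=True)
data = repo.get_artifact("example-object", bucket="fedn-models")
print(len(data), "bytes retrieved")
print(repo.list_artifacts("fedn-models"))
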
83 changes: 47 additions & 36 deletions fedn/fedn/common/storage/s3/s3repo.py
@@ -1,89 +1,100 @@
import uuid

from fedn.common.log_config import logger
from fedn.common.storage.s3.miniorepo import MINIORepository

from .miniorepo import MINIORepository

class S3ModelRepository:
""" Interface for storing model objects and compute packages in S3 compatible storage. """

class S3ModelRepository(MINIORepository):
""" Class for S3 Repository.
def __init__(self, config):

"""
self.model_bucket = config['storage_bucket']
self.context_bucket = config['context_bucket']

def __init__(self, config):
super().__init__(config)
self.client = MINIORepository(config)

self.client.create_bucket(self.context_bucket)
self.client.create_bucket(self.model_bucket)

def get_model(self, model_id):
"""
""" Retrieve a model with id model_id.
:param model_id:
:return:
:param model_id: Unique identifier for the model to retrieve.
:return: The model object
"""
logger.info("Client {} trying to get model with id: {}".format(
self.client, model_id), flush=True)
return self.get_artifact(model_id)
self.client.name, model_id))
return self.client.get_artifact(model_id, self.model_bucket)

def get_model_stream(self, model_id):
"""
""" Retrieve a stream handle to model with id model_id.
:param model_id:
:return:
:return: Handle to model object
"""
logger.info("Client {} trying to get model with id: {}".format(
self.client, model_id), flush=True)
return self.get_artifact_stream(model_id)
self.client.name, model_id))
return self.client.get_artifact_stream(model_id, self.model_bucket)

def set_model(self, model, is_file=True):
"""
""" Upload model object.
:param model:
:param is_file:
:return:
:param model: The model object
:type model: BytesIO or str file name.
:param is_file: True if model is a file name, else False
:return: id for the uploaded object (str)
"""
model_id = uuid.uuid4()
# TODO: Check that this call succeeds

try:
self.set_artifact(str(model_id), model,
bucket=self.bucket, is_file=is_file)
self.client.set_artifact(str(model_id), model,
bucket=self.model_bucket, is_file=is_file)
except Exception:
logger.error("Failed to write model with ID {} to repository.".format(model_id))
logger.error("Failed to upload model with ID {} to repository.".format(model_id))
raise
return str(model_id)

def set_compute_package(self, name, compute_package, is_file=True):
"""
""" Upload compute package.
:param name:
:param compute_package:
:param is_file:
:param name: The name of the compute package.
:type name: str
:param compute_package: The compute package
:type compute_package: BytesIO or str file name.
:param is_file: True if compute_package is a file name, else False
"""

try:
self.set_artifact(str(name), compute_package,
bucket="fedn-context", is_file=is_file)
self.client.set_artifact(str(name), compute_package,
bucket=self.context_bucket, is_file=is_file)
except Exception:
logger.error("Failed to write compute_package to repository.")
raise

def get_compute_package(self, compute_package):
"""
""" Retrieve compute package from object store.
:param compute_package:
:return:
:param compute_package: The name of the compute package.
:type compute_package: str
:return: Compute package.
"""
try:
data = self.get_artifact(compute_package, bucket="fedn-context")
data = self.client.get_artifact(compute_package, bucket=self.context_bucket)
except Exception:
logger.error("Failed to get compute_package from repository.")
raise
return data

def delete_compute_package(self, compute_package):
"""
""" Delete a compute package from storage.
:param compute_package:
:param compute_package: The name of the compute_package
:type compute_package: str
"""

try:
self.delete_artifact(compute_package, bucket=['fedn-context'])
self.client.delete_artifact(compute_package, bucket=[self.context_bucket])
except Exception:
logger.error("Failed to delete compute_package from repository.")
raise
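
S3ModelRepository now wraps a MINIORepository client instead of inheriting from it, reads both bucket names from the config, and creates them at construction time. A hedged usage sketch, with placeholder connection settings and dummy files standing in for a real model and compute package:

# Usage sketch (not part of this commit); connection settings are placeholders
# and the uploaded files are dummy stand-ins for a real model and package.
import tempfile

from fedn.common.storage.s3.s3repo import S3ModelRepository

config = {
    "storage_hostname": "localhost",    # placeholder MinIO endpoint
    "storage_port": 9000,
    "storage_access_key": "minio",      # placeholder credentials
    "storage_secret_key": "minio123",
    "storage_secure_mode": False,
    "storage_bucket": "fedn-models",    # model bucket, created in __init__
    "context_bucket": "fedn-context",   # compute-package bucket, created in __init__
}

repo = S3ModelRepository(config)

with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"serialized model")        # stand-in for a real model file
    model_path = f.name

# set_model() generates and returns the model id used for later retrieval.
model_id = repo.set_model(model_path, is_file=True)
model_bytes = repo.get_model(model_id)

# Compute packages are stored in the context bucket under an explicit name.
repo.set_compute_package("package.tgz", model_path, is_file=True)  # dummy file as stand-in
package_bytes = repo.get_compute_package("package.tgz")
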
1 change: 1 addition & 0 deletions fedn/fedn/network/combiner/round.py
@@ -222,6 +222,7 @@ def stage_model(self, model_id, timeout_retry=3, retry=2):
if tries > retry:
self.server.report_status(
"ROUNDCONTROL: Failed to stage model {} from storage backend!".format(model_id), flush=True)
raise
return

self.modelservice.set_model(model, model_id)
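
With the added raise, stage_model no longer fails silently once its retries are exhausted; the exception now reaches the caller. The snippet below is illustrative only (the surrounding combiner code is not part of this diff) and sketches how a caller could react.

# Illustrative only (not part of this commit): the caller can now detect a
# staging failure instead of silently continuing. 'round_handler' is a
# hypothetical name for the object exposing stage_model().
def prepare_round(round_handler, model_id):
    try:
        round_handler.stage_model(model_id)
    except Exception:
        # Staging failed after all retries; abort this round instead of
        # proceeding with a model that never reached the model service.
        return False
    return True
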
2 changes: 1 addition & 1 deletion fedn/fedn/network/controller/controlbase.py
@@ -374,7 +374,7 @@ def evaluate_round_validity_policy(self, round):
return True

def state(self):
""" Get the current state of the controller
""" Get the current state of the controller.
:return: The state
:rtype: str
