Skip to content

Commit

Permalink
work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Hellander committed Dec 3, 2023
1 parent 722fdd5 commit bf7ea80
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 50 deletions.
5 changes: 2 additions & 3 deletions fedn/fedn/common/storage/s3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
""" Module handling storage of objects in S3/MinIO object storage.
This functionality is used by the controller to store global models in the model trail
in persistent storage. """
""" Module handling storage of objects in S3/MinIO object storage. This functionality is used by the controller
to store global models in the model trail in persistent storage. """
8 changes: 4 additions & 4 deletions fedn/fedn/common/storage/s3/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ class Repository(object):

@abc.abstractmethod
def set_artifact(self, instance_name, instance):
""" Set object with name instance_name
""" Set object with name object_name
:param instance_name:
:param instance:
:param instance_name: The name of the object
:param instance: the object
"""
raise NotImplementedError("Must be implemented by subclass")

@abc.abstractmethod
def get_artifact(self, instance_name):
""" Retrive object with name instance_name.
:param instance_name:
:param instance_name: The name of the object to retrieve
"""
raise NotImplementedError("Must be implemented by subclass")
67 changes: 31 additions & 36 deletions fedn/fedn/common/storage/s3/miniorepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
from minio.error import InvalidResponseError
from urllib3.poolmanager import PoolManager

from .base import Repository
from fedn.common.log_config import logger

logger = logging.getLogger(__name__)
from .base import Repository


class MINIORepository(Repository):
""" Class implementing Repoistory for MinIO. """
""" Class implementing Repository for MinIO. """

client = None

Expand Down Expand Up @@ -45,8 +45,8 @@ def __init__(self, config):
self.secure_mode = False

if not self.secure_mode:
print(
"\n\n\nWARNING : S3/MINIO RUNNING IN **INSECURE** MODE! THIS IS NOT FOR PRODUCTION!\n\n\n")
logger.warning(
"S3/MINIO RUNNING IN **INSECURE** MODE!")

if self.secure_mode:
manager = PoolManager(
Expand All @@ -66,22 +66,8 @@ def __init__(self, config):
self.create_bucket(self.context_bucket)
self.create_bucket(self.bucket)

def create_bucket(self, bucket_name):
""" Create a new bucket. If bucket exists, do nothing.
:param bucket_name: The name of the bucket
:type bucket_name: str
"""
found = self.client.bucket_exists(bucket_name)

if not found:
try:
self.client.make_bucket(bucket_name)
except InvalidResponseError:
raise

def set_artifact(self, instance_name, instance, is_file=False, bucket=''):
""" Instance must be a byte-like object. """

if bucket == '':
bucket = self.bucket
if is_file:
Expand All @@ -96,12 +82,7 @@ def set_artifact(self, instance_name, instance, is_file=False, bucket=''):
return True

def get_artifact(self, instance_name, bucket=''):
""" Retrive object with name instance_name.

:param instance_name:
:param bucket:
:return:
"""
if bucket == '':
bucket = self.bucket

Expand All @@ -125,20 +106,19 @@ def get_artifact_stream(self, instance_name):
raise Exception("Could not fetch data from bucket, {}".format(e))

def list_artifacts(self):
"""
""" List all objects.
:return:
:return: A list of object names
"""
objects_to_delete = []
objects = []
try:
objs = self.client.list_objects(self.bucket)
for obj in objs:
print(obj.object_name)
objects_to_delete.append(obj.object_name)
objects.append(obj.object_name)
except Exception:
raise Exception(
"Could not list models in bucket {}".format(self.bucket))
return objects_to_delete
return objects

def delete_artifact(self, instance_name, bucket=[]):
""" Delete object with name instance_name from buckets.
Expand All @@ -153,11 +133,25 @@ def delete_artifact(self, instance_name, bucket=[]):
try:
self.client.remove_object(bucket, instance_name)
except InvalidResponseError as err:
print(err)
print('Could not delete artifact: {}'.format(instance_name))
logger.error('Could not delete artifact: {0} err: {1}'.format(instance_name, err))
pass

def create_bucket(self, bucket_name):
""" Create a new bucket. If bucket exists, do nothing.
:param bucket_name: The name of the bucket
:type bucket_name: str
"""
found = self.client.bucket_exists(bucket_name)

if not found:
try:
self.client.make_bucket(bucket_name)
except InvalidResponseError:
raise

def delete_objects(self):
""" Delete all objects
""" Delete all objects.
"""
objects_to_delete = self.list_artifacts()
Expand All @@ -167,6 +161,7 @@ def delete_objects(self):
self.bucket, objects_to_delete
)
for del_err in errors:
print("Deletion Error: {}".format(del_err))
logger.error("Deletion Error: {}".format(del_err))
except Exception:
print('Could not delete objects: {}'.format(objects_to_delete))
logger.error('Could not delete objects: {}'.format(objects_to_delete))
pass
16 changes: 9 additions & 7 deletions fedn/fedn/common/storage/s3/s3repo.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import uuid

from fedn.common.log_config import logger

from .miniorepo import MINIORepository


class S3ModelRepository(MINIORepository):
"""
""" Class for S3 Repository.
"""

Expand All @@ -17,7 +19,7 @@ def get_model(self, model_id):
:param model_id:
:return:
"""
print("Client {} trying to get model with id: {}".format(
logger.info("Client {} trying to get model with id: {}".format(
self.client, model_id), flush=True)
return self.get_artifact(model_id)

Expand All @@ -27,7 +29,7 @@ def get_model_stream(self, model_id):
:param model_id:
:return:
"""
print("Client {} trying to get model with id: {}".format(
logger.info("Client {} trying to get model with id: {}".format(
self.client, model_id), flush=True)
return self.get_artifact_stream(model_id)

Expand All @@ -44,7 +46,7 @@ def set_model(self, model, is_file=True):
self.set_artifact(str(model_id), model,
bucket=self.bucket, is_file=is_file)
except Exception:
print("Failed to write model with ID {} to repository.".format(model_id))
logger.error("Failed to write model with ID {} to repository.".format(model_id))
raise
return str(model_id)

Expand All @@ -59,7 +61,7 @@ def set_compute_package(self, name, compute_package, is_file=True):
self.set_artifact(str(name), compute_package,
bucket="fedn-context", is_file=is_file)
except Exception:
print("Failed to write compute_package to repository.")
logger.error("Failed to write compute_package to repository.")
raise

def get_compute_package(self, compute_package):
Expand All @@ -71,7 +73,7 @@ def get_compute_package(self, compute_package):
try:
data = self.get_artifact(compute_package, bucket="fedn-context")
except Exception:
print("Failed to get compute_package from repository.")
logger.error("Failed to get compute_package from repository.")
raise
return data

Expand All @@ -83,5 +85,5 @@ def delete_compute_package(self, compute_package):
try:
self.delete_artifact(compute_package, bucket=['fedn-context'])
except Exception:
print("Failed to delete compute_package from repository.")
logger.error("Failed to delete compute_package from repository.")
raise

0 comments on commit bf7ea80

Please sign in to comment.