feat: implement CLI upload functionality
Add a gt4sd-upload CLI command for uploading model artifacts to S3.

Signed-off-by: Giorgio Giannone <[email protected]>
georgosgeorgos committed Jun 13, 2022
1 parent 2f04908 commit 9503dbc
Showing 7 changed files with 449 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
@@ -57,5 +57,6 @@ jobs:
gt4sd-trainer --help
gt4sd-inference --help
gt4sd-saving --help
gt4sd-upload --help
gt4sd-pl-to-hf --help
gt4sd-hf-to-st --help
2 changes: 2 additions & 0 deletions requirements.txt
@@ -22,3 +22,5 @@ tensorflow==2.1.0
keras==2.3.1
sacremoses>=0.0.41
protobuf<=3.20.1
#multiprocess==0.70.12.2

1 change: 1 addition & 0 deletions setup.cfg
@@ -44,6 +44,7 @@ console_scripts=
gt4sd-trainer = gt4sd.cli.trainer:main
gt4sd-inference = gt4sd.cli.inference:main
gt4sd-saving = gt4sd.cli.saving:main
gt4sd-upload = gt4sd.cli.upload:main
gt4sd-pl-to-hf = gt4sd.cli.pl_to_hf_converter:main
gt4sd-hf-to-st = gt4sd.cli.hf_to_st_converter:main

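The diff registers the new console script, but the body of gt4sd/cli/upload.py is among the changed files not shown on this page. A minimal sketch of what such an entry point could look like follows; the flag names and the `configuration_class` / `pipeline_arguments` placeholders are hypothetical, not the actual CLI surface:

```python
# Hypothetical sketch of a gt4sd.cli.upload:main entry point; the flags
# and the placeholder names below are illustrative only.
from argparse import ArgumentParser


def main() -> None:
    """Upload artifacts of a trained model version to S3."""
    parser = ArgumentParser(description="Upload model artifacts to S3.")
    parser.add_argument(
        "--target_version", required=True, help="version to create on S3."
    )
    parser.add_argument(
        "--source_version", default=None, help="version used for missing artifacts."
    )
    args = parser.parse_args()
    # In the real CLI, the configuration class and the training pipeline
    # arguments would be resolved from user input; here they are assumed to
    # exist as `configuration_class` and `pipeline_arguments`.
    configuration_class.upload_version_from_training_pipeline_arguments(
        training_pipeline_arguments=pipeline_arguments,
        target_version=args.target_version,
        source_version=args.source_version,
    )
```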
149 changes: 149 additions & 0 deletions src/gt4sd/algorithms/core.py
@@ -54,6 +54,7 @@
    get_algorithm_subdirectories_with_s3,
    get_cached_algorithm_path,
    sync_algorithm_with_s3,
    upload_to_s3,
)
from ..exceptions import InvalidItem, S3SyncError, SamplingError
from ..training_pipelines.core import TrainingPipelineArguments
@@ -442,6 +443,61 @@ def list_versions(cls) -> Set[str]:
        versions = versions.union(get_algorithm_subdirectories_in_cache(prefix))
        return versions

    # QUESTION: should this be in core.py? Uploading to S3 is not core functionality.
    @classmethod
    def list_remote_versions(cls, prefix: Optional[str] = None) -> Set[str]:
        """Get possible algorithm versions on S3.

        Before uploading an artifact to S3, we need to check that the target
        version is not already present, to avoid overwriting it by mistake.
        If the returned set is empty, the artifact folder can be uploaded
        directly; otherwise the caller must make sure the specific version of
        interest is not in the set. Only S3 is searched (not the local cache)
        for matching versions.

        Args:
            prefix: prefix matching the application. Defaults to None, a.k.a.,
                derive the prefix from the class attributes.

        Returns:
            viable values as :attr:`algorithm_version` for the environment.
        """
        if not prefix:
            prefix = cls.get_application_prefix()
        try:
            versions = get_algorithm_subdirectories_with_s3(prefix)
        except (KeyError, S3SyncError) as error:
            logger.info(
                f"Searching S3 raised {error.__class__.__name__}: no versions are available on S3."
            )
            logger.debug(error)
            versions = set()
        return versions

    @classmethod
    def get_filepath_mappings_for_training_pipeline_arguments(
        cls, training_pipeline_arguments: TrainingPipelineArguments
@@ -535,6 +591,99 @@ def save_version_from_training_pipeline_arguments(

logger.info(f"Artifacts saving completed into {target_path}")

    @classmethod
    def upload_version_from_training_pipeline_arguments_postprocess(
        cls,
        training_pipeline_arguments: TrainingPipelineArguments,
    ):
        """Postprocess after uploading. Not implemented yet.

        Args:
            training_pipeline_arguments: training pipeline arguments.
        """
        pass

    @classmethod
    def upload_version_from_training_pipeline_arguments(
        cls,
        training_pipeline_arguments: TrainingPipelineArguments,
        target_version: str,
        source_version: Optional[str] = None,
    ) -> None:
        """Upload a version using training pipeline arguments.

        Args:
            training_pipeline_arguments: training pipeline arguments.
            target_version: target version used to save the model in S3.
            source_version: source version to use for missing artifacts.
                Defaults to None, a.k.a., use the default version.
        """
        filepaths_mapping: Dict[str, str] = {}

        try:
            filepaths_mapping = (
                cls.get_filepath_mappings_for_training_pipeline_arguments(
                    training_pipeline_arguments=training_pipeline_arguments
                )
            )
        except ValueError:
            logger.info(
                f"{cls.__name__} cannot save a version based on {training_pipeline_arguments}"
            )

        if len(filepaths_mapping) > 0:
            # probably redundant, but just in case
            if source_version is None:
                source_version = cls.algorithm_version
            source_missing_path = cls.ensure_artifacts_for_version(source_version)

            # prefix for a run
            prefix = cls.get_application_prefix()
            # versions on S3 with that prefix
            versions = cls.list_remote_versions(prefix)

            # check whether the target version is already on S3. If yes, don't upload.
            if target_version in versions:
                logger.info(
                    f"Version {target_version} already exists in S3, skipping upload..."
                )
                return
            logger.info(
                f"There is no version {target_version} in S3, starting upload..."
            )

            # mapping between filenames and paths for a version.
            filepaths_mapping = {
                filename: source_filepath
                if os.path.exists(source_filepath)
                else os.path.join(source_missing_path, filename)
                for filename, source_filepath in filepaths_mapping.items()
            }

            logger.info(
                f"Uploading artifacts into {os.path.join(prefix, target_version)}..."
            )
            try:
                for target_filename, source_filepath in filepaths_mapping.items():
                    # algorithm_type/algorithm_name/algorithm_application/version/filename
                    # for the moment we assume that the prefix exists in S3.
                    target_filepath = os.path.join(
                        prefix, target_version, target_filename
                    )
                    upload_to_s3(target_filepath, source_filepath)
                    logger.info(
                        f"Uploaded artifact {source_filepath} to {target_filepath}."
                    )
            except S3SyncError as error:
                logger.warning(f"Upload failed with {error.__class__.__name__}.")
                logger.debug(error)
                return

            logger.info(
                f"Artifacts uploading completed into {os.path.join(prefix, target_version)}"
            )

    @classmethod
    def ensure_artifacts_for_version(cls, algorithm_version: str) -> str:
        """The artifacts matching the path defined by class attributes and the given version are downloaded.
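Taken together, the new classmethods implement a check-then-upload flow. A minimal sketch of the pre-upload check described in the docstring, where `MyConfiguration` is a placeholder for a concrete configuration class exposing these classmethods:

```python
# Hypothetical usage of the version check added in this commit;
# MyConfiguration is a placeholder, not an actual gt4sd class.
target_version = "v1"

# passing no prefix falls back to the one derived from the class attributes
remote_versions = MyConfiguration.list_remote_versions()
if target_version in remote_versions:
    print(f"Version {target_version} already on S3, skipping upload.")
else:
    print(f"Version {target_version} not on S3, safe to upload.")
```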
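The upload entry point itself repeats the same existence check internally and returns early if the target version is already present. A hedged usage sketch, again with placeholder names:

```python
# Hypothetical end-to-end upload; MyTrainingArguments stands in for a
# TrainingPipelineArguments subclass produced by a training pipeline.
arguments = MyTrainingArguments(model_path="/tmp/model")

MyConfiguration.upload_version_from_training_pipeline_arguments(
    training_pipeline_arguments=arguments,
    target_version="v1",
    source_version=None,  # fall back to the class default for missing artifacts
)
```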