Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 committed Jul 31, 2021
1 parent d280ea2 commit 7a7eade
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 40 deletions.
9 changes: 5 additions & 4 deletions docs/source/data.extend.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
###################
Extend Type System
###################
Flytekit ships with an extensible Data persistence layer. This is where the metadata and the actual raw data is stored by flytekit.
##############################
Extend Data Persistence layer
##############################
Flytekit provides a data persistence layer, which is used for recording metadata that is shared with backend Flyte. This persistence layer is also available for various types to store raw user data and is designed to be cross-cloud compatible.
Moreover, it is design to be extensible and users can bring their own data persistence plugins by following the persistence interface. NOTE, this is bound to get more extensive for variety of use-cases, but the core set of apis are battle tested.

.. automodule:: flytekit.core.data_persistence
:no-members:
Expand Down
31 changes: 13 additions & 18 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
.. autosummary::
:toctree: generated/
:template: custom.rst
:nosignatures:
DataPersistence
DataPersistencePlugins
Expand Down Expand Up @@ -47,7 +49,7 @@ def __init__(self, message: str):

class DataPersistence(object):
"""
Base abstract type for all DataPersistence operations. This can be plugged in using the flytekitplugins architecture
Base abstract type for all DataPersistence operations. This can be extended using the flytekitplugins architecture
"""

def __init__(self, name: str, default_prefix: typing.Optional[str] = None, **kwargs):
Expand Down Expand Up @@ -78,7 +80,7 @@ def exists(self, path: str) -> bool:
@abstractmethod
def get(self, from_path: str, to_path: str, recursive: bool = False):
"""
Retrieves a data from from_path and writes to the given to_path (to_path is locally accessible)
Retrieves data from from_path and writes to the given to_path (to_path is locally accessible)
"""
pass

Expand All @@ -102,7 +104,7 @@ def construct_path(self, add_protocol: bool, add_prefix: bool, *paths) -> str:

class DataPersistencePlugins(object):
"""
This is core plugin engine that stores all DataPersistence plugins. To add a new plugin use
DataPersistencePlugins is the core plugin registry that stores all DataPersistence plugins. To add a new plugin use
.. code-block:: python
Expand All @@ -116,7 +118,7 @@ class DataPersistencePlugins(object):
@classmethod
def register_plugin(cls, protocol: str, plugin: typing.Type[DataPersistence], force: bool = False):
"""
Registers the supplied plugin for the specified protocol if one does not already exists.
Registers the supplied plugin for the specified protocol if one does not already exist.
If one exists and force is default or False, then a TypeError is raised.
If one does not exist then it is registered
If one exists, but force == True then the existing plugin is overriden
Expand Down Expand Up @@ -161,7 +163,7 @@ def is_supported_protocol(cls, protocol: str) -> bool:

class DiskPersistence(DataPersistence):
"""
The simplest form of persistence that is available with default flytekit - Disk based persistence.
The simplest form of persistence that is available with default flytekit - Disk-based persistence.
This will store all data locally and retreive the data from local. This is helpful for local execution and simulating
runs.
"""
Expand Down Expand Up @@ -237,7 +239,7 @@ class FileAccessProvider(object):
def __init__(self, local_sandbox_dir: Union[str, os.PathLike], raw_output_prefix: str):
# Local access
if local_sandbox_dir is None or local_sandbox_dir == "":
raise Exception("Can't use empty path")
raise ValueError("FileAccessProvider needs to be created with a valid local_sandbox_dir")
local_sandbox_dir_appended = os.path.join(local_sandbox_dir, "local_flytekit")
self._local_sandbox_dir = pathlib.Path(local_sandbox_dir_appended)
self._local_sandbox_dir.mkdir(parents=True, exist_ok=True)
Expand All @@ -251,9 +253,7 @@ def is_remote(path: Union[str, os.PathLike]) -> bool:
"""
Deprecated. Lets find a replacement
"""
if path.startswith("/") or path.startswith("file://"):
return False
return True
return not (path.startswith("/") or path.startswith("file://"))

@property
def local_sandbox_dir(self) -> os.PathLike:
Expand Down Expand Up @@ -340,17 +340,12 @@ def get_data(self, remote_path: str, local_path: str, is_multipart=False):
:param bool is_multipart:
"""
try:
with PerformanceTimer("Copying ({} -> {})".format(remote_path, local_path)):
with PerformanceTimer(f"Copying ({remote_path} -> {local_path})"):
DataPersistencePlugins.find_plugin(remote_path)().get(remote_path, local_path, recursive=is_multipart)
except Exception as ex:
raise FlyteAssertion(
"Failed to get data from {remote_path} to {local_path} (recursive={is_multipart}).\n\n"
"Original exception: {error_string}".format(
remote_path=remote_path,
local_path=local_path,
is_multipart=is_multipart,
error_string=str(ex),
)
f"Failed to get data from {remote_path} to {local_path} (recursive={is_multipart}).\n\n"
f"Original exception: {str(ex)}"
)

def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart=False):
Expand All @@ -363,7 +358,7 @@ def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_mul
:param bool is_multipart:
"""
try:
with PerformanceTimer("Writing ({} -> {})".format(local_path, remote_path)):
with PerformanceTimer(f"Writing ({local_path} -> {remote_path})"):
DataPersistencePlugins.find_plugin(remote_path)().put(local_path, remote_path, recursive=is_multipart)
except Exception as ex:
raise FlyteAssertion(
Expand Down
2 changes: 1 addition & 1 deletion flytekit/extras/persistence/gcs_gsutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def _check_binary():
Make sure that the `gsutil` cli is present
"""
if not shell_which(GCSPersistence._GS_UTIL_CLI):
raise _FlyteUserException("gsutil (gcloud cli) not found! Please install.")
raise _FlyteUserException("gsutil (gcloud cli) not found! Please install using `pip install gsutil`.")

@staticmethod
def _maybe_with_gsutil_parallelism(*gsutil_args):
Expand Down
18 changes: 9 additions & 9 deletions flytekit/extras/persistence/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,31 @@ class HttpPersistence(DataPersistence):
_HTTP_OK = 200
_HTTP_FORBIDDEN = 403
_HTTP_NOT_FOUND = 404
ALLOWED_CODES = {
_HTTP_OK,
_HTTP_NOT_FOUND,
_HTTP_FORBIDDEN,
}

def __init__(self, *args, **kwargs):
super(HttpPersistence, self).__init__(name="http/https", *args, **kwargs)

def exists(self, path: str):
rsp = _requests.head(path)
allowed_codes = {
type(self)._HTTP_OK,
type(self)._HTTP_NOT_FOUND,
type(self)._HTTP_FORBIDDEN,
}
if rsp.status_code not in allowed_codes:
if rsp.status_code not in self.ALLOWED_CODES:
raise _user_exceptions.FlyteValueException(
rsp.status_code,
"Data at {} could not be checked for existence. Expected one of: {}".format(path, allowed_codes),
f"Data at {path} could not be checked for existence. Expected one of: {self.ALLOWED_CODES}",
)
return rsp.status_code == type(self)._HTTP_OK
return rsp.status_code == self._HTTP_OK

def get(self, from_path: str, to_path: str, recursive: bool = False):
if recursive:
raise _user_exceptions.FlyteAssertion(
"Reading data recursively from HTTP endpoint is not currently supported."
)
rsp = _requests.get(from_path)
if rsp.status_code != type(self)._HTTP_OK:
if rsp.status_code != self._HTTP_OK:
raise _user_exceptions.FlyteValueException(
rsp.status_code,
"Request for data @ {} failed. Expected status code {}".format(from_path, type(self)._HTTP_OK),
Expand Down
19 changes: 11 additions & 8 deletions flytekit/extras/persistence/s3_awscli.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class S3Persistence(DataPersistence):
"""
DataPersistence plugin for AWS S3 (and Minio). Use aws cli to manage the transfer. The binary needs to be installed
separately
.. prompt::
pip install awscli
"""

PROTOCOL = "s3://"
Expand All @@ -73,22 +78,20 @@ def _check_binary():
Make sure that the AWS cli is present
"""
if not shell_which(S3Persistence._AWS_CLI):
raise _FlyteUserException("AWS CLI not found at Please install.")
raise _FlyteUserException("AWS CLI not found! Please install it with `pip install awscli`.")

@staticmethod
def _split_s3_path_to_bucket_and_key(path):
def _split_s3_path_to_bucket_and_key(path: str) -> (str, str):
"""
:param Text path:
:rtype: (Text, Text)
splits a valid s3 uri into bucket and key
"""
path = path[len("s3://") :]
path = path[len("s3://"):]
first_slash = path.index("/")
return path[:first_slash], path[first_slash + 1 :]
return path[:first_slash], path[first_slash + 1:]

def exists(self, remote_path):
"""
:param Text remote_path: remote s3:// path
:rtype bool: whether the s3 file exists or not
Given a remoet path of the format s3://, checks if the remote file exists
"""
S3Persistence._check_binary()

Expand Down

0 comments on commit 7a7eade

Please sign in to comment.