Skip to content

Commit

Permalink
Bring your own Data plugins for flytekit (#559)
Browse files Browse the repository at this point in the history
* Name attribute added

Signed-off-by: Ketan Umare <[email protected]>

* Data persistence work in progress

Signed-off-by: Ketan Umare <[email protected]>

* updated data persistence api

Signed-off-by: Ketan Umare <[email protected]>

* tried my hand at docs - failed!

Signed-off-by: Ketan Umare <[email protected]>

* wip - pr into #559 (#565)

Signed-off-by: wild-endeavor <[email protected]>

* simplify code

Signed-off-by: wild-endeavor <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>

* Addressing comments

Signed-off-by: Ketan Umare <[email protected]>

* black format

Signed-off-by: Ketan Umare <[email protected]>

* Fixed lint errors

Signed-off-by: Ketan Umare <[email protected]>

* Added unit tests

Signed-off-by: Ketan Umare <[email protected]>

* fixed lint and added more tests

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* delete this

Signed-off-by: Ketan Umare <[email protected]>

* Revert "delete this"

This reverts commit efa5234.

Signed-off-by: Ketan Umare <[email protected]>

* Fsspec DataPersistence plugin (#596)

* Support for all fsspec data-persistence plugins

Signed-off-by: Ketan Umare <[email protected]>

* fixed lint

Signed-off-by: Ketan Umare <[email protected]>

* FSSpec debugging and defaults

Signed-off-by: Ketan Umare <[email protected]>

* FSSpec implementation

Signed-off-by: Ketan Umare <[email protected]>

* updated fsspec plugin

Signed-off-by: Ketan Umare <[email protected]>

* minio support

Signed-off-by: Ketan Umare <[email protected]>

* persist fix

Signed-off-by: Ketan Umare <[email protected]>

* fixed

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* client args

Signed-off-by: Ketan Umare <[email protected]>

* upload to a dir

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* fixed paths

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* update put logic

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* remove tempfile

Signed-off-by: Yee Hing Tong <[email protected]>

* unit tests added

Signed-off-by: Ketan Umare <[email protected]>

* lint fix

Signed-off-by: Ketan Umare <[email protected]>

* test

Signed-off-by: Yee Hing Tong <[email protected]>

* try another setting

Signed-off-by: Yee Hing Tong <[email protected]>

* just fsspec

Signed-off-by: Yee Hing Tong <[email protected]>

* missing s

Signed-off-by: Yee Hing Tong <[email protected]>

* Addressed comments

Signed-off-by: Ketan Umare <[email protected]>

* test fix

Signed-off-by: Ketan Umare <[email protected]>

* Great Expectations FlyteFile modifications

Signed-off-by: Samhita Alla <[email protected]>

* remove shutil package

Signed-off-by: Samhita Alla <[email protected]>

Co-authored-by: Yee Hing Tong <[email protected]>
Co-authored-by: Samhita Alla <[email protected]>
  • Loading branch information
3 people authored Aug 19, 2021
1 parent ef4a26d commit 67f0fe4
Show file tree
Hide file tree
Showing 42 changed files with 1,407 additions and 472 deletions.
15 changes: 15 additions & 0 deletions docs/source/data.extend.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
##############################
Extend Data Persistence layer
##############################
Flytekit provides a data persistence layer, which is used for recording metadata that is shared with the Flyte backend. This persistence layer is also available for various types to store raw user data and is designed to be cross-cloud compatible.
Moreover, it is designed to be extensible, and users can bring their own data persistence plugins by following the persistence interface. NOTE: this is bound to get more extensive to cover a variety of use cases, but the core set of APIs is battle-tested.

.. automodule:: flytekit.core.data_persistence
   :no-members:
   :no-inherited-members:
   :no-special-members:

.. automodule:: flytekit.extras.persistence
   :no-members:
   :no-inherited-members:
   :no-special-members:
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,5 @@ Expected output:
extend
tasks.extend
types.extend
data.extend
contributing
46 changes: 46 additions & 0 deletions flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,20 @@
"""

import sys

if sys.version_info < (3, 10):
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points

import flytekit.plugins # This will be deprecated, these are the old plugins, the new plugins live in plugins/
from flytekit.core.base_sql_task import SQLTask
from flytekit.core.base_task import SecurityContext, TaskMetadata, kwtypes
from flytekit.core.condition import conditional
from flytekit.core.container_task import ContainerTask
from flytekit.core.context_manager import ExecutionParameters, FlyteContext, FlyteContextManager
from flytekit.core.data_persistence import DataPersistence, DataPersistencePlugins
from flytekit.core.dynamic_workflow_task import dynamic
from flytekit.core.launch_plan import LaunchPlan
from flytekit.core.map_task import map_task
Expand All @@ -153,6 +160,7 @@
from flytekit.core.task import Secret, reference_task, task
from flytekit.core.workflow import ImperativeWorkflow as Workflow
from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow
from flytekit.extras.persistence import GCSPersistence, HttpPersistence, S3Persistence
from flytekit.loggers import logger
from flytekit.types import directory, file, schema

Expand All @@ -173,3 +181,41 @@ def current_context() -> ExecutionParameters:
There are some special params, that should be available
"""
return FlyteContextManager.current_context().execution_state.user_space_params


def load_implicit_plugins():
"""
This method allows loading all plugins that have the entrypoint specification. This uses the plugin loading
behavior as explained `here <>`_.
This is an opt in system and plugins that have an implicit loading requirement should add the implicit loading
entrypoint specification to their setup.py. The following example shows how we can autoload a module called fsspec
(whose init files contains the necessary plugin registration step)
.. code-block::
# note the group is always ``flytekit.plugins``
setup(
...
entry_points={'flytekit.plugins’: 'fsspec=flytekitplugins.fsspec'},
...
)
This works as long as the fsspec module has
.. code-block::
# For data persistence plugins
DataPersistencePlugins.register_plugin(f"{k}://", FSSpecPersistence, force=True)
# OR for type plugins
TypeEngine.register(PanderaTransformer())
# etc
"""
discovered_plugins = entry_points(group="flytekit.plugins")
for p in discovered_plugins:
p.load()


# Load all implicit plugins
load_implicit_plugins()
46 changes: 11 additions & 35 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from flytekit.common.tasks.sdk_runnable import ExecutionParameters
from flytekit.configuration import TemporaryConfiguration as _TemporaryConfiguration
from flytekit.configuration import internal as _internal_config
from flytekit.configuration import platform as _platform_config
from flytekit.configuration import sdk as _sdk_config
from flytekit.core.base_task import IgnoreOutputs, PythonTask
from flytekit.core.context_manager import (
Expand All @@ -31,13 +30,12 @@
SerializationSettings,
get_image_config,
)
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.map_task import MapPythonTask
from flytekit.core.promise import VoidPromise
from flytekit.engines import loader as _engine_loader
from flytekit.interfaces import random as _flyte_random
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.interfaces.data.gcs import gcs_proxy as _gcs_proxy
from flytekit.interfaces.data.s3 import s3proxy as _s3proxy
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
Expand Down Expand Up @@ -176,7 +174,7 @@ def _dispatch_execute(
for k, v in output_file_dict.items():
_common_utils.write_proto_to_file(v.to_flyte_idl(), _os.path.join(ctx.execution_state.engine_dir, k))

ctx.file_access.upload_directory(ctx.execution_state.engine_dir, output_prefix)
ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True)
_logging.info(f"Engine folder written successfully to the output prefix {output_prefix}")


Expand All @@ -186,14 +184,13 @@ def setup_execution(
dynamic_addl_distro: str = None,
dynamic_dest_dir: str = None,
):
cloud_provider = _platform_config.CLOUD_PROVIDER.get()
log_level = _internal_config.LOGGING_LEVEL.get() or _sdk_config.LOGGING_LEVEL.get()
_logging.getLogger().setLevel(log_level)

ctx = FlyteContextManager.current_context()

# Create directories
user_workspace_dir = ctx.file_access.local_access.get_random_directory()
user_workspace_dir = ctx.file_access.get_random_local_directory()
_click.echo(f"Using user directory {user_workspace_dir}")
pathlib.Path(user_workspace_dir).mkdir(parents=True, exist_ok=True)
from flytekit import __version__ as _api_version
Expand Down Expand Up @@ -226,39 +223,18 @@ def setup_execution(
tmp_dir=user_workspace_dir,
)

# This rather ugly condition will be going away with #559. We first check the raw output prefix, and if missing,
# we fall back to the logic of checking the cloud provider. The reason we have to check for the existence of the
# raw_output_data_prefix arg first is because it may be set to None by execute_task_cmd. That is there to support
# the corner case of a really old propeller that is still not filling in the raw output prefix template.
# TODO: Remove this check for flytekit 1.0
if raw_output_data_prefix:
if raw_output_data_prefix.startswith("s3:/"):
file_access = _data_proxy.FileAccessProvider(
try:
file_access = FileAccessProvider(
local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get(),
remote_proxy=_s3proxy.AwsS3Proxy(raw_output_data_prefix),
raw_output_prefix=raw_output_data_prefix,
)
elif raw_output_data_prefix.startswith("gs:/"):
file_access = _data_proxy.FileAccessProvider(
local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get(),
remote_proxy=_gcs_proxy.GCSProxy(raw_output_data_prefix),
)
elif raw_output_data_prefix.startswith("file") or raw_output_data_prefix.startswith("/"):
# A fake remote using the local disk will automatically be created
file_access = _data_proxy.FileAccessProvider(local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get())
elif cloud_provider == _constants.CloudProvider.AWS:
file_access = _data_proxy.FileAccessProvider(
local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get(),
remote_proxy=_s3proxy.AwsS3Proxy(raw_output_data_prefix),
)
elif cloud_provider == _constants.CloudProvider.GCP:
file_access = _data_proxy.FileAccessProvider(
local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get(),
remote_proxy=_gcs_proxy.GCSProxy(raw_output_data_prefix),
)
elif cloud_provider == _constants.CloudProvider.LOCAL:
# A fake remote using the local disk will automatically be created
file_access = _data_proxy.FileAccessProvider(local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get())
except TypeError: # would be thrown from DataPersistencePlugins.find_plugin
_logging.error(f"No data plugin found for raw output prefix {raw_output_data_prefix}")
raise
else:
raise Exception(f"Bad cloud provider {cloud_provider}")
raise Exception("No raw output prefix detected. Please upgrade your version of Propeller to 0.4.0 or later.")

with FlyteContextManager.with_context(ctx.with_file_access(file_access)) as ctx:
# TODO: This is copied from serialize, which means there's a similarity here I'm not seeing.
Expand Down
14 changes: 7 additions & 7 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
from flytekit.common.tasks.sdk_runnable import ExecutionParameters
from flytekit.configuration import images, internal
from flytekit.configuration import sdk as _sdk_config
from flytekit.core.data_persistence import FileAccessProvider, default_local_file_access_provider
from flytekit.engines.unit import mock_stats as _mock_stats
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.models.core import identifier as _identifier

# TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin
Expand Down Expand Up @@ -414,7 +414,7 @@ class FlyteContext(object):
Please do not confuse this object with the :py:class:`flytekit.ExecutionParameters` object.
"""

file_access: Optional[_data_proxy.FileAccessProvider]
file_access: Optional[FileAccessProvider]
level: int = 0
flyte_client: Optional[friendly_client.SynchronousFlyteClient] = None
compilation_state: Optional[CompilationState] = None
Expand Down Expand Up @@ -462,7 +462,7 @@ def with_compilation_state(self, c: CompilationState) -> Builder:
def with_new_compilation_state(self) -> Builder:
return self.with_compilation_state(self.new_compilation_state())

def with_file_access(self, fa: _data_proxy.FileAccessProvider) -> Builder:
def with_file_access(self, fa: FileAccessProvider) -> Builder:
return self.new_builder().with_file_access(fa)

def with_serialization_settings(self, ss: SerializationSettings) -> Builder:
Expand All @@ -481,7 +481,7 @@ def new_execution_state(self, working_dir: Optional[os.PathLike] = None) -> Exec
in all other cases it is preferable to use with_execution_state
"""
if not working_dir:
working_dir = self.file_access.get_random_local_directory()
working_dir = self.file_access.local_sandbox_dir
return ExecutionState(working_dir=working_dir, user_space_params=self.user_space_params)

@staticmethod
Expand All @@ -497,7 +497,7 @@ def current_context() -> FlyteContext:

@dataclass
class Builder(object):
file_access: _data_proxy.FileAccessProvider
file_access: FileAccessProvider
level: int = 0
compilation_state: Optional[CompilationState] = None
execution_state: Optional[ExecutionState] = None
Expand Down Expand Up @@ -550,7 +550,7 @@ def with_compilation_state(self, c: CompilationState) -> "Builder":
def with_new_compilation_state(self) -> "Builder":
return self.with_compilation_state(self.new_compilation_state())

def with_file_access(self, fa: _data_proxy.FileAccessProvider) -> "Builder":
def with_file_access(self, fa: FileAccessProvider) -> "Builder":
self.file_access = fa
return self

Expand Down Expand Up @@ -684,7 +684,7 @@ def initialize():
logging=_logging,
tmp_dir=user_space_path,
)
default_context = FlyteContext(file_access=_data_proxy.default_local_file_access_provider)
default_context = FlyteContext(file_access=default_local_file_access_provider)
default_context = default_context.with_execution_state(
default_context.new_execution_state().with_params(user_space_params=default_user_space_params)
).build()
Expand Down
Loading

0 comments on commit 67f0fe4

Please sign in to comment.