Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring your own Data plugins for flytekit #559

Merged
merged 48 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
ecc2bc8
Name attribute added
kumare3 Jul 14, 2021
ea87889
Data persistence work in progress
kumare3 Jul 19, 2021
9f3b30d
updated data persistence api
kumare3 Jul 22, 2021
4f686bf
Merge branch 'master' into data-plugins
kumare3 Jul 22, 2021
d994c02
Merge branch 'master' into data-plugins
kumare3 Jul 23, 2021
f310696
tried my hand at docs - failed!
kumare3 Jul 23, 2021
345839b
wip - pr into #559 (#565)
wild-endeavor Jul 27, 2021
5468447
merge master
wild-endeavor Jul 27, 2021
d4b1135
Merge branch 'master' into data-plugins
kumare3 Jul 31, 2021
d280ea2
simplify code
wild-endeavor Jul 30, 2021
7bf388d
Addressing comments
kumare3 Jul 31, 2021
b64ce6d
Merge branch 'master' into data-plugins
kumare3 Aug 10, 2021
07d58d3
black format
kumare3 Aug 10, 2021
4064d6b
Fixed lint errors
kumare3 Aug 10, 2021
a0d5403
Added unit tests
kumare3 Aug 11, 2021
3bbbe09
fixed lint and added more tests
kumare3 Aug 11, 2021
656b729
Merge branch 'master' into data-plugins
kumare3 Aug 11, 2021
a39f597
updated
kumare3 Aug 12, 2021
efa5234
delete this
kumare3 Aug 12, 2021
7112b0b
Revert "delete this"
kumare3 Aug 12, 2021
8329d5e
Fsspec DataPersistence plugin (#596)
kumare3 Aug 13, 2021
055a1eb
FSSpec debugging and defaults
kumare3 Aug 13, 2021
e880a26
FSSpec implementation
kumare3 Aug 13, 2021
88f7b57
updated fsspec plugin
kumare3 Aug 13, 2021
126c2ac
minio support
kumare3 Aug 13, 2021
4437dae
persist fix
kumare3 Aug 13, 2021
325b669
fixed
kumare3 Aug 13, 2021
8726b4f
updated
kumare3 Aug 13, 2021
880a07d
client args
kumare3 Aug 14, 2021
2fc40d5
upload to a dir
kumare3 Aug 15, 2021
d4167fe
updated
kumare3 Aug 15, 2021
40c9864
fixed paths
kumare3 Aug 15, 2021
d4c1855
updated
kumare3 Aug 15, 2021
744d67f
updated
kumare3 Aug 15, 2021
8a2e23f
update put logic
kumare3 Aug 16, 2021
871372d
updated
kumare3 Aug 16, 2021
664394f
remove tempfile
wild-endeavor Aug 16, 2021
6a7f6d7
unit tests added
kumare3 Aug 17, 2021
148fc57
lint fix
kumare3 Aug 17, 2021
2fcc43b
test
wild-endeavor Aug 17, 2021
88958c7
try another setting
wild-endeavor Aug 17, 2021
a626c4d
just fsspec
wild-endeavor Aug 17, 2021
3521bb3
missing s
wild-endeavor Aug 18, 2021
440dc8e
Addressed comments
kumare3 Aug 18, 2021
93f14d0
test fix
kumare3 Aug 18, 2021
1e56bf3
Merge branch 'master' into data-plugins
kumare3 Aug 18, 2021
35920c7
Great Expectations FlyteFile modifications
samhita-alla Aug 19, 2021
67303a0
remove shutil package
samhita-alla Aug 19, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/source/data.extend.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
##############################
Extend Data Persistence layer
##############################
Flytekit provides a data persistence layer, which is used for recording metadata that is shared with the Flyte backend. This persistence layer is also available for various types to store raw user data and is designed to be cross-cloud compatible.
Moreover, it is designed to be extensible, and users can bring their own data persistence plugins by following the persistence interface. NOTE: this is bound to become more extensive to cover a variety of use cases, but the core set of APIs is battle-tested.

.. automodule:: flytekit.core.data_persistence
:no-members:
:no-inherited-members:
:no-special-members:

.. automodule:: flytekit.extras.persistence
:no-members:
:no-inherited-members:
:no-special-members:
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,5 @@ Expected output:
extend
tasks.extend
types.extend
data.extend
contributing
46 changes: 46 additions & 0 deletions flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,20 @@

"""

import sys

if sys.version_info < (3, 10):
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points

import flytekit.plugins # This will be deprecated, these are the old plugins, the new plugins live in plugins/
from flytekit.core.base_sql_task import SQLTask
from flytekit.core.base_task import SecurityContext, TaskMetadata, kwtypes
from flytekit.core.condition import conditional
from flytekit.core.container_task import ContainerTask
from flytekit.core.context_manager import ExecutionParameters, FlyteContext, FlyteContextManager
from flytekit.core.data_persistence import DataPersistence, DataPersistencePlugins
from flytekit.core.dynamic_workflow_task import dynamic
from flytekit.core.launch_plan import LaunchPlan
from flytekit.core.map_task import map_task
Expand All @@ -153,6 +160,7 @@
from flytekit.core.task import Secret, reference_task, task
from flytekit.core.workflow import ImperativeWorkflow as Workflow
from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow
from flytekit.extras.persistence import GCSPersistence, HttpPersistence, S3Persistence
from flytekit.loggers import logger
from flytekit.types import schema

Expand All @@ -173,3 +181,41 @@ def current_context() -> ExecutionParameters:
There are some special params, that should be available
"""
return FlyteContextManager.current_context().execution_state.user_space_params


def load_implicit_plugins():
    """
    Load every installed plugin that is advertised under the ``flytekit.plugins`` entry point group.

    Discovery uses the standard packaging entry point mechanism, described in the Python Packaging User Guide under
    `Creating and discovering plugins <https://packaging.python.org/guides/creating-and-discovering-plugins/>`_.

    Loading is opt-in: a plugin that needs to be loaded implicitly must declare the entry point in its own setup.py.
    The following example autoloads a module called fsspec (whose init file performs the necessary plugin
    registration step)

    .. code-block:: python

        # note the group is always ``flytekit.plugins``
        setup(
            ...
            entry_points={"flytekit.plugins": "fsspec=flytekitplugins.fsspec"},
            ...
        )

    This works as long as the fsspec module contains a registration call such as

    .. code-block:: python

        # For data persistence plugins
        DataPersistencePlugins.register_plugin(f"{k}://", FSSpecPersistence, force=True)
        # OR for type plugins
        TypeEngine.register(PanderaTransformer())
        # etc

    """
    for plugin in entry_points(group="flytekit.plugins"):
        # Loading the entry point imports the target it names; the plugin's
        # registration code is expected to run as a side effect of that import.
        plugin.load()


# Load all implicit plugins
load_implicit_plugins()
46 changes: 11 additions & 35 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from flytekit.common.tasks.sdk_runnable import ExecutionParameters
from flytekit.configuration import TemporaryConfiguration as _TemporaryConfiguration
from flytekit.configuration import internal as _internal_config
from flytekit.configuration import platform as _platform_config
from flytekit.configuration import sdk as _sdk_config
from flytekit.core.base_task import IgnoreOutputs, PythonTask
from flytekit.core.context_manager import (
Expand All @@ -31,13 +30,12 @@
SerializationSettings,
get_image_config,
)
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.map_task import MapPythonTask
from flytekit.core.promise import VoidPromise
from flytekit.engines import loader as _engine_loader
from flytekit.interfaces import random as _flyte_random
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.interfaces.data.gcs import gcs_proxy as _gcs_proxy
from flytekit.interfaces.data.s3 import s3proxy as _s3proxy
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
Expand Down Expand Up @@ -176,7 +174,7 @@ def _dispatch_execute(
for k, v in output_file_dict.items():
_common_utils.write_proto_to_file(v.to_flyte_idl(), _os.path.join(ctx.execution_state.engine_dir, k))

ctx.file_access.upload_directory(ctx.execution_state.engine_dir, output_prefix)
ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True)
_logging.info(f"Engine folder written successfully to the output prefix {output_prefix}")


Expand All @@ -186,14 +184,13 @@ def setup_execution(
dynamic_addl_distro: str = None,
dynamic_dest_dir: str = None,
):
cloud_provider = _platform_config.CLOUD_PROVIDER.get()
log_level = _internal_config.LOGGING_LEVEL.get() or _sdk_config.LOGGING_LEVEL.get()
_logging.getLogger().setLevel(log_level)

ctx = FlyteContextManager.current_context()

# Create directories
user_workspace_dir = ctx.file_access.local_access.get_random_directory()
user_workspace_dir = ctx.file_access.get_random_local_directory()
_click.echo(f"Using user directory {user_workspace_dir}")
pathlib.Path(user_workspace_dir).mkdir(parents=True, exist_ok=True)
from flytekit import __version__ as _api_version
Expand Down Expand Up @@ -226,39 +223,18 @@ def setup_execution(
tmp_dir=user_workspace_dir,
)

# This rather ugly condition will be going away with #559. We first check the raw output prefix, and if missing,
# we fall back to the logic of checking the cloud provider. The reason we have to check for the existence of the
# raw_output_data_prefix arg first is because it may be set to None by execute_task_cmd. That is there to support
# the corner case of a really old propeller that is still not filling in the raw output prefix template.
# TODO: Remove this check for flytekit 1.0
if raw_output_data_prefix:
if raw_output_data_prefix.startswith("s3:/"):
file_access = _data_proxy.FileAccessProvider(
try:
file_access = FileAccessProvider(
local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get(),
remote_proxy=_s3proxy.AwsS3Proxy(raw_output_data_prefix),
raw_output_prefix=raw_output_data_prefix,
)
elif raw_output_data_prefix.startswith("gs:/"):
file_access = _data_proxy.FileAccessProvider(
local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get(),
remote_proxy=_gcs_proxy.GCSProxy(raw_output_data_prefix),
)
elif raw_output_data_prefix.startswith("file") or raw_output_data_prefix.startswith("/"):
# A fake remote using the local disk will automatically be created
file_access = _data_proxy.FileAccessProvider(local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get())
elif cloud_provider == _constants.CloudProvider.AWS:
file_access = _data_proxy.FileAccessProvider(
local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get(),
remote_proxy=_s3proxy.AwsS3Proxy(raw_output_data_prefix),
)
elif cloud_provider == _constants.CloudProvider.GCP:
file_access = _data_proxy.FileAccessProvider(
local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get(),
remote_proxy=_gcs_proxy.GCSProxy(raw_output_data_prefix),
)
elif cloud_provider == _constants.CloudProvider.LOCAL:
# A fake remote using the local disk will automatically be created
file_access = _data_proxy.FileAccessProvider(local_sandbox_dir=_sdk_config.LOCAL_SANDBOX.get())
except TypeError: # would be thrown from DataPersistencePlugins.find_plugin
_logging.error(f"No data plugin found for raw output prefix {raw_output_data_prefix}")
raise
else:
raise Exception(f"Bad cloud provider {cloud_provider}")
raise Exception("No raw output prefix detected. Please upgrade your version of Propeller to 0.4.0 or later.")

with FlyteContextManager.with_context(ctx.with_file_access(file_access)) as ctx:
# TODO: This is copied from serialize, which means there's a similarity here I'm not seeing.
Expand Down
14 changes: 7 additions & 7 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
from flytekit.common.tasks.sdk_runnable import ExecutionParameters
from flytekit.configuration import images, internal
from flytekit.configuration import sdk as _sdk_config
from flytekit.core.data_persistence import FileAccessProvider, default_local_file_access_provider
from flytekit.engines.unit import mock_stats as _mock_stats
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.models.core import identifier as _identifier

# TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin
Expand Down Expand Up @@ -414,7 +414,7 @@ class FlyteContext(object):
Please do not confuse this object with the :py:class:`flytekit.ExecutionParameters` object.
"""

file_access: Optional[_data_proxy.FileAccessProvider]
file_access: Optional[FileAccessProvider]
level: int = 0
flyte_client: Optional[friendly_client.SynchronousFlyteClient] = None
compilation_state: Optional[CompilationState] = None
Expand Down Expand Up @@ -462,7 +462,7 @@ def with_compilation_state(self, c: CompilationState) -> Builder:
def with_new_compilation_state(self) -> Builder:
return self.with_compilation_state(self.new_compilation_state())

def with_file_access(self, fa: _data_proxy.FileAccessProvider) -> Builder:
def with_file_access(self, fa: FileAccessProvider) -> Builder:
return self.new_builder().with_file_access(fa)

def with_serialization_settings(self, ss: SerializationSettings) -> Builder:
Expand All @@ -481,7 +481,7 @@ def new_execution_state(self, working_dir: Optional[os.PathLike] = None) -> Exec
in all other cases it is preferable to use with_execution_state
"""
if not working_dir:
working_dir = self.file_access.get_random_local_directory()
working_dir = self.file_access.local_sandbox_dir
return ExecutionState(working_dir=working_dir, user_space_params=self.user_space_params)

@staticmethod
Expand All @@ -497,7 +497,7 @@ def current_context() -> FlyteContext:

@dataclass
class Builder(object):
file_access: _data_proxy.FileAccessProvider
file_access: FileAccessProvider
level: int = 0
compilation_state: Optional[CompilationState] = None
execution_state: Optional[ExecutionState] = None
Expand Down Expand Up @@ -550,7 +550,7 @@ def with_compilation_state(self, c: CompilationState) -> "Builder":
def with_new_compilation_state(self) -> "Builder":
return self.with_compilation_state(self.new_compilation_state())

def with_file_access(self, fa: _data_proxy.FileAccessProvider) -> "Builder":
def with_file_access(self, fa: FileAccessProvider) -> "Builder":
self.file_access = fa
return self

Expand Down Expand Up @@ -684,7 +684,7 @@ def initialize():
logging=_logging,
tmp_dir=user_space_path,
)
default_context = FlyteContext(file_access=_data_proxy.default_local_file_access_provider)
default_context = FlyteContext(file_access=default_local_file_access_provider)
default_context = default_context.with_execution_state(
default_context.new_execution_state().with_params(user_space_params=default_user_space_params)
).build()
Expand Down
Loading