Bring your own Data plugins for flytekit #559

Merged
kumare3 merged 48 commits into master on Aug 19, 2021

Contributor

@kumare3 kumare3 commented Jul 19, 2021

TL;DR

Flytekit depends on additional cloud-provider tooling for its typical operation. Specifically, when downloading or uploading task inputs or outputs, or blob-storage-based literals (files, schemas, folders), it uses cloud provider CLIs such as the aws cli and gsutil to handle the transfer. This is a legacy decision made in the Python 2 days: without asyncio, a native downloader could not match the performance of the CLIs.
Moreover, using the CLIs removes coupling with cloud provider SDKs and hence keeps the flytekit core dependency-free. This has worked well and we will keep it going forward. But we would love for contributors to bring in new ideas for handling this data: for example, using non-blob stores (like Bigtable) for the metadata, writing native clients for managing the data, or using a mature project like fsspec.

The PR introduces a new Persistence Interface and provides extras for aws cli and gsutil. To use these extras, users simply have to run
pip install awscli or pip install gsutil. Moreover, the Persistence layer supports plugins, which can override the bundled default data extras.

This RFC provides the design intuition. An example data plugin will be added as a follow-up, and we will use automatic plugin discovery to autoload the plugin.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

  • Added a DataPersistence base class. This encapsulates the behavior of a blob storage provider and has things like get/list/put, etc.
  • Added a DataPersistencePlugins class - this is the keeper of plugins, similar to how the TypeEngine keeps track of type transformers
  • Added a DiskPersistence that inherits the DataPersistence class. The local FS can be thought of as a storage provider - this object fakes that. Paths that begin with / or file:// by default will use this persistence plugin.
  • Moved FileAccessProvider under the core/ folder. This object is the high-level interface that the rest of the flytekit platform and its plugins interact with. Users and contributors should not need to work with the underlying DataPersistence classes (unless, of course, they are implementing one).
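
The relationship between the pieces listed above can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: the class names come from the description, but the method signatures, the `register_plugin`/`find_plugin` helpers, and the longest-prefix lookup are assumptions made for the example.

```python
import os
import shutil
from abc import ABC, abstractmethod
from typing import Dict, Type


class DataPersistence(ABC):
    """Illustrative base class: one subclass per storage backend."""

    @abstractmethod
    def exists(self, path: str) -> bool:
        """Return True if something is stored at `path`."""

    @abstractmethod
    def get(self, from_path: str, to_path: str) -> None:
        """Copy data from the store to a local path."""

    @abstractmethod
    def put(self, from_path: str, to_path: str) -> None:
        """Copy local data into the store."""


class DataPersistencePlugins:
    """Illustrative registry keyed by path prefix, e.g. "s3://" or "/"."""

    _plugins: Dict[str, Type[DataPersistence]] = {}

    @classmethod
    def register_plugin(cls, prefix: str, plugin: Type[DataPersistence], force: bool = False) -> None:
        # Assumption: overriding an existing prefix must be requested explicitly.
        if prefix in cls._plugins and not force:
            raise ValueError(f"A plugin is already registered for {prefix!r}")
        cls._plugins[prefix] = plugin

    @classmethod
    def find_plugin(cls, path: str) -> Type[DataPersistence]:
        # Assumption: the longest registered prefix that matches the path wins.
        matches = [p for p in cls._plugins if path.startswith(p)]
        if not matches:
            raise ValueError(f"No plugin registered for {path!r}")
        return cls._plugins[max(matches, key=len)]


class DiskPersistence(DataPersistence):
    """Treats the local filesystem as if it were a storage provider."""

    @staticmethod
    def _local(path: str) -> str:
        # Drop the file:// scheme so the remainder is a plain filesystem path.
        return path[len("file://"):] if path.startswith("file://") else path

    def exists(self, path: str) -> bool:
        return os.path.exists(self._local(path))

    def get(self, from_path: str, to_path: str) -> None:
        shutil.copy(self._local(from_path), self._local(to_path))

    def put(self, from_path: str, to_path: str) -> None:
        shutil.copy(self._local(from_path), self._local(to_path))


# Paths beginning with "/" or "file://" resolve to the local-disk plugin.
DataPersistencePlugins.register_plugin("/", DiskPersistence)
DataPersistencePlugins.register_plugin("file://", DiskPersistence)
```

In this sketch a higher-level object such as FileAccessProvider would call `find_plugin(path)` and delegate to the returned class, so callers never touch a DataPersistence subclass directly.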

Tracking Issue

fixes flyteorg/flyte#809

kumare3 added 2 commits July 14, 2021 09:18
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
@wild-endeavor wild-endeavor mentioned this pull request Jul 27, 2021
Signed-off-by: wild-endeavor <[email protected]>
Signed-off-by: wild-endeavor <[email protected]>
@codecov

codecov bot commented Jul 30, 2021

Codecov Report

Merging #559 (67303a0) into master (02496df) will decrease coverage by 0.13%.
The diff coverage is 80.67%.

@@            Coverage Diff             @@
##           master     #559      +/-   ##
==========================================
- Coverage   85.73%   85.59%   -0.14%     
==========================================
  Files         384      392       +8     
  Lines       30257    30576     +319     
  Branches     2427     2456      +29     
==========================================
+ Hits        25940    26173     +233     
- Misses       3660     3731      +71     
- Partials      657      672      +15     
Impacted Files                                          Coverage Δ
flytekit/interfaces/data/data_proxy.py                  90.58% <ø> (+9.18%) ⬆️
...tions/flytekitplugins/great_expectations/schema.py   83.68% <0.00%> (-1.32%) ⬇️
plugins/tests/greatexpectations/test_schema.py          98.56% <ø> (-0.19%) ⬇️
plugins/tests/greatexpectations/test_task.py            96.99% <ø> (-0.40%) ⬇️
...tations/flytekitplugins/great_expectations/task.py   78.37% <40.00%> (-0.39%) ⬇️
flytekit/extras/persistence/gcs_gsutil.py               60.65% <60.65%> (ø)
flytekit/core/data_persistence.py                       70.93% <70.93%> (ø)
flytekit/extras/persistence/http.py                     81.57% <81.57%> (ø)
flytekit/extras/persistence/s3_awscli.py                83.16% <83.16%> (ø)
flytekit/interfaces/data/common.py                      60.00% <83.33%> (+3.75%) ⬆️
... and 28 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 02496df...67303a0.

@wild-endeavor wild-endeavor mentioned this pull request Jul 30, 2021
kumare3 and others added 3 commits July 30, 2021 22:07
Signed-off-by: wild-endeavor <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
kumare3 pushed a commit that referenced this pull request Jul 31, 2021
Signed-off-by: wild-endeavor <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
@kumare3
Contributor Author

kumare3 commented Aug 17, 2021

cc @wild-endeavor this is ready for review

@wild-endeavor
Contributor

Not sure how to fix this, but the error is ModuleNotFoundError: No module named 'flytekitplugins.fsspec'. I suspect what's happening is that the github workflow is running make -C plugins install-all-dev to install the plugins.

We could add the fsspec entry to the plugins/setup.py file so that install-all-dev picks it up, but I don't think we should. If we do that, developers who run that command will get this plugin by default, and it would interfere with the base aws/gcp plugins. I.e., plugins that are loaded by default should not be installed by default.

Can we add a separate command after this?

@kumare3
Contributor Author

kumare3 commented Aug 17, 2021

@wild-endeavor This can create other problems if we add another data plugin or the like.
Should each plugin be an isolated package, complete with its own tests?
I know you said we cannot do this. Can you remind me why not?

tekumara
tekumara previously approved these changes Aug 17, 2021
Contributor

@tekumara tekumara left a comment

Overall LGTM ❤️ just added some minor comments .... like @wild-endeavor I ran into some issues getting the plugins installed so I could run the tests.

    def construct_path(self, _: bool, add_prefix: bool, *args) -> str:
        # Ignore add_protocol for now. Only complicates things
        if add_prefix:
            return os.path.join(self.default_prefix, *args)
Contributor

self.default_prefix can be None, which will throw a TypeError.

Contributor Author

done
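
The TypeError raised in this thread comes from `os.path.join` rejecting `None` as a path component. One possible guard is sketched below. The thread does not show how the fix was actually made, so the standalone function and its `default_prefix` parameter are assumptions for illustration only.

```python
import os
from typing import Optional


def construct_path(default_prefix: Optional[str], add_prefix: bool, *paths: str) -> str:
    """Join `paths`, prepending `default_prefix` only when it is actually set."""
    if not paths:
        raise ValueError("at least one path component is required")
    if add_prefix and default_prefix is not None:
        return os.path.join(default_prefix, *paths)
    # No prefix requested, or no prefix configured: join the components alone.
    return os.path.join(*paths)
```

Whether a missing prefix should be silently skipped, as here, or reported as an error is a design choice that depends on how callers use the result.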

            pass

        @abstractmethod
        def construct_path(self, add_protocol: bool, add_prefix: bool, *paths) -> str:
Contributor

Suggested change
    - def construct_path(self, add_protocol: bool, add_prefix: bool, *paths) -> str:
    + def construct_path(self, add_protocol: bool, add_prefix: bool, *paths: str) -> str:

Contributor Author

done

        durable store.
        """

        def __init__(self, local_sandbox_dir: Union[str, os.PathLike], raw_output_prefix: str):
Contributor

I got confused by the concept sandbox here, because I thought it was referring to the Flyte sandbox.

Contributor Author

ya we call it the output prefix or local data dir

Contributor Author

i guess i can update it, but would like to do that as a separate PR

    import os
    import pathlib

    import requests as _requests
Contributor

just for my own curiosity, i was wondering why you rename imports?

Contributor Author

haha, this is an artifact of some previous work in flytekit. the reasoning was to avoid pollution of the import space.

Contributor Author

in the new flytekit, we should just get rid of it, I will, good call out

Contributor Author

done for all new files

            return _update_cmd_config_and_execute(cmd)

        def construct_path(self, add_protocol: bool, add_prefix: bool, *paths) -> str:
            paths = list(paths)  # make type check happy
Contributor

mypy and pyright are still failing here

Contributor

@cosmicBboy cosmicBboy Aug 18, 2021

unfortunately we're not really strictly following mypy right now... for reasons I forget (?) @kumare3 @wild-endeavor

Contributor

we should - we just haven't had time to go through the whole codebase and clean everything up.

        version=__version__,
        author="flyteorg",
        author_email="[email protected]",
        description="This package holds Hive plugins for flytekit",
Contributor

Suggested change
    - description="This package holds Hive plugins for flytekit",
    + description="This package holds the fsspec data persistence plugin for flytekit",

Contributor Author

done

            return fsspec.filesystem(protocol, **kwargs)

        @staticmethod
        def recursive_paths(f: str, t: str) -> (str, str):
Contributor

Suggested change
    - def recursive_paths(f: str, t: str) -> (str, str):
    + def recursive_paths(f: str, t: str) -> typing.Tuple[str, str]:

    # The s3api command returns an error if the object does not exist. The error message contains
    # the http status code: "An error occurred (404) when calling the HeadObject operation: Not Found"
    # This is a best effort for returning if the object does not exist by searching
    # for existence of (404) in the error message. This should not be needed when we get off the cli and use lib
Contributor

is the plan to replace the cli with fsspec using s3fs? or a boto implementation?

Contributor Author

we cannot replace, as there are existing users of this. but users can optionally install pip install flytekitplugins-data-fsspec or other such solutions in the future
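
The best-effort existence check described in the code comment under review can be sketched as follows. This is an illustration rather than the plugin's actual implementation: the function names are hypothetical, and it assumes the AWS CLI is installed and that `aws s3api head-object` writes the quoted error text to stderr.

```python
import subprocess


def is_not_found_error(stderr_text: str) -> bool:
    """Best-effort check: the CLI error text embeds the HTTP status, e.g. "(404)"."""
    return "(404)" in stderr_text


def s3_object_exists(bucket: str, key: str) -> bool:
    """Return True if head-object succeeds and False if the error looks like a 404."""
    cmd = ["aws", "s3api", "head-object", "--bucket", bucket, "--key", key]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 0:
        return True
    if is_not_found_error(result.stderr):
        return False
    # Any other failure (credentials, permissions, network) is not "missing".
    raise RuntimeError(f"head-object failed: {result.stderr.strip()}")
```

Matching on the message text is fragile because it depends on the CLI's wording, which is why the original comment calls it best effort.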

                raise _FlyteUserException("AWS CLI not found! Please install it with `pip install awscli`.")

        @staticmethod
        def _split_s3_path_to_bucket_and_key(path: str) -> (str, str):
Contributor

To fix the syntax error in mypy/pyright:

Suggested change
    - def _split_s3_path_to_bucket_and_key(path: str) -> (str, str):
    + def _split_s3_path_to_bucket_and_key(path: str) -> Tuple[str, str]:

Contributor Author

Done
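
For context, `-> (str, str)` is accepted by the Python interpreter (the annotation is just a tuple expression) but type checkers reject it, which is why `Tuple[str, str]` was suggested. A minimal sketch of a function with the corrected annotation is shown below. The body is an assumption written for illustration, since the review only shows the signature.

```python
from typing import Tuple


def split_s3_path_to_bucket_and_key(path: str) -> Tuple[str, str]:
    """Split "s3://bucket/some/key" into ("bucket", "some/key")."""
    prefix = "s3://"
    if not path.startswith(prefix):
        raise ValueError(f"Not an S3 path: {path!r}")
    # partition() splits on the first "/" only, so the key keeps its own slashes.
    bucket, _, key = path[len(prefix):].partition("/")
    return bucket, key
```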

Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
wild-endeavor
wild-endeavor previously approved these changes Aug 17, 2021
Signed-off-by: Yee Hing Tong <[email protected]>
wild-endeavor
wild-endeavor previously approved these changes Aug 18, 2021
Signed-off-by: Ketan Umare <[email protected]>
wild-endeavor
wild-endeavor previously approved these changes Aug 18, 2021
Signed-off-by: Ketan Umare <[email protected]>
wild-endeavor
wild-endeavor previously approved these changes Aug 18, 2021
tekumara
tekumara previously approved these changes Aug 19, 2021
@samhita-alla samhita-alla dismissed stale reviews from tekumara and wild-endeavor via 35920c7 August 19, 2021 11:32
Signed-off-by: Samhita Alla <[email protected]>
@samhita-alla
Contributor

samhita-alla commented Aug 19, 2021

@kumare3 / @eapolinario Tests are passing now! Modified the Great Expectations plugin to accept remote FlyteFiles only. If the user wants to give a local dataset file, it can only be given as a string. I've also removed the concerned local FlyteFile tests. See this commit: 35920c7.

When given a local dataset file that has a relative path, FlyteFile is not working because of this:

    def is_remote(path: Union[str, os.PathLike]) -> bool:
        """
        Deprecated. Lets find a replacement
        """
        return not (path.startswith("/") or path.startswith("file://"))

I've not added FlyteFile's download() method as it didn't seem necessary. shutil.copy() automatically downloads the file by calling __fspath__. If the file is still remote, I'm using FlyteContext's get_data() method, just to be on the safe side.
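
The relative-path problem follows directly from the quoted check: any string that starts with neither "/" nor "file://" is classified as remote. The sketch below reproduces that logic for strings and contrasts it with one possible alternative based on the URL scheme. The alternative, including the name `is_remote_by_scheme`, is hypothetical and is not something proposed in this PR.

```python
from urllib.parse import urlparse


def is_remote(path: str) -> bool:
    """Same rule as the quoted function, restricted to str inputs."""
    return not (path.startswith("/") or path.startswith("file://"))


def is_remote_by_scheme(path: str) -> bool:
    """Hypothetical alternative: remote only if there is a non-file URL scheme.

    Caveat: a Windows path such as "C:\\data" would parse with scheme "c".
    """
    scheme = urlparse(path).scheme
    return scheme not in ("", "file")


# A relative local path is misclassified by the prefix rule...
print(is_remote("data/file.csv"))            # True
# ...but not by the scheme-based rule.
print(is_remote_by_scheme("data/file.csv"))  # False
```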

@kumare3 kumare3 merged commit 67f0fe4 into master Aug 19, 2021

Successfully merging this pull request may close these issues.

[Plugin][Flytekit] flytekit storage layer as plugins
5 participants