From e905d311d2e9b54339df61684d2e17aa12a6833a Mon Sep 17 00:00:00 2001 From: pryce-turner Date: Mon, 22 Jul 2024 14:47:29 -0700 Subject: [PATCH 1/5] Added alt prefix head to FlyteFile.new_remote Signed-off-by: pryce-turner --- flytekit/types/file/file.py | 12 ++++++++++-- tests/flytekit/unit/types/file/test_types.py | 6 ++++++ 2 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 tests/flytekit/unit/types/file/test_types.py diff --git a/flytekit/types/file/file.py b/flytekit/types/file/file.py index cc7ba66bed..44d2f112d8 100644 --- a/flytekit/types/file/file.py +++ b/flytekit/types/file/file.py @@ -179,13 +179,21 @@ def extension(cls) -> str: return "" @classmethod - def new_remote_file(cls, name: typing.Optional[str] = None) -> FlyteFile: + def new_remote_file(cls, alt: typing.Optional[str] = None, name: typing.Optional[str] = None) -> FlyteFile: """ Create a new FlyteFile object with a remote path. + + :param alt: If you want to specify a different prefix head than the default one, you can specify it here. + :param name: If you want to specify a different name for the file, you can specify it here. """ ctx = FlyteContextManager.current_context() r = name or ctx.file_access.get_random_string() - remote_path = ctx.file_access.join(ctx.file_access.raw_output_prefix, r) + pref = ctx.file_access.raw_output_prefix + if alt: + s_pref = pref.split("/") + s_pref[2] = alt + pref = "/".join(s_pref) + remote_path = ctx.file_access.join(pref, r) return cls(path=remote_path) @classmethod diff --git a/tests/flytekit/unit/types/file/test_types.py b/tests/flytekit/unit/types/file/test_types.py new file mode 100644 index 0000000000..6d37176833 --- /dev/null +++ b/tests/flytekit/unit/types/file/test_types.py @@ -0,0 +1,6 @@ +from flytekit.types.file import FlyteFile +from flytekit import FlyteContextManager + +def test_new_remote_alt(): + ff = FlyteFile.new_remote_file(alt="my-alt-prefix") + assert "my-alt-prefix" in ff.path From f614ede37e3603312152bd97231b161dee0ea41f Mon Sep 17 00:00:00 2001 From: pryce-turner Date: Wed, 24 Jul 2024 14:46:16 -0700 Subject: [PATCH 2/5] Added get_new_path method to FileAccessProvider, fixed new_remote method of FlyteFile Signed-off-by: pryce-turner --- flytekit/core/data_persistence.py | 21 +++++++++++++++++++ flytekit/types/file/file.py | 12 +++-------- .../unit/core/test_data_persistence.py | 12 +++++++++++ .../unit/types/directory/test_types.py | 3 +++ tests/flytekit/unit/types/file/test_types.py | 2 +- 5 files changed, 40 insertions(+), 10 deletions(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 5c8036d179..db191a17f2 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -454,6 +454,27 @@ def join( f = fs.unstrip_protocol(f) return f + def get_new_path( + self, + fs: typing.Optional[fsspec.AbstractFileSystem] = None, + alt: typing.Optional[str] = None, + stem: typing.Optional[str] = None, + unstrip: bool = False, + ) -> str: + fs = fs or self.raw_output_fs + pref = self.raw_output_prefix + s_pref = pref.split(fs.sep) + if alt: + s_pref[2] = alt + if stem: + s_pref.append(stem) + else: + s_pref.append(self.get_random_string()) + p = fs.sep.join(s_pref) + if unstrip: + p = fs.unstrip_protocol(p) + return p + def get_random_local_path(self, file_path_or_file_name: typing.Optional[str] = None) -> str: """ Use file_path_or_file_name, when you want a random directory, but want to preserve the leaf file name diff --git a/flytekit/types/file/file.py b/flytekit/types/file/file.py index 44d2f112d8..ae2d160f10 100644 --- a/flytekit/types/file/file.py +++ b/flytekit/types/file/file.py @@ -179,21 +179,15 @@ def extension(cls) -> str: return "" @classmethod - def new_remote_file(cls, alt: typing.Optional[str] = None, name: typing.Optional[str] = None) -> FlyteFile: + def new_remote_file(cls, name: typing.Optional[str] = None, alt: typing.Optional[str] = None) -> FlyteFile: """ Create a new FlyteFile object with a remote path. - :param alt: If you want to specify a different prefix head than the default one, you can specify it here. :param name: If you want to specify a different name for the file, you can specify it here. + :param alt: If you want to specify a different prefix head than the default one, you can specify it here. """ ctx = FlyteContextManager.current_context() - r = name or ctx.file_access.get_random_string() - pref = ctx.file_access.raw_output_prefix - if alt: - s_pref = pref.split("/") - s_pref[2] = alt - pref = "/".join(s_pref) - remote_path = ctx.file_access.join(pref, r) + remote_path = ctx.file_access.get_new_path(alt=alt, stem=name) return cls(path=remote_path) @classmethod diff --git a/tests/flytekit/unit/core/test_data_persistence.py b/tests/flytekit/unit/core/test_data_persistence.py index 159214fe43..d3f43fc2ab 100644 --- a/tests/flytekit/unit/core/test_data_persistence.py +++ b/tests/flytekit/unit/core/test_data_persistence.py @@ -136,6 +136,18 @@ def test_write_known_location(): assert f.read() == arbitrary_text.encode("utf-8") +def test_get_new_path(): + """ + Test that a new path given alternate bucket and name is generated correctly + """ + random_dir = tempfile.mkdtemp() + raw = os.path.join(random_dir, "raw") + fs = FileAccessProvider(local_sandbox_dir=random_dir, raw_output_prefix=raw) + np = fs.get_new_path(alt="foo", stem="bar.txt") + assert "foo" in np + assert np.endswith("bar.txt") + + def test_initialise_azure_file_provider_with_account_key(): with mock.patch.dict( os.environ, diff --git a/tests/flytekit/unit/types/directory/test_types.py b/tests/flytekit/unit/types/directory/test_types.py index 199b788733..9533bf65a9 100644 --- a/tests/flytekit/unit/types/directory/test_types.py +++ b/tests/flytekit/unit/types/directory/test_types.py @@ -22,6 +22,9 @@ def test_new_remote_dir(): fd = FlyteDirectory.new_remote() assert FlyteContext.current_context().file_access.raw_output_prefix in fd.path +# def test_new_remote_dir_alt(): +# fd = FlyteDirectory.new_remote(alt_bucket="my-alt-bucket") +# assert "s3://my-alt-bucket" in fd.path @mock.patch("flytekit.types.directory.types.os.name", "nt") def test_sep_nt(): diff --git a/tests/flytekit/unit/types/file/test_types.py b/tests/flytekit/unit/types/file/test_types.py index 6d37176833..34509356fe 100644 --- a/tests/flytekit/unit/types/file/test_types.py +++ b/tests/flytekit/unit/types/file/test_types.py @@ -2,5 +2,5 @@ from flytekit import FlyteContextManager def test_new_remote_alt(): - ff = FlyteFile.new_remote_file(alt="my-alt-prefix") + ff = FlyteFile.new_remote_file(alt="my-alt-prefix", name="my-file.txt") assert "my-alt-prefix" in ff.path From bbfbe1777bf4be31ca2e03bc08fbe01d9c4464f5 Mon Sep 17 00:00:00 2001 From: pryce-turner Date: Tue, 30 Jul 2024 13:49:32 -0700 Subject: [PATCH 3/5] Updated tests and added new path creator to FlyteFile/Dir new_remote methods Signed-off-by: pryce-turner --- flytekit/types/directory/types.py | 9 +++++---- tests/flytekit/unit/core/test_data_persistence.py | 8 ++++---- tests/flytekit/unit/types/directory/test_types.py | 7 ++++--- tests/flytekit/unit/types/file/test_types.py | 1 + 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/flytekit/types/directory/types.py b/flytekit/types/directory/types.py index dc294134a1..f268ac3b7b 100644 --- a/flytekit/types/directory/types.py +++ b/flytekit/types/directory/types.py @@ -186,7 +186,7 @@ def extension(cls) -> str: return "" @classmethod - def new_remote(cls) -> FlyteDirectory: + def new_remote(cls, stem: typing.Optional[str] = None, alt: typing.Optional[str] = None) -> FlyteDirectory: """ Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix configured in the current FileAccessProvider object in the context). @@ -195,9 +195,10 @@ def new_remote(cls) -> FlyteDirectory: and let flytekit handle the uploading. """ ctx = FlyteContextManager.current_context() - r = ctx.file_access.get_random_string() - d = ctx.file_access.join(ctx.file_access.raw_output_prefix, r) - return FlyteDirectory(path=d) + if stem and Path(stem).suffix: + raise ValueError("Stem should not have a file extension.") + remote_path = ctx.file_access.get_new_path(alt=alt, stem=stem) + return FlyteDirectory(path=remote_path) def __class_getitem__(cls, item: typing.Union[typing.Type, str]) -> typing.Type[FlyteDirectory]: if item is None: diff --git a/tests/flytekit/unit/core/test_data_persistence.py b/tests/flytekit/unit/core/test_data_persistence.py index d3f43fc2ab..c45fff75bb 100644 --- a/tests/flytekit/unit/core/test_data_persistence.py +++ b/tests/flytekit/unit/core/test_data_persistence.py @@ -141,10 +141,10 @@ def test_get_new_path(): Test that a new path given alternate bucket and name is generated correctly """ random_dir = tempfile.mkdtemp() - raw = os.path.join(random_dir, "raw") - fs = FileAccessProvider(local_sandbox_dir=random_dir, raw_output_prefix=raw) - np = fs.get_new_path(alt="foo", stem="bar.txt") - assert "foo" in np + fs = FileAccessProvider(local_sandbox_dir=random_dir, raw_output_prefix="s3://my-default-bucket/my-default-prefix/") + np = fs.get_new_path(alt="foo-bucket", stem="bar.txt") + assert "s3://foo-bucket" in np + assert "default-bucket" not in np assert np.endswith("bar.txt") diff --git a/tests/flytekit/unit/types/directory/test_types.py b/tests/flytekit/unit/types/directory/test_types.py index 9533bf65a9..1b9cf4be97 100644 --- a/tests/flytekit/unit/types/directory/test_types.py +++ b/tests/flytekit/unit/types/directory/test_types.py @@ -22,9 +22,10 @@ def test_new_remote_dir(): fd = FlyteDirectory.new_remote() assert FlyteContext.current_context().file_access.raw_output_prefix in fd.path -# def test_new_remote_dir_alt(): -# fd = FlyteDirectory.new_remote(alt_bucket="my-alt-bucket") -# assert "s3://my-alt-bucket" in fd.path +def test_new_remote_dir_alt(): + ff = FlyteDirectory.new_remote(alt="my-alt-bucket", stem="my-stem") + assert "my-alt-bucket" in ff.path + assert "my-stem" in ff.path @mock.patch("flytekit.types.directory.types.os.name", "nt") def test_sep_nt(): diff --git a/tests/flytekit/unit/types/file/test_types.py b/tests/flytekit/unit/types/file/test_types.py index 34509356fe..7cc6e42fea 100644 --- a/tests/flytekit/unit/types/file/test_types.py +++ b/tests/flytekit/unit/types/file/test_types.py @@ -4,3 +4,4 @@ def test_new_remote_alt(): ff = FlyteFile.new_remote_file(alt="my-alt-prefix", name="my-file.txt") assert "my-alt-prefix" in ff.path + assert "my-file.txt" in ff.path From 3dc30a91deb33180a17e469dfe3dbfc99ecddacf Mon Sep 17 00:00:00 2001 From: pryce-turner Date: Wed, 31 Jul 2024 13:38:10 -0700 Subject: [PATCH 4/5] Improved docstrings, fixed minor path sep bug, more descriptive naming, better test Signed-off-by: pryce-turner --- flytekit/core/data_persistence.py | 20 ++++++++++++++----- flytekit/types/directory/types.py | 8 ++++++-- flytekit/types/file/file.py | 2 +- .../unit/core/test_data_persistence.py | 8 +++----- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index db191a17f2..c80c473d21 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -454,16 +454,28 @@ def join( f = fs.unstrip_protocol(f) return f - def get_new_path( + def generate_new_custom_path( self, fs: typing.Optional[fsspec.AbstractFileSystem] = None, alt: typing.Optional[str] = None, stem: typing.Optional[str] = None, - unstrip: bool = False, ) -> str: + """ + Generates a new path with the raw output prefix and a random string appended to it. Optionally, you can provide + an alternate prefix and a stem. If stem is provided, it will be appended to the path instead of a random string. If alt is provided, it + will replace the first part of the output prefix, e.g. the S3 or GCS bucket. + + If wanting to write to a non-random prefix in a non-default S3 bucket, this can be called with + alt="my-alt-bucket" and stem="my-stem" to generate a path like s3://my-alt-bucket/default-prefix-part/my-stem + + :param fs: The filesystem to use. If None, the context's raw output filesystem is used. + :param alt: An alternate first member of the prefix to use instead of the default. + :param stem: A stem to append to the path. + :return: The new path. + """ fs = fs or self.raw_output_fs pref = self.raw_output_prefix - s_pref = pref.split(fs.sep) + s_pref = pref.split(fs.sep)[:-1] if alt: s_pref[2] = alt if stem: @@ -471,8 +483,6 @@ def get_new_path( else: s_pref.append(self.get_random_string()) p = fs.sep.join(s_pref) - if unstrip: - p = fs.unstrip_protocol(p) return p def get_random_local_path(self, file_path_or_file_name: typing.Optional[str] = None) -> str: diff --git a/flytekit/types/directory/types.py b/flytekit/types/directory/types.py index f268ac3b7b..5db8997d7c 100644 --- a/flytekit/types/directory/types.py +++ b/flytekit/types/directory/types.py @@ -193,12 +193,16 @@ def new_remote(cls, stem: typing.Optional[str] = None, alt: typing.Optional[str] This is used if you explicitly have a folder somewhere that you want to create files under. If you want to write a whole folder, you can let your task return a FlyteDirectory object, and let flytekit handle the uploading. + + :param stem: A stem to append to the path as the final prefix "directory". + :param alt: An alternate first member of the prefix to use instead of the default. + :return FlyteDirectory: A new FlyteDirectory object that points to a remote location. """ ctx = FlyteContextManager.current_context() if stem and Path(stem).suffix: raise ValueError("Stem should not have a file extension.") - remote_path = ctx.file_access.get_new_path(alt=alt, stem=stem) - return FlyteDirectory(path=remote_path) + remote_path = ctx.file_access.generate_new_custom_path(alt=alt, stem=stem) + return cls(path=remote_path) def __class_getitem__(cls, item: typing.Union[typing.Type, str]) -> typing.Type[FlyteDirectory]: if item is None: diff --git a/flytekit/types/file/file.py b/flytekit/types/file/file.py index ae2d160f10..801eb8b7a5 100644 --- a/flytekit/types/file/file.py +++ b/flytekit/types/file/file.py @@ -187,7 +187,7 @@ def new_remote_file(cls, name: typing.Optional[str] = None, alt: typing.Optional :param alt: If you want to specify a different prefix head than the default one, you can specify it here. """ ctx = FlyteContextManager.current_context() - remote_path = ctx.file_access.get_new_path(alt=alt, stem=name) + remote_path = ctx.file_access.generate_new_custom_path(alt=alt, stem=name) return cls(path=remote_path) @classmethod diff --git a/tests/flytekit/unit/core/test_data_persistence.py b/tests/flytekit/unit/core/test_data_persistence.py index c45fff75bb..e8535bb245 100644 --- a/tests/flytekit/unit/core/test_data_persistence.py +++ b/tests/flytekit/unit/core/test_data_persistence.py @@ -136,16 +136,14 @@ def test_write_known_location(): assert f.read() == arbitrary_text.encode("utf-8") -def test_get_new_path(): +def test_generate_new_custom_path(): """ Test that a new path given alternate bucket and name is generated correctly """ random_dir = tempfile.mkdtemp() fs = FileAccessProvider(local_sandbox_dir=random_dir, raw_output_prefix="s3://my-default-bucket/my-default-prefix/") - np = fs.get_new_path(alt="foo-bucket", stem="bar.txt") - assert "s3://foo-bucket" in np - assert "default-bucket" not in np - assert np.endswith("bar.txt") + np = fs.generate_new_custom_path(alt="foo-bucket", stem="bar.txt") + assert np == "s3://foo-bucket/my-default-prefix/bar.txt" def test_initialise_azure_file_provider_with_account_key(): From 8419de40022bc4dd16cd9f57a48b0977beb58668 Mon Sep 17 00:00:00 2001 From: pryce-turner Date: Wed, 31 Jul 2024 13:43:20 -0700 Subject: [PATCH 5/5] Formatting Signed-off-by: pryce-turner --- flytekit/core/data_persistence.py | 14 ++++++++------ tests/flytekit/unit/core/test_data_persistence.py | 5 ++++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index c80c473d21..a6b401bff8 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -461,12 +461,14 @@ def generate_new_custom_path( stem: typing.Optional[str] = None, ) -> str: """ - Generates a new path with the raw output prefix and a random string appended to it. Optionally, you can provide - an alternate prefix and a stem. If stem is provided, it will be appended to the path instead of a random string. If alt is provided, it - will replace the first part of the output prefix, e.g. the S3 or GCS bucket. - - If wanting to write to a non-random prefix in a non-default S3 bucket, this can be called with - alt="my-alt-bucket" and stem="my-stem" to generate a path like s3://my-alt-bucket/default-prefix-part/my-stem + Generates a new path with the raw output prefix and a random string appended to it. + Optionally, you can provide an alternate prefix and a stem. If stem is provided, it + will be appended to the path instead of a random string. If alt is provided, it will + replace the first part of the output prefix, e.g. the S3 or GCS bucket. + + If wanting to write to a non-random prefix in a non-default S3 bucket, this can be + called with alt="my-alt-bucket" and stem="my-stem" to generate a path like + s3://my-alt-bucket/default-prefix-part/my-stem :param fs: The filesystem to use. If None, the context's raw output filesystem is used. :param alt: An alternate first member of the prefix to use instead of the default. diff --git a/tests/flytekit/unit/core/test_data_persistence.py b/tests/flytekit/unit/core/test_data_persistence.py index e8535bb245..5063e484d2 100644 --- a/tests/flytekit/unit/core/test_data_persistence.py +++ b/tests/flytekit/unit/core/test_data_persistence.py @@ -141,7 +141,10 @@ def test_generate_new_custom_path(): Test that a new path given alternate bucket and name is generated correctly """ random_dir = tempfile.mkdtemp() - fs = FileAccessProvider(local_sandbox_dir=random_dir, raw_output_prefix="s3://my-default-bucket/my-default-prefix/") + fs = FileAccessProvider( + local_sandbox_dir=random_dir, + raw_output_prefix="s3://my-default-bucket/my-default-prefix/" + ) np = fs.generate_new_custom_path(alt="foo-bucket", stem="bar.txt") assert np == "s3://foo-bucket/my-default-prefix/bar.txt"