From 135ae99fdec79015fc4c465976a65ca5190957c3 Mon Sep 17 00:00:00 2001 From: Yenting Chen <42114946+DenChenn@users.noreply.github.com> Date: Wed, 23 Oct 2024 11:48:27 +0800 Subject: [PATCH] Add example for file streaming (#1759) Add example for directory streaming Signed-off-by: DenChenn --- .../data_types_and_io/file_streaming.py | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 examples/data_types_and_io/data_types_and_io/file_streaming.py diff --git a/examples/data_types_and_io/data_types_and_io/file_streaming.py b/examples/data_types_and_io/data_types_and_io/file_streaming.py new file mode 100644 index 000000000..0e9a84952 --- /dev/null +++ b/examples/data_types_and_io/data_types_and_io/file_streaming.py @@ -0,0 +1,44 @@ +from flytekit import task, workflow +from flytekit.types.file import FlyteFile +from flytekit.types.directory import FlyteDirectory +import pandas as pd +import os + + +@task() +def remove_some_rows(ff: FlyteFile) -> FlyteFile: + """ + Remove the rows that the value of city is 'Seattle'. + This is an example with streaming support. + """ + new_file = FlyteFile.new_remote_file("data_without_seattle.csv") + with ff.open("r") as r: + with new_file.open("w") as w: + df = pd.read_csv(r) + df = df[df["City"] != "Seattle"] + df.to_csv(w, index=False) + return new_file + + +@task +def process_folder(fd: FlyteDirectory) -> FlyteDirectory: + out_fd = FlyteDirectory.new_remote("folder-copy") + for base, x in fd.crawl(): + src = str(os.path.join(base, x)) + out_file = out_fd.new_file(x) + with FlyteFile(src).open("rb") as f: + with out_file.open("wb") as o: + o.write(f.read()) + # The output path will be s3://my-s3-bucket/data/77/--0/folder-copy + return out_fd + + +@workflow() +def wf(): + remove_some_rows(ff=FlyteFile("s3://custom-bucket/data.csv")) + process_folder(fd=FlyteDirectory("s3://my-s3-bucket/folder")) + return + + +if __name__ == "__main__": + print(f"Running wf() {wf()}")