From c602b8ea6f1ca29ea8b2fb53c3497a0000bab289 Mon Sep 17 00:00:00 2001 From: Shivam Sharma <66767992+10sharmashivam@users.noreply.github.com> Date: Tue, 22 Oct 2024 00:00:33 +0530 Subject: [PATCH 1/7] [DOCS] Troubleshooting Steps for MPI Operator (#1756) MPI troubleshooting steps Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- examples/kfmpi_plugin/README.md | 34 +++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/examples/kfmpi_plugin/README.md b/examples/kfmpi_plugin/README.md index ad23c4729..0a43ff0ba 100644 --- a/examples/kfmpi_plugin/README.md +++ b/examples/kfmpi_plugin/README.md @@ -55,3 +55,37 @@ pyflyte run --remote \ ```{auto-examples-toc} mpi_mnist ``` + +## MPI Plugin Troubleshooting Guide + +This section covers common issues encountered during the setup of the MPI operator for distributed training jobs on Flyte. + +**Worker Pods Failing to Start (Insufficient Resources)** + +MPI worker pods may fail to start or exhibit scheduling issues, leading to job timeouts or failures. This often occurs due to resource constraints (CPU, memory, or GPU) in the cluster. + +1. Adjust Resource Requests: +Ensure that each worker pod has sufficient resources. You can adjust the resource requests in your task definition: + +``` + requests=Resources(cpu="<your_cpu_request>", mem="<your_mem_request>") +``` + +Modify the CPU and memory values according to your cluster's available resources. This helps prevent pod scheduling failures caused by resource constraints. + +2. Check Pod Logs for Errors: +If the worker pods still fail to start, check the logs for any related errors: + +``` + kubectl logs <pod-name> -n <namespace> +``` + +Look for resource allocation or worker communication errors. + +**Workflow Registration Method Errors (Timeouts or Deadlocks)** + +If your MPI workflow hangs or times out, it may be caused by an incorrect workflow registration method. + +1. 
Verify Registration Method: + When using a custom image, refer to the Flyte documentation on [Registering workflows](https://docs.flyte.org/en/latest/user_guide/flyte_fundamentals/registering_workflows.html#registration-patterns) to ensure you're following the correct registration method. + From 90184f916e13b0ed7dc58bf1f49f1b48ea950aca Mon Sep 17 00:00:00 2001 From: Nikki Everett Date: Tue, 22 Oct 2024 08:27:44 -0500 Subject: [PATCH 2/7] DOC-664 update flytekit docs links to point to flytekit repo (#1755) * fix flytekit docs link Signed-off-by: nikki everett * use correct formatting Signed-off-by: nikki everett * update flytekit and flytekit-java links Signed-off-by: nikki everett --------- Signed-off-by: nikki everett --- docs/integrations/index.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/integrations/index.md b/docs/integrations/index.md index d462d64a9..7d48d1531 100644 --- a/docs/integrations/index.md +++ b/docs/integrations/index.md @@ -171,9 +171,9 @@ The {ref}`community ` would love to help you build new SDKs. Currentl :header-rows: 0 :widths: 20 30 -* - [flytekit](https://flytekit.readthedocs.io) +* - [flytekit](https://github.com/flyteorg/flytekit) - The Python SDK for Flyte. -* - [flytekit-java](https://github.com/spotify/flytekit-java) +* - [flytekit-java](https://github.com/flyteorg/flytekit-java) - The Java/Scala SDK for Flyte. 
``` @@ -259,8 +259,8 @@ Hive :hidden: :caption: SDKs for writing tasks and workflows -flytekit <https://flytekit.readthedocs.io> -flytekit-java <https://github.com/spotify/flytekit-java> +flytekit <https://github.com/flyteorg/flytekit> +flytekit-java <https://github.com/flyteorg/flytekit-java> ``` From 9aadec205a6e208c62e29f52873fb3d675965a51 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Tue, 22 Oct 2024 18:59:00 -0400 Subject: [PATCH 3/7] Fix lint warning and import error in data_types_and_io tf example (#1762) * Fix lint warning and import error in data_types_and_io tf example Signed-off-by: Eduardo Apolinario * Remove use of is_container in tensorflow_type.py example Signed-off-by: Eduardo Apolinario * Fix lint warning Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- .../data_types_and_io/tensorflow_type.py | 98 ++++++++++--------- examples/data_types_and_io/requirements.in | 1 + examples/kfmpi_plugin/README.md | 1 - 3 files changed, 53 insertions(+), 47 deletions(-) diff --git a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py index 349f34b67..3ec8aea71 100644 --- a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py +++ b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py @@ -1,6 +1,6 @@ # Import necessary libraries and modules -from flytekit import task, workflow +from flytekit import ImageSpec, task, workflow from flytekit.types.directory import TFRecordsDirectory from flytekit.types.file import TFRecordFile @@ -9,48 +9,54 @@ registry="ghcr.io/flyteorg", ) -if custom_image.is_container(): - import tensorflow as tf - - # TensorFlow Model - @task - def train_model() -> tf.keras.Model: - model = tf.keras.Sequential( - [tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")] - ) - model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) - return model - - @task - def evaluate_model(model: tf.keras.Model, x: tf.Tensor, 
y: tf.Tensor) -> float: - loss, accuracy = model.evaluate(x, y) - return accuracy - - @workflow - def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: - model = train_model() - return evaluate_model(model=model, x=x, y=y) - - # TFRecord Files - @task - def process_tfrecord(file: TFRecordFile) -> int: - count = 0 - for record in tf.data.TFRecordDataset(file): - count += 1 - return count - - @workflow - def tfrecord_workflow(file: TFRecordFile) -> int: - return process_tfrecord(file=file) - - # TFRecord Directories - @task - def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: - count = 0 - for record in tf.data.TFRecordDataset(dir.path): - count += 1 - return count - - @workflow - def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: - return process_tfrecords_dir(dir=dir) +import tensorflow as tf + + +# TensorFlow Model +@task +def train_model() -> tf.keras.Model: + model = tf.keras.Sequential( + [tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")] + ) + model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) + return model + + +@task +def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float: + loss, accuracy = model.evaluate(x, y) + return accuracy + + +@workflow +def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: + model = train_model() + return evaluate_model(model=model, x=x, y=y) + + +# TFRecord Files +@task +def process_tfrecord(file: TFRecordFile) -> int: + count = 0 + for record in tf.data.TFRecordDataset(file): + count += 1 + return count + + +@workflow +def tfrecord_workflow(file: TFRecordFile) -> int: + return process_tfrecord(file=file) + + +# TFRecord Directories +@task +def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: + count = 0 + for record in tf.data.TFRecordDataset(dir.path): + count += 1 + return count + + +@workflow +def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: + return 
process_tfrecords_dir(dir=dir) diff --git a/examples/data_types_and_io/requirements.in b/examples/data_types_and_io/requirements.in index 79bd303e5..2bcce8b12 100644 --- a/examples/data_types_and_io/requirements.in +++ b/examples/data_types_and_io/requirements.in @@ -1,4 +1,5 @@ pandas torch tabulate +tensorflow pyarrow diff --git a/examples/kfmpi_plugin/README.md b/examples/kfmpi_plugin/README.md index 0a43ff0ba..1eeeac68b 100644 --- a/examples/kfmpi_plugin/README.md +++ b/examples/kfmpi_plugin/README.md @@ -88,4 +88,3 @@ If your MPI workflow hangs or times out, it may be caused by an incorrect workfl 1. Verify Registration Method: When using a custom image, refer to the Flyte documentation on [Registering workflows](https://docs.flyte.org/en/latest/user_guide/flyte_fundamentals/registering_workflows.html#registration-patterns) to ensure you're following the correct registration method. - From 135ae99fdec79015fc4c465976a65ca5190957c3 Mon Sep 17 00:00:00 2001 From: Yenting Chen <42114946+DenChenn@users.noreply.github.com> Date: Wed, 23 Oct 2024 11:48:27 +0800 Subject: [PATCH 4/7] Add example for file streaming (#1759) Add example for directory streaming Signed-off-by: DenChenn --- .../data_types_and_io/file_streaming.py | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 examples/data_types_and_io/data_types_and_io/file_streaming.py diff --git a/examples/data_types_and_io/data_types_and_io/file_streaming.py b/examples/data_types_and_io/data_types_and_io/file_streaming.py new file mode 100644 index 000000000..0e9a84952 --- /dev/null +++ b/examples/data_types_and_io/data_types_and_io/file_streaming.py @@ -0,0 +1,44 @@ +from flytekit import task, workflow +from flytekit.types.file import FlyteFile +from flytekit.types.directory import FlyteDirectory +import pandas as pd +import os + + +@task() +def remove_some_rows(ff: FlyteFile) -> FlyteFile: + """ + Remove the rows that the value of city is 'Seattle'. 
+ This is an example with streaming support. + """ + new_file = FlyteFile.new_remote_file("data_without_seattle.csv") + with ff.open("r") as r: + with new_file.open("w") as w: + df = pd.read_csv(r) + df = df[df["City"] != "Seattle"] + df.to_csv(w, index=False) + return new_file + + +@task +def process_folder(fd: FlyteDirectory) -> FlyteDirectory: + out_fd = FlyteDirectory.new_remote("folder-copy") + for base, x in fd.crawl(): + src = str(os.path.join(base, x)) + out_file = out_fd.new_file(x) + with FlyteFile(src).open("rb") as f: + with out_file.open("wb") as o: + o.write(f.read()) + # The output path will be s3://my-s3-bucket/data/77/--0/folder-copy + return out_fd + + +@workflow() +def wf(): + remove_some_rows(ff=FlyteFile("s3://custom-bucket/data.csv")) + process_folder(fd=FlyteDirectory("s3://my-s3-bucket/folder")) + return + + +if __name__ == "__main__": + print(f"Running wf() {wf()}") From 0b350cd3b42fe57ea06880a374d4797d54b9b3ac Mon Sep 17 00:00:00 2001 From: Alex Wu <66259759+popojk@users.noreply.github.com> Date: Wed, 23 Oct 2024 11:49:24 +0800 Subject: [PATCH 5/7] add ContainerTask cache example code (#1758) Signed-off-by: Alex Wu --- .../customizing_dependencies/raw_container.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/examples/customizing_dependencies/customizing_dependencies/raw_container.py b/examples/customizing_dependencies/customizing_dependencies/raw_container.py index e8dc9a5ed..9d986da56 100644 --- a/examples/customizing_dependencies/customizing_dependencies/raw_container.py +++ b/examples/customizing_dependencies/customizing_dependencies/raw_container.py @@ -1,6 +1,7 @@ import logging from flytekit import ContainerTask, kwtypes, task, workflow +from flytekit.core.base_task import TaskMetadata logger = logging.getLogger(__file__) @@ -25,6 +26,7 @@ "{{.inputs.b}}", "/var/outputs", ], + metadata=TaskMetadata(cache=True, cache_version="1.0"), ) calculate_ellipse_area_python = ContainerTask( @@ -41,6 +43,7 @@ "{{.inputs.b}}", 
"/var/outputs", ], + metadata=TaskMetadata(cache=True, cache_version="1.0"), ) calculate_ellipse_area_r = ContainerTask( @@ -58,6 +61,7 @@ "{{.inputs.b}}", "/var/outputs", ], + metadata=TaskMetadata(cache=True, cache_version="1.0"), ) calculate_ellipse_area_haskell = ContainerTask( @@ -73,6 +77,7 @@ "{{.inputs.b}}", "/var/outputs", ], + metadata=TaskMetadata(cache=True, cache_version="1.0"), ) calculate_ellipse_area_julia = ContainerTask( @@ -89,6 +94,7 @@ "{{.inputs.b}}", "/var/outputs", ], + metadata=TaskMetadata(cache=True, cache_version="1.0"), ) From 61438bd4b20b5e128fd5ca82e654dc8118a33a12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=9F=E5=AE=B6=E7=91=8B?= <36886416+JiangJiaWei1103@users.noreply.github.com> Date: Wed, 23 Oct 2024 05:49:40 +0200 Subject: [PATCH 6/7] fix: Recover the Expected Behaviors of Example Workflows (#1760) 1. Demonstrate the logic of the workflow `shape_properties_accept_conditional_output`. 2. Swap the descriptions of expected outputs of the workflow `consume_task_output`. 
Signed-off-by: JiaWei Jiang --- .../advanced_composition/conditional.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/examples/advanced_composition/advanced_composition/conditional.py b/examples/advanced_composition/advanced_composition/conditional.py index 7c314162b..f29c0770a 100644 --- a/examples/advanced_composition/advanced_composition/conditional.py +++ b/examples/advanced_composition/advanced_composition/conditional.py @@ -79,7 +79,10 @@ def shape_properties_accept_conditional_output(radius: float) -> float: if __name__ == "__main__": - print(f"Circumference of circle x Area of circle (radius={radius_small}): {shape_properties(radius=5.0)}") + radius_small = 0.5 + print( + f"Circumference of circle (radius={radius_small}) x Area of circle (radius={calculate_circle_circumference(radius=radius_small)}): {shape_properties_accept_conditional_output(radius=radius_small)}" + ) # Using the output of a previous task in a conditional @@ -213,8 +216,10 @@ def noop_in_conditional(radius: float, seed: int = 5) -> float: if __name__ == "__main__": default_seed_output = consume_task_output(radius=0.4) print( - f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_circumference => {default_seed_output}" + f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_area => {default_seed_output}" ) custom_seed_output = consume_task_output(radius=0.4, seed=7) - print(f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_area => {custom_seed_output}") + print( + f"Executing consume_task_output(0.4, seed=7). 
Expected output: calculate_circle_circumference => {custom_seed_output}" + ) From 8ef422017b206b4dcff06cf56677f7793dad3caa Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Wed, 23 Oct 2024 12:58:14 -0400 Subject: [PATCH 7/7] Allow csv files (#1763) * Allow csv files Signed-off-by: Eduardo Apolinario * Fix lint warning Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- .gitignore | 1 - .../data_types_and_io/data_types_and_io/file_streaming.py | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index dcb73da03..cd4020676 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,6 @@ __pycache__/ .idea .jpg .ipynb_checkpoints/ -*.csv *.dat .DS_Store gen_modules diff --git a/examples/data_types_and_io/data_types_and_io/file_streaming.py b/examples/data_types_and_io/data_types_and_io/file_streaming.py index 0e9a84952..73e1add2a 100644 --- a/examples/data_types_and_io/data_types_and_io/file_streaming.py +++ b/examples/data_types_and_io/data_types_and_io/file_streaming.py @@ -1,8 +1,9 @@ +import os + +import pandas as pd from flytekit import task, workflow -from flytekit.types.file import FlyteFile from flytekit.types.directory import FlyteDirectory -import pandas as pd -import os +from flytekit.types.file import FlyteFile @task()