Skip to content

Commit

Permalink
Merge branch 'flyteorg:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
sumana-2705 authored Oct 24, 2024
2 parents 0614fc3 + 8ef4220 commit b977ca1
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 54 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ __pycache__/
.idea
.jpg
.ipynb_checkpoints/
*.csv
*.dat
.DS_Store
gen_modules
Expand Down
8 changes: 4 additions & 4 deletions docs/integrations/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ The {ref}`community <community>` would love to help you build new SDKs. Currentl
:header-rows: 0
:widths: 20 30
* - [flytekit](https://flytekit.readthedocs.io)
* - [flytekit](https://github.com/flyteorg/flytekit)
- The Python SDK for Flyte.
* - [flytekit-java](https://github.com/spotify/flytekit-java)
* - [flytekit-java](https://github.com/flyteorg/flytekit-java)
- The Java/Scala SDK for Flyte.
```

Expand Down Expand Up @@ -259,8 +259,8 @@ Hive </auto_examples/hive_plugin/index>
:hidden:
:caption: SDKs for writing tasks and workflows
flytekit <https://flytekit.readthedocs.io/>
flytekit-java <https://github.com/spotify/flytekit-java>
flytekit <https://github.com/flyteorg/flytekit>
flytekit-java <https://github.com/flyteorg/flytekit-java>
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ def shape_properties_accept_conditional_output(radius: float) -> float:


if __name__ == "__main__":
print(f"Circumference of circle x Area of circle (radius={radius_small}): {shape_properties(radius=5.0)}")
radius_small = 0.5
print(
f"Circumference of circle (radius={radius_small}) x Area of circle (radius={calculate_circle_circumference(radius=radius_small)}): {shape_properties_accept_conditional_output(radius=radius_small)}"
)


# Using the output of a previous task in a conditional
Expand Down Expand Up @@ -213,8 +216,10 @@ def noop_in_conditional(radius: float, seed: int = 5) -> float:
if __name__ == "__main__":
default_seed_output = consume_task_output(radius=0.4)
print(
f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_circumference => {default_seed_output}"
f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_area => {default_seed_output}"
)

custom_seed_output = consume_task_output(radius=0.4, seed=7)
print(f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_area => {custom_seed_output}")
print(
f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_circumference => {custom_seed_output}"
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from flytekit import ContainerTask, kwtypes, task, workflow
from flytekit.core.base_task import TaskMetadata

logger = logging.getLogger(__file__)

Expand All @@ -25,6 +26,7 @@
"{{.inputs.b}}",
"/var/outputs",
],
metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

calculate_ellipse_area_python = ContainerTask(
Expand All @@ -41,6 +43,7 @@
"{{.inputs.b}}",
"/var/outputs",
],
metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

calculate_ellipse_area_r = ContainerTask(
Expand All @@ -58,6 +61,7 @@
"{{.inputs.b}}",
"/var/outputs",
],
metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

calculate_ellipse_area_haskell = ContainerTask(
Expand All @@ -73,6 +77,7 @@
"{{.inputs.b}}",
"/var/outputs",
],
metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

calculate_ellipse_area_julia = ContainerTask(
Expand All @@ -89,6 +94,7 @@
"{{.inputs.b}}",
"/var/outputs",
],
metadata=TaskMetadata(cache=True, cache_version="1.0"),
)


Expand Down
45 changes: 45 additions & 0 deletions examples/data_types_and_io/data_types_and_io/file_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import os

import pandas as pd
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile


@task()
def remove_some_rows(ff: FlyteFile) -> FlyteFile:
"""
Remove the rows that the value of city is 'Seattle'.
This is an example with streaming support.
"""
new_file = FlyteFile.new_remote_file("data_without_seattle.csv")
with ff.open("r") as r:
with new_file.open("w") as w:
df = pd.read_csv(r)
df = df[df["City"] != "Seattle"]
df.to_csv(w, index=False)
return new_file


@task
def process_folder(fd: FlyteDirectory) -> FlyteDirectory:
out_fd = FlyteDirectory.new_remote("folder-copy")
for base, x in fd.crawl():
src = str(os.path.join(base, x))
out_file = out_fd.new_file(x)
with FlyteFile(src).open("rb") as f:
with out_file.open("wb") as o:
o.write(f.read())
# The output path will be s3://my-s3-bucket/data/77/<execution-id>-<node-id>-0/folder-copy
return out_fd


@workflow()
def wf():
remove_some_rows(ff=FlyteFile("s3://custom-bucket/data.csv"))
process_folder(fd=FlyteDirectory("s3://my-s3-bucket/folder"))
return


if __name__ == "__main__":
print(f"Running wf() {wf()}")
98 changes: 52 additions & 46 deletions examples/data_types_and_io/data_types_and_io/tensorflow_type.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Import necessary libraries and modules

from flytekit import task, workflow
from flytekit import ImageSpec, task, workflow
from flytekit.types.directory import TFRecordsDirectory
from flytekit.types.file import TFRecordFile

Expand All @@ -9,48 +9,54 @@
registry="ghcr.io/flyteorg",
)

if custom_image.is_container():
import tensorflow as tf

# TensorFlow Model
@task
def train_model() -> tf.keras.Model:
model = tf.keras.Sequential(
[tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")]
)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
return model

@task
def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float:
loss, accuracy = model.evaluate(x, y)
return accuracy

@workflow
def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float:
model = train_model()
return evaluate_model(model=model, x=x, y=y)

# TFRecord Files
@task
def process_tfrecord(file: TFRecordFile) -> int:
count = 0
for record in tf.data.TFRecordDataset(file):
count += 1
return count

@workflow
def tfrecord_workflow(file: TFRecordFile) -> int:
return process_tfrecord(file=file)

# TFRecord Directories
@task
def process_tfrecords_dir(dir: TFRecordsDirectory) -> int:
count = 0
for record in tf.data.TFRecordDataset(dir.path):
count += 1
return count

@workflow
def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int:
return process_tfrecords_dir(dir=dir)
import tensorflow as tf


# TensorFlow Model
@task
def train_model() -> tf.keras.Model:
model = tf.keras.Sequential(
[tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")]
)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
return model


@task
def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float:
loss, accuracy = model.evaluate(x, y)
return accuracy


@workflow
def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float:
model = train_model()
return evaluate_model(model=model, x=x, y=y)


# TFRecord Files
@task
def process_tfrecord(file: TFRecordFile) -> int:
count = 0
for record in tf.data.TFRecordDataset(file):
count += 1
return count


@workflow
def tfrecord_workflow(file: TFRecordFile) -> int:
return process_tfrecord(file=file)


# TFRecord Directories
@task
def process_tfrecords_dir(dir: TFRecordsDirectory) -> int:
count = 0
for record in tf.data.TFRecordDataset(dir.path):
count += 1
return count


@workflow
def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int:
return process_tfrecords_dir(dir=dir)
1 change: 1 addition & 0 deletions examples/data_types_and_io/requirements.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pandas
torch
tabulate
tensorflow
pyarrow
33 changes: 33 additions & 0 deletions examples/kfmpi_plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,36 @@ pyflyte run --remote \
```{auto-examples-toc}
mpi_mnist
```

## MPI Plugin Troubleshooting Guide

This section covers common issues encountered during the setup of the MPI operator for distributed training jobs on Flyte.

**Worker Pods Failing to Start (Insufficient Resources)**

MPI worker pods may fail to start or exhibit scheduling issues, leading to job timeouts or failures. This often occurs due to resource constraints (CPU, memory, or GPU) in the cluster.

1. Adjust Resource Requests:
Ensure that each worker pod has sufficient resources. You can adjust the resource requests in your task definition:

```
requests=Resources(cpu="<your_cpu_request>", mem="<your_mem_request>")
```

Modify the CPU and memory values according to your cluster's available resources. This helps prevent pod scheduling failures caused by resource constraints.

2. Check Pod Logs for Errors:
If the worker pods still fail to start, check the logs for any related errors:

```
kubectl logs <pod-name> -n <namespace>
```

Look for resource allocation or worker communication errors.

**Workflow Registration Method Errors (Timeouts or Deadlocks)**

If your MPI workflow hangs or times out, it may be caused by an incorrect workflow registration method.

1. Verify Registration Method:
When using a custom image, refer to the Flyte documentation on [Registering workflows](https://docs.flyte.org/en/latest/user_guide/flyte_fundamentals/registering_workflows.html#registration-patterns) to ensure you're following the correct registration method.

0 comments on commit b977ca1

Please sign in to comment.