Ray task example (#851)
* Ray job example
* lint
* lint
* Address comment
* fix test
* fix test
* fix test
* test
* test
* fix test
* fix test
* Add ray to index.rst
* nit

Signed-off-by: Kevin Su <[email protected]>
pingsutw authored Aug 17, 2022
1 parent cb4319e commit e82db05
Showing 11 changed files with 196 additions and 0 deletions.
1 change: 1 addition & 0 deletions cookbook/common/leaf.mk
@@ -116,6 +116,7 @@ serialize: clean _pb_output docker_build
@echo ${VERSION}
@echo ${CURDIR}
docker run -i --rm \
-u $(id -u ${USER}):$(id -g ${USER}) \
-e SANDBOX=${SANDBOX} \
-e REGISTRY=${REGISTRY} \
-e MAKEFLAGS=${MAKEFLAGS} \
2 changes: 2 additions & 0 deletions cookbook/docs/conf.py
@@ -290,6 +290,7 @@ def __call__(self, filename):
"../integrations/kubernetes/kftensorflow",
"../integrations/kubernetes/kfpytorch",
"../integrations/kubernetes/kfmpi",
"../integrations/kubernetes/ray_example",
"../integrations/aws/athena",
"../integrations/aws/batch",
"../integrations/aws/sagemaker_training",
@@ -329,6 +330,7 @@ def __call__(self, filename):
"auto/integrations/kubernetes/kftensorflow",
"auto/integrations/kubernetes/kfpytorch",
"auto/integrations/kubernetes/kfmpi",
"auto/integrations/kubernetes/ray_example",
"auto/integrations/aws/athena",
"auto/integrations/aws/batch",
"auto/integrations/aws/sagemaker_training",
1 change: 1 addition & 0 deletions cookbook/docs/index.rst
@@ -184,6 +184,7 @@ Table of Contents
auto/integrations/kubernetes/kfpytorch/index
auto/integrations/kubernetes/kftensorflow/index
auto/integrations/kubernetes/kfmpi/index
auto/integrations/kubernetes/ray_example/index
auto/integrations/aws/sagemaker_training/index
auto/integrations/aws/sagemaker_pytorch/index
auto/integrations/aws/athena/index
9 changes: 9 additions & 0 deletions cookbook/docs/integrations.rst
@@ -146,6 +146,15 @@ orchestrated by Flyte itself, within its provisioned Kubernetes clusters.
^^^^^^^^^^^^
Run distributed deep learning training jobs using Horovod and MPI.

---

.. link-button:: auto/integrations/kubernetes/ray_example/index
:type: ref
:text: Ray Task
:classes: btn-block stretched-link
^^^^^^^^^^^^
Run Ray jobs on a Kubernetes cluster.

.. _external_service_backend_plugins:

********************************
35 changes: 35 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/Dockerfile
@@ -0,0 +1,35 @@
FROM rayproject/ray:1.13.1-py37-gpu
# You can disable GPU support by replacing the above line with:
# FROM rayproject/ray:1.13.1-py37-cpu
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytesnacks

USER ray
ARG HOME=/home/ray
WORKDIR $HOME
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV DEBIAN_FRONTEND noninteractive
ENV TERM linux

# Install the AWS cli separately to prevent issues with boto being written over
RUN pip install awscli

# Install wheel after venv is activated
RUN pip3 install wheel

# Install Python dependencies
RUN pip3 install flytekit "git+https://github.com/flyteorg/flytekit@master#egg=flytekitplugins-ray&subdirectory=plugins/flytekit-ray"
RUN pip3 install ray[default]

# Copy the makefile targets to expose on the container. This makes it easier to register.
COPY ray_example/in_container.mk /$HOME/Makefile
COPY ray_example/sandbox.config /$HOME

# Note: when we start a container from this image, "ray start" runs first (from "/home/ray/anaconda3/lib/python3.7"), and then
# "pyflyte-execute" runs the Ray job. Therefore, the actual code should be copied to "/home/ray/anaconda3/lib/python3.7"
COPY ray_example/ /home/ray/anaconda3/lib/python3.7/ray_example
# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag
3 changes: 3 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/Makefile
@@ -0,0 +1,3 @@
PREFIX=ray_example
include ../../../common/common.mk
include ../../../common/leaf.mk
53 changes: 53 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/README.rst
@@ -0,0 +1,53 @@
KubeRay
========

`KubeRay <https://github.com/ray-project/kuberay>`__ is an open-source toolkit for running Ray applications on Kubernetes. It provides several tools that simplify running and managing Ray on Kubernetes:

- Ray Operator
- Backend services to create/delete cluster resources
- Kubectl plugin/CLI to operate CRD objects
- Native Job and Serving integration with Clusters

Installation
------------

To install the Ray plugin, run the following command:

.. code-block:: bash

   pip install flytekitplugins-ray

To enable the plugin in the backend, follow the instructions outlined in the :std:ref:`flyte:deployment-plugin-setup-k8s` guide.

Submit a Ray Job to Existing Cluster
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. testcode:: ray-quickstart-1

   import typing

   import ray
   from flytekit import task
   from flytekitplugins.ray import RayJobConfig

   @ray.remote
   def f(x):
       return x * x

   @task(
       task_config=RayJobConfig(
           address=<RAY_CLUSTER_ADDRESS>,
           runtime_env={"pip": ["numpy", "pandas"]},
       )
   )
   def ray_task() -> typing.List[int]:
       futures = [f.remote(i) for i in range(5)]
       return ray.get(futures)


Create a Ray Cluster Managed by Flyte and Run a Ray Job on This Cluster
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. testcode:: ray-quickstart-2

   import typing

   import ray
   from flytekit import task
   from flytekitplugins.ray import RayJobConfig, WorkerNodeConfig, HeadNodeConfig

   @ray.remote
   def f(x):
       return x * x

   @task(
       task_config=RayJobConfig(
           worker_node_config=[WorkerNodeConfig(group_name="test-group", replicas=10)]
       )
   )
   def ray_task() -> typing.List[int]:
       futures = [f.remote(i) for i in range(5)]
       return ray.get(futures)

Empty file.
16 changes: 16 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/in_container.mk
@@ -0,0 +1,16 @@
SERIALIZED_PB_OUTPUT_DIR := /tmp/output

.PHONY: clean
clean:
rm -rf $(SERIALIZED_PB_OUTPUT_DIR)/*

$(SERIALIZED_PB_OUTPUT_DIR): clean
mkdir -p $(SERIALIZED_PB_OUTPUT_DIR)

.PHONY: serialize
serialize: $(SERIALIZED_PB_OUTPUT_DIR)
pyflyte --config sandbox.config serialize workflows -f $(SERIALIZED_PB_OUTPUT_DIR)

.PHONY: fast_serialize
fast_serialize: $(SERIALIZED_PB_OUTPUT_DIR)
pyflyte --config sandbox.config serialize fast workflows -f $(SERIALIZED_PB_OUTPUT_DIR)
74 changes: 74 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/ray_example.py
@@ -0,0 +1,74 @@
"""
Ray Tasks
----------
Ray tasks allow you to run a Ray job on an existing Ray cluster, or to create a Ray cluster on the fly using the Ray operator.
Let's get started with an example!
"""

# %%
# First, we load the libraries.
import typing

import ray
from flytekit import Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig


# %%
# Ray Task
# =========
#
# We define a `remote function <https://docs.ray.io/en/latest/ray-core/tasks.html#tasks>`__ that will be executed asynchronously on the Ray cluster.
@ray.remote
def f(x):
return x * x


# %%
# Defining a Ray Config
# =====================
#
# We create a ``HeadNodeConfig`` and ``WorkerNodeConfig`` for the Ray job; the Ray operator uses these configs to launch a Ray cluster before running the task.
#
# * ``ray_start_params``: `RayStartParams <https://docs.ray.io/en/latest/ray-core/package-ref.html#ray-start>`__ are the parameters of the ``ray start`` command, e.g. address and object-store-memory.
# * ``replicas``: Desired number of replicas in the worker group. Defaults to 1.
# * ``group_name``: A RayCluster can have multiple worker groups, distinguished by name.
# * ``runtime_env``: A `runtime environment <https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#runtime-environments>`__ describes the dependencies your Ray application needs to run; it is installed dynamically on the cluster at runtime.
#
ray_config = RayJobConfig(
head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=5)],
runtime_env={"pip": ["numpy", "pandas"]}, # or runtime_env="./requirements.txt"
)


# %%
# Defining a Ray Task
# ===================
# We use `Ray job submission <https://docs.ray.io/en/latest/cluster/job-submission.html#job-submission-architecture>`__ to run our Ray tasks.
# ``ray_task`` is called on the Ray head node, and each ``f.remote(i)`` is executed asynchronously on separate Ray workers.
#
# .. note::
#    The ``Resources`` defined here specify the resources of the worker nodes.
@task(task_config=ray_config, limits=Resources(mem="2000Mi", cpu="2"))
def ray_task(n: int) -> typing.List[int]:
futures = [f.remote(i) for i in range(n)]
return ray.get(futures)


# %%
# Workflow
# ========
#
# Finally, we define a workflow that calls the ``ray_task`` task.
@workflow
def ray_workflow(n: int) -> typing.List[int]:
return ray_task(n=n)


# %%
# We can also run the code locally, in which case a standalone Ray cluster is created on the local machine.
if __name__ == "__main__":
print(ray_workflow(n=10))
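Stripped of the Ray and Flyte machinery, the workflow above simply maps ``f`` over ``range(n)``. A minimal serial sketch of the same computation (function names are ours, for illustration only):

```python
def f_serial(x: int) -> int:
    # Same computation as the @ray.remote function above.
    return x * x


def ray_workflow_serial(n: int) -> list:
    # Serial equivalent of dispatching f.remote(i) for each i and then
    # gathering the resulting futures with ray.get().
    return [f_serial(i) for i in range(n)]


print(ray_workflow_serial(5))  # → [0, 1, 4, 9, 16]
```

The Ray version produces the same list; the difference is that each ``f.remote(i)`` call is scheduled on a worker and runs in parallel rather than serially.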
2 changes: 2 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/sandbox.config
@@ -0,0 +1,2 @@
[sdk]
workflow_packages=ray_example
