
Ray task example #851

Merged: 16 commits, Aug 17, 2022
1 change: 1 addition & 0 deletions cookbook/common/leaf.mk
@@ -116,6 +116,7 @@ serialize: clean _pb_output docker_build
	@echo ${VERSION}
	@echo ${CURDIR}
	docker run -i --rm \
		-u $(id -u ${USER}):$(id -g ${USER}) \
		-e SANDBOX=${SANDBOX} \
		-e REGISTRY=${REGISTRY} \
		-e MAKEFLAGS=${MAKEFLAGS} \
2 changes: 2 additions & 0 deletions cookbook/docs/conf.py
@@ -290,6 +290,7 @@ def __call__(self, filename):
"../integrations/kubernetes/kftensorflow",
"../integrations/kubernetes/kfpytorch",
"../integrations/kubernetes/kfmpi",
"../integrations/kubernetes/ray_example",
"../integrations/aws/athena",
"../integrations/aws/batch",
"../integrations/aws/sagemaker_training",
@@ -329,6 +330,7 @@ def __call__(self, filename):
"auto/integrations/kubernetes/kftensorflow",
"auto/integrations/kubernetes/kfpytorch",
"auto/integrations/kubernetes/kfmpi",
"auto/integrations/kubernetes/ray_example",
"auto/integrations/aws/athena",
"auto/integrations/aws/batch",
"auto/integrations/aws/sagemaker_training",
1 change: 1 addition & 0 deletions cookbook/docs/index.rst
@@ -184,6 +184,7 @@ Table of Contents
   auto/integrations/kubernetes/kfpytorch/index
   auto/integrations/kubernetes/kftensorflow/index
   auto/integrations/kubernetes/kfmpi/index
   auto/integrations/kubernetes/ray_example/index
   auto/integrations/aws/sagemaker_training/index
   auto/integrations/aws/sagemaker_pytorch/index
   auto/integrations/aws/athena/index
9 changes: 9 additions & 0 deletions cookbook/docs/integrations.rst
@@ -146,6 +146,15 @@ orchestrated by Flyte itself, within its provisioned Kubernetes clusters.
^^^^^^^^^^^^
Run distributed deep learning training jobs using Horovod and MPI.

---

.. link-button:: auto/integrations/kubernetes/ray_example/index
   :type: ref
   :text: Ray Task
   :classes: btn-block stretched-link
^^^^^^^^^^^^
Run Ray jobs on a K8s cluster.

.. _external_service_backend_plugins:

********************************
35 changes: 35 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/Dockerfile
@@ -0,0 +1,35 @@
FROM rayproject/ray:1.13.1-py37-gpu
# You can disable GPU support by replacing the above line with:
# FROM rayproject/ray:1.13.1-py37-cpu
USER ray
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytesnacks

USER ray
ARG HOME=/home/ray
WORKDIR $HOME
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV DEBIAN_FRONTEND noninteractive
ENV TERM linux

# Install the AWS cli separately to prevent issues with boto being written over
RUN pip install awscli

# Install wheel after venv is activated
RUN pip3 install wheel

# Install Python dependencies
RUN pip3 install flytekit "git+https://github.com/flyteorg/flytekit@master#egg=flytekitplugins-ray&subdirectory=plugins/flytekit-ray"
RUN pip3 install ray[default]

# Copy the makefile targets into the container. This makes it easier to register the workflows.
COPY ray_example/in_container.mk /$HOME/Makefile
COPY ray_example/sandbox.config /$HOME

# Note: when we start the container using this image, we'll run "ray start" first at "/home/ray/anaconda3/lib/python3.7", and then
# run "pyflyte-execute" to run a Ray job. Therefore, the actual code should be copied to "/home/ray/anaconda3/lib/python3.7".
COPY ray_example/ /home/ray/anaconda3/lib/python3.7/ray_example
# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag
3 changes: 3 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/Makefile
@@ -0,0 +1,3 @@
PREFIX=ray_example
include ../../../common/common.mk
include ../../../common/leaf.mk
53 changes: 53 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/README.rst
@@ -0,0 +1,53 @@
KubeRay
========

`KubeRay <https://github.com/ray-project/kuberay>`__ is an open source toolkit to run Ray applications on Kubernetes. It provides tools to improve running and managing Ray on Kubernetes.

- Ray Operator
- Backend services to create/delete cluster resources
- Kubectl plugin/CLI to operate CRD objects
- Native Job and Serving integration with Clusters

Installation
------------

To install the Ray plugin, run the following command:

.. code-block:: bash

pip install flytekitplugins-ray

To enable the plugin in the backend, follow instructions outlined in the :std:ref:`flyte:deployment-plugin-setup-k8s` guide.
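
Once the plugin is installed, its configuration classes can be imported directly from ``flytekitplugins.ray``. A quick sanity check (these are the same classes used in the examples below):

.. code-block:: python

   from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig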

Submit a Ray Job to an Existing Cluster
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. testcode:: ray-quickstart-1

    import typing

    import ray
    from flytekit import task
    from flytekitplugins.ray import RayJobConfig

    @ray.remote
    def f(x):
        return x * x

    @task(
        task_config=RayJobConfig(
            address="<RAY_CLUSTER_ADDRESS>",  # placeholder for your Ray cluster address
            runtime_env={"pip": ["numpy", "pandas"]},
        )
    )
    def ray_task() -> typing.List[int]:
        futures = [f.remote(i) for i in range(5)]
        return ray.get(futures)


Create a Ray Cluster Managed by Flyte and Run a Ray Job on This Cluster
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. testcode:: ray-quickstart-2

    import typing

    import ray
    from flytekit import task
    from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

    @ray.remote
    def f(x):
        return x * x

    @task(task_config=RayJobConfig(worker_node_config=[WorkerNodeConfig(group_name="test-group", replicas=10)]))
    def ray_task() -> typing.List[int]:
        futures = [f.remote(i) for i in range(5)]
        return ray.get(futures)
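
The Ray task can then be composed into a Flyte workflow like any other task. Below is a minimal sketch, assuming the ``ray_task`` defined above; ``ray_wf`` is just an illustrative name:

.. code-block:: python

   from flytekit import workflow

   @workflow
   def ray_wf() -> typing.List[int]:
       return ray_task()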

Empty file.
16 changes: 16 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/in_container.mk
@@ -0,0 +1,16 @@
SERIALIZED_PB_OUTPUT_DIR := /tmp/output

.PHONY: clean
clean:
	rm -rf $(SERIALIZED_PB_OUTPUT_DIR)/*

$(SERIALIZED_PB_OUTPUT_DIR): clean
	mkdir -p $(SERIALIZED_PB_OUTPUT_DIR)

.PHONY: serialize
serialize: $(SERIALIZED_PB_OUTPUT_DIR)
	pyflyte --config sandbox.config serialize workflows -f $(SERIALIZED_PB_OUTPUT_DIR)

.PHONY: fast_serialize
fast_serialize: $(SERIALIZED_PB_OUTPUT_DIR)
	pyflyte --config sandbox.config serialize fast workflows -f $(SERIALIZED_PB_OUTPUT_DIR)
74 changes: 74 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/ray_example.py
@@ -0,0 +1,74 @@
"""
Ray Tasks
----------
A Ray task allows you to run a Ray job on an existing Ray cluster, or to create an ephemeral Ray cluster using the Ray operator.


Let's get started with an example!
"""

# %%
# First, we load the libraries.
import typing

import ray
from flytekit import Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig


# %%
# Ray Task
# =========
#
# We define a `remote function <https://docs.ray.io/en/latest/ray-core/tasks.html#tasks>`__ that will be executed asynchronously in the Ray cluster.
@ray.remote
def f(x):
    return x * x


# %%
# Defining a Ray Config
# =====================
#
# We create a ``HeadNodeConfig`` and a ``WorkerNodeConfig`` for the Ray job; these configs are used by the Ray operator to launch a Ray cluster before running the task.
#
# * ``ray_start_params``: `RayStartParams <https://docs.ray.io/en/latest/ray-core/package-ref.html#ray-start>`__ are the parameters of the ``ray start`` command, e.g. ``address`` and ``object-store-memory``.
# * ``replicas``: The desired number of replicas in the worker group. Defaults to 1.
# * ``group_name``: A RayCluster can have multiple worker groups, distinguished by name.
# * ``runtime_env``: A `runtime environment <https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#runtime-environments>`__ describes the dependencies your Ray application needs; it is installed dynamically on the cluster at runtime.
#
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=5)],
    runtime_env={"pip": ["numpy", "pandas"]},  # or runtime_env="./requirements.txt"
)
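
# %%
# If you already have a long-running Ray cluster, the task can point at it instead of
# letting the operator provision one. The snippet below is a minimal sketch based on the
# ``address`` option shown in this example's README; the address value is a placeholder
# you would replace with your cluster's endpoint, and ``existing_cluster_config`` is just
# an illustrative name.
existing_cluster_config = RayJobConfig(
    address="<RAY_CLUSTER_ADDRESS>",  # placeholder, not a real address
    runtime_env={"pip": ["numpy", "pandas"]},
)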


# %%
# Defining a Ray Task
# ===================
# We use `Ray job submission <https://docs.ray.io/en/latest/cluster/job-submission.html#job-submission-architecture>`__ to run our Ray task.
# ``ray_task`` is called on the Ray head node, and ``f.remote(i)`` is executed asynchronously on separate Ray workers.
#
# .. note::
#    The ``Resources`` specified here are used to define the resources of the worker nodes.
@task(task_config=ray_config, limits=Resources(mem="2000Mi", cpu="2"))
def ray_task(n: int) -> typing.List[int]:
    futures = [f.remote(i) for i in range(n)]
    return ray.get(futures)


# %%
# Workflow
# ========
#
# Finally, we define a workflow that calls the ``ray_task`` task.
@workflow
def ray_workflow(n: int) -> typing.List[int]:
    return ray_task(n=n)


# %%
# We can also run the code locally, in which case Flyte spins up a standalone Ray cluster on the local machine.
if __name__ == "__main__":
    print(ray_workflow(n=10))
2 changes: 2 additions & 0 deletions cookbook/integrations/kubernetes/ray_example/sandbox.config
@@ -0,0 +1,2 @@
[sdk]
workflow_packages=ray_example