Skip to content

Commit

Permalink
🐛 fix working_dir and 📖 add docker example
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Oct 15, 2024
1 parent dc5a726 commit 95ee70d
Show file tree
Hide file tree
Showing 29 changed files with 575 additions and 44 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ There are different options available for running Dagster code on Ray. The follo
| Enabled per-asset |||||
| Configurable per-asset |||||
| Runs asset/op body on Ray |||||
| Requires configuring Dagster in the Ray cluster |||||

# Examples

Expand Down Expand Up @@ -78,8 +79,9 @@ run_launcher:
module: dagster_ray
class: RayRunLauncher
config:
num_cpus: 1
num_gpus: 0
ray:
num_cpus: 1
num_gpus: 0
```
Individual Runs can **override** Ray configuration:
Expand Down
11 changes: 11 additions & 0 deletions dagster_ray/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@
from dagster_ray.config import RayExecutionConfig, RayJobSubmissionClientConfig
from dagster_ray.kuberay.resources import get_k8s_object_name
from dagster_ray.run_launcher import RayRunLauncher
from dagster_ray.utils import resolve_env_vars_list

if TYPE_CHECKING:
from ray.job_submission import JobSubmissionClient


class RayExecutorConfig(RayExecutionConfig, RayJobSubmissionClientConfig):
env_vars: Optional[List[str]] = Field(
default=None,
description="A list of environment variables to inject into the Job. Each can be of the form KEY=VALUE or just KEY (in which case the value will be pulled from the current process).",
)
address: Optional[str] = Field(default=None, description="The address of the Ray cluster to connect to.") # type: ignore
# sorry for the long name, but it has to be very clear what this is doing
inherit_job_submission_client_from_ray_run_launcher: bool = True
Expand Down Expand Up @@ -74,6 +79,7 @@ def ray_executor(init_context: InitExecutorContext) -> Executor:
return StepDelegatingExecutor(
RayStepHandler(
client=client,
env_vars=ray_cfg.env_vars,
runtime_env=ray_cfg.runtime_env,
num_cpus=ray_cfg.num_cpus,
num_gpus=ray_cfg.num_gpus,
Expand All @@ -95,6 +101,7 @@ def name(self):
def __init__(
self,
client: "JobSubmissionClient",
env_vars: Optional[List[str]],
runtime_env: Optional[Dict[str, Any]],
num_cpus: Optional[float],
num_gpus: Optional[float],
Expand All @@ -104,6 +111,7 @@ def __init__(
super().__init__()

self.client = client
self.env_vars = env_vars or []
self.runtime_env = runtime_env or {}
self.num_cpus = num_cpus
self.num_gpus = num_gpus
Expand Down Expand Up @@ -148,6 +156,7 @@ def launch_step(self, step_handler_context: StepHandlerContext) -> Iterator[Dags

user_provided_config = RayExecutionConfig.from_tags({**step_handler_context.step_tags[step_key]})

# note! ray modifies the user-provided runtime_env, so we copy it
runtime_env = (user_provided_config.runtime_env or self.runtime_env).copy()

dagster_env_vars = {
Expand All @@ -158,6 +167,8 @@ def launch_step(self, step_handler_context: StepHandlerContext) -> Iterator[Dags

runtime_env["env_vars"] = {**dagster_env_vars, **runtime_env.get("env_vars", {})} # type: ignore

runtime_env["runtime_env"].update(resolve_env_vars_list(self.env_vars))

num_cpus = self.num_cpus or user_provided_config.num_cpus
num_gpus = self.num_gpus or user_provided_config.num_gpus
memory = self.memory or user_provided_config.memory
Expand Down
16 changes: 12 additions & 4 deletions dagster_ray/kuberay/client/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import time
from typing import TYPE_CHECKING, Any, Dict, Optional

from kubernetes import client
from kubernetes.client import ApiException

if TYPE_CHECKING:
from kubernetes import client
from kubernetes.client.models.v1_endpoints import V1Endpoints


Expand All @@ -27,8 +25,10 @@ def __init__(
version: str,
kind: str,
plural: str,
api_client: Optional[client.ApiClient] = None,
api_client: Optional["client.ApiClient"] = None,
):
from kubernetes import client

self.group = group
self.version = version
self.kind = kind
Expand All @@ -38,6 +38,8 @@ def __init__(
self._core_v1_api = client.CoreV1Api(api_client=api_client)

def wait_for_service_endpoints(self, service_name: str, namespace: str, poll_interval: int = 5, timeout: int = 60):
from kubernetes.client import ApiException

start_time = time.time()

while True:
Expand All @@ -62,6 +64,8 @@ def wait_for_service_endpoints(self, service_name: str, namespace: str, poll_int
time.sleep(poll_interval)

def get_status(self, name: str, namespace: str, timeout: int = 60, poll_interval: int = 5) -> Dict[str, Any]:
from kubernetes.client import ApiException

while timeout > 0:
try:
resource: Any = self._api.get_namespaced_custom_object_status(
Expand All @@ -83,6 +87,8 @@ def get_status(self, name: str, namespace: str, timeout: int = 60, poll_interval
raise TimeoutError(f"Timed out waiting for status of {self.kind} {name} in namespace {namespace}")

def list(self, namespace: str, label_selector: str = "", async_req: bool = False) -> Dict[str, Any]:
from kubernetes.client import ApiException

try:
resource: Any = self._api.list_namespaced_custom_object(
group=self.group,
Expand All @@ -103,6 +109,8 @@ def list(self, namespace: str, label_selector: str = "", async_req: bool = False
raise

def get(self, name: str, namespace: str) -> Dict[str, Any]:
from kubernetes.client import ApiException

try:
resource: Any = self._api.get_namespaced_custom_object(
group=self.group,
Expand Down
3 changes: 2 additions & 1 deletion dagster_ray/kuberay/client/raycluster/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
cast,
)

from kubernetes import watch
from typing_extensions import NotRequired

from dagster_ray.kuberay.client.base import BaseKubeRayClient
Expand Down Expand Up @@ -104,6 +103,8 @@ def wait_until_ready(
timeout: int,
image: Optional[str] = None,
) -> Tuple[str, Dict[str, str]]:
from kubernetes import watch

"""
If ready, returns service ip address and a dictionary of ports.
Dictionary keys: ["client", "dashboard", "metrics", "redis", "serve"]
Expand Down
18 changes: 12 additions & 6 deletions dagster_ray/kuberay/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import re
import string
import sys
from typing import Any, Dict, Generator, Optional, cast
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, cast

import dagster._check as check
import kubernetes
from dagster import ConfigurableResource, InitResourceContext
from dagster._annotations import experimental
from pydantic import Field, PrivateAttr
Expand All @@ -30,15 +29,18 @@
from dagster_ray._base.resources import BaseRayResource
from dagster_ray.kuberay.client.base import load_kubeconfig

if TYPE_CHECKING:
import kubernetes


@experimental
class RayClusterClientResource(ConfigurableResource):
kube_context: Optional[str] = None
kubeconfig_file: Optional[str] = None

_raycluster_client: RayClusterClient = PrivateAttr()
_k8s_api: kubernetes.client.CustomObjectsApi = PrivateAttr()
_k8s_core_api: kubernetes.client.CoreV1Api = PrivateAttr()
_k8s_api: "kubernetes.client.CustomObjectsApi" = PrivateAttr()
_k8s_core_api: "kubernetes.client.CoreV1Api" = PrivateAttr()

@property
def client(self) -> RayClusterClient:
Expand All @@ -47,18 +49,20 @@ def client(self) -> RayClusterClient:
return self._raycluster_client

@property
def k8s(self) -> kubernetes.client.CustomObjectsApi:
def k8s(self) -> "kubernetes.client.CustomObjectsApi":
if self._k8s_api is None:
raise ValueError(f"{self.__class__.__name__} not initialized")
return self._k8s_api

@property
def k8s_core(self) -> kubernetes.client.CoreV1Api:
def k8s_core(self) -> "kubernetes.client.CoreV1Api":
if self._k8s_core_api is None:
raise ValueError(f"{self.__class__.__name__} not initialized")
return self._k8s_core_api

def setup_for_execution(self, context: InitResourceContext) -> None:
import kubernetes

load_kubeconfig(context=self.kube_context, config_file=self.kubeconfig_file)

self._raycluster_client = RayClusterClient(context=self.kube_context, config_file=self.kubeconfig_file)
Expand Down Expand Up @@ -218,6 +222,8 @@ def update_group_spec(group_spec: Dict[str, Any]):
}

def _wait_raycluster_ready(self):
import kubernetes

self.client.client.wait_until_ready(self.cluster_name, namespace=self.namespace, timeout=self.timeout)

# the above code only checks for RayCluster creation
Expand Down
35 changes: 25 additions & 10 deletions dagster_ray/run_launcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import sys
from typing import Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from dagster import _check as check
from dagster._cli.api import ExecuteRunArgs # type: ignore
Expand All @@ -12,15 +12,24 @@
from dagster._grpc.types import ResumeRunArgs
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from dagster._utils.error import serializable_error_info_from_exc_info
from pydantic import Field

from dagster_ray.config import RayExecutionConfig, RayJobSubmissionClientConfig
from dagster_ray.utils import resolve_env_vars_list

if TYPE_CHECKING:
from ray.job_submission import JobSubmissionClient


def get_job_submission_id_from_run_id(run_id: str, resume_attempt_number=None):
    """Build the Ray job submission id that corresponds to a Dagster run.

    The id is ``dagster-run-<run_id>``. When ``resume_attempt_number`` is truthy,
    ``-<resume_attempt_number>`` is appended; a falsy value (``None``, ``0``)
    leaves the id without a suffix.
    """
    submission_id = f"dagster-run-{run_id}"
    if resume_attempt_number:
        submission_id += f"-{resume_attempt_number}"
    return submission_id


class RayLauncherConfig(RayExecutionConfig, RayJobSubmissionClientConfig): ...
class RayLauncherConfig(RayExecutionConfig, RayJobSubmissionClientConfig):
    """Configuration accepted by the Ray run launcher.

    Inherits every field of ``RayExecutionConfig`` and
    ``RayJobSubmissionClientConfig`` and adds ``env_vars``.
    """

    # Entries are either "KEY=VALUE" or a bare "KEY"; a bare key is resolved
    # from the environment of the process that launches the job.
    env_vars: Optional[List[str]] = Field(
        default=None,
        description="A list of environment variables to inject into the Job. Each can be of the form KEY=VALUE or just KEY (in which case the value will be pulled from the current process).",
    )


class RayRunLauncher(RunLauncher, ConfigurableClass):
Expand All @@ -30,6 +39,7 @@ def __init__(
metadata: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
cookies: Optional[Dict[str, Any]] = None,
env_vars: Optional[List[str]] = None,
runtime_env: Optional[Dict[str, Any]] = None,
num_cpus: Optional[int] = None,
num_gpus: Optional[int] = None,
Expand All @@ -55,17 +65,13 @@ def __init__(
Fields such as `num_cpus` set via `dagster-ray/config` Run tag will override the yaml configuration.
"""

from ray.job_submission import JobSubmissionClient

self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)

self.client = JobSubmissionClient(address, metadata=metadata, headers=headers, cookies=cookies)

self.address = address
self.metadata = metadata
self.headers = headers
self.cookies = cookies
self.env_vars = env_vars
self.runtime_env = runtime_env
self.num_cpus = num_cpus
self.num_gpus = num_gpus
Expand All @@ -74,17 +80,23 @@ def __init__(

super().__init__()

@property
def client(self) -> "JobSubmissionClient":  # note: this must be a property
    """Return a ``JobSubmissionClient`` for the configured Ray address.

    A new client is constructed on every access from the ``address``,
    ``metadata``, ``headers`` and ``cookies`` stored on this launcher.
    """
    # Local import: ray is only imported when the property is accessed.
    from ray.job_submission import JobSubmissionClient

    connection_options = {
        "metadata": self.metadata,
        "headers": self.headers,
        "cookies": self.cookies,
    }
    return JobSubmissionClient(self.address, **connection_options)

@property
def inst_data(self) -> Optional[ConfigurableClassData]:
    """Return the ``ConfigurableClassData`` stored on this instance (may be ``None``)."""
    return self._inst_data

@classmethod
def config_type(cls) -> UserConfigSchema:
return RayLauncherConfig.to_fields_dict()
return {"ray": RayLauncherConfig.to_config_schema().as_field()}

@classmethod
def from_config_value(cls, inst_data, config_value):
return cls(inst_data=inst_data, **config_value)
return cls(inst_data=inst_data, **config_value["ray"])

@property
def supports_resume_run(self):
Expand Down Expand Up @@ -132,13 +144,16 @@ def _launch_ray_job(self, submission_id: str, entrypoint: str, run: DagsterRun):

cfg_from_tags = RayLauncherConfig.from_tags(run.tags)

runtime_env = cfg_from_tags.runtime_env or self.runtime_env or {}
env_vars = cfg_from_tags.env_vars or self.env_vars or []
# note! ray modifies the user-provided runtime_env, so we copy it
runtime_env = (cfg_from_tags.runtime_env or self.runtime_env or {}).copy()
num_cpus = cfg_from_tags.num_cpus or self.num_cpus
num_gpus = cfg_from_tags.num_gpus or self.num_gpus
memory = cfg_from_tags.memory or self.memory
resources = cfg_from_tags.resources or self.resources

runtime_env["env_vars"] = runtime_env.get("env_vars", {})
runtime_env["env_vars"].update(resolve_env_vars_list(env_vars))
runtime_env["env_vars"].update(
{
"DAGSTER_RUN_JOB_NAME": job_origin.job_name,
Expand Down
17 changes: 17 additions & 0 deletions dagster_ray/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import os
from typing import Dict, List, Optional


def resolve_env_vars_list(env_vars: Optional[List[str]]) -> Dict[str, str]:
    """Turn a list of environment variable specs into a ``{name: value}`` mapping.

    Each entry is either ``KEY=VALUE`` (split on the first ``=``, so the value
    may itself contain ``=`` and may be empty) or a bare ``KEY``, whose value
    is read from the current process environment.

    Args:
        env_vars: Specs to resolve. ``None`` is treated like an empty list.

    Returns:
        A new dict with the resolved variables. Bare keys that are not set in
        the current process are omitted. When the same name appears more than
        once, the last entry wins.
    """
    resolved: Dict[str, str] = {}

    for spec in env_vars or []:
        name, separator, value = spec.partition("=")
        if separator:
            resolved[name] = value
            continue

        # Check for None rather than truthiness: a variable that is set to the
        # empty string is still set and must be forwarded, just like "KEY=".
        inherited = os.getenv(spec)
        if inherited is not None:
            resolved[spec] = inherited

    return resolved
34 changes: 34 additions & 0 deletions examples/docker/run_launcher/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Dagster libraries to run both dagster-webserver and the dagster-daemon. Does not
# need to have access to any pipeline code.
# NOTE(review): the final COPY brings the whole example directory into the image,
# which may include user code (the example README mentions src/definitions.py) —
# confirm the statement above is still accurate.

FROM python:3.10-slim

# Make uv install packages into the system Python instead of a virtual environment.
ENV UV_SYSTEM_PYTHON=1

# Install uv by copying its binary out of the ghcr.io/astral-sh/uv image.
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

# Install Dagster, its integrations and Ray. The cache mount keeps uv's cache
# directory (/root/.cache/uv) available across builds.
RUN --mount=type=cache,target=/root/.cache/uv \
uv pip install \
dagster \
dagster-graphql \
dagster-webserver \
dagster-postgres \
dagster-docker \
dagster-k8s \
ray[all]

WORKDIR /src

# Copy the dagster_ray package sources and project metadata, then install the
# package in editable mode. The paths assume the build context is the repository
# root — TODO confirm against the compose file.
COPY pyproject.toml README.md ./
COPY dagster_ray ./dagster_ray

RUN --mount=type=cache,target=/root/.cache/uv \
uv pip install -e .

# Set $DAGSTER_HOME and copy dagster instance and workspace YAML there
ENV DAGSTER_HOME=/dagster_home

WORKDIR $DAGSTER_HOME

# Copy everything under examples/docker/run_launcher/ into $DAGSTER_HOME.
COPY examples/docker/run_launcher/ ./
13 changes: 13 additions & 0 deletions examples/docker/run_launcher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Example Dagster + Ray deployment with Docker

1. Start all services with Docker Compose:

```shell
docker compose up --build
```

2. In your browser, open `localhost:3000` to access the Dagster UI and `localhost:8265` to access the Ray dashboard.

3. Launch a job. Observe how the steps are executed in separate Ray jobs in parallel.

4. Make changes to `src/definitions.py`.
Loading

0 comments on commit 95ee70d

Please sign in to comment.