Skip to content

Commit

Permalink
feat: Add ray integration support (#2400) (#2444)
Browse files Browse the repository at this point in the history
Adds a basic instrumentation for the Ray framework (https://www.ray.io/)

Closes #2400

----

Co-authored-by: Anton Pirker <[email protected]>
Co-authored-by: Ivana Kellyer <[email protected]>
  • Loading branch information
3 people authored Aug 13, 2024
1 parent 4858996 commit 17a6cf0
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/test-integrations-data-processing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ jobs:
run: |
set -x # print commands that are executed
./scripts/runtox.sh "py${{ matrix.python-version }}-huey-latest"
- name: Test ray latest
run: |
set -x # print commands that are executed
./scripts/runtox.sh "py${{ matrix.python-version }}-ray-latest"
- name: Test rq latest
run: |
set -x # print commands that are executed
Expand Down Expand Up @@ -139,6 +143,10 @@ jobs:
run: |
set -x # print commands that are executed
./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-huey"
- name: Test ray pinned
run: |
set -x # print commands that are executed
./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-ray"
- name: Test rq pinned
run: |
set -x # print commands that are executed
Expand Down
1 change: 1 addition & 0 deletions scripts/split-tox-gh-actions/split-tox-gh-actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"celery",
"dramatiq",
"huey",
"ray",
"rq",
"spark",
],
Expand Down
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ class OP:
QUEUE_TASK_RQ = "queue.task.rq"
QUEUE_SUBMIT_HUEY = "queue.submit.huey"
QUEUE_TASK_HUEY = "queue.task.huey"
QUEUE_SUBMIT_RAY = "queue.submit.ray"
QUEUE_TASK_RAY = "queue.task.ray"
SUBPROCESS = "subprocess"
SUBPROCESS_WAIT = "subprocess.wait"
SUBPROCESS_COMMUNICATE = "subprocess.communicate"
Expand Down
146 changes: 146 additions & 0 deletions sentry_sdk/integrations/ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import inspect
import sys

import sentry_sdk
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import (
event_from_exception,
logger,
package_version,
qualname_from_function,
reraise,
)

try:
import ray # type: ignore[import-not-found]
except ImportError:
raise DidNotEnable("Ray not installed.")
import functools

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Optional
from sentry_sdk.utils import ExcInfo


def _check_sentry_initialized():
# type: () -> None
if sentry_sdk.get_client().is_active():
return

logger.debug(
"[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
)


def _patch_ray_remote():
# type: () -> None
old_remote = ray.remote

@functools.wraps(old_remote)
def new_remote(f, *args, **kwargs):
# type: (Callable[..., Any], *Any, **Any) -> Callable[..., Any]
if inspect.isclass(f):
# Ray Actors
# (https://docs.ray.io/en/latest/ray-core/actors.html)
# are not supported
# (Only Ray Tasks are supported)
return old_remote(f, *args, *kwargs)

def _f(*f_args, _tracing=None, **f_kwargs):
# type: (Any, Optional[dict[str, Any]], Any) -> Any
"""
Ray Worker
"""
_check_sentry_initialized()

transaction = sentry_sdk.continue_trace(
_tracing or {},
op=OP.QUEUE_TASK_RAY,
name=qualname_from_function(f),
origin=RayIntegration.origin,
source=TRANSACTION_SOURCE_TASK,
)

with sentry_sdk.start_transaction(transaction) as transaction:
try:
result = f(*f_args, **f_kwargs)
transaction.set_status(SPANSTATUS.OK)
except Exception:
transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
exc_info = sys.exc_info()
_capture_exception(exc_info)
reraise(*exc_info)

return result

rv = old_remote(_f, *args, *kwargs)
old_remote_method = rv.remote

def _remote_method_with_header_propagation(*args, **kwargs):
# type: (*Any, **Any) -> Any
"""
Ray Client
"""
with sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_RAY,
description=qualname_from_function(f),
origin=RayIntegration.origin,
) as span:
tracing = {
k: v
for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
}
try:
result = old_remote_method(*args, **kwargs, _tracing=tracing)
span.set_status(SPANSTATUS.OK)
except Exception:
span.set_status(SPANSTATUS.INTERNAL_ERROR)
exc_info = sys.exc_info()
_capture_exception(exc_info)
reraise(*exc_info)

return result

rv.remote = _remote_method_with_header_propagation

return rv

ray.remote = new_remote


def _capture_exception(exc_info, **kwargs):
# type: (ExcInfo, **Any) -> None
client = sentry_sdk.get_client()

event, hint = event_from_exception(
exc_info,
client_options=client.options,
mechanism={
"handled": False,
"type": RayIntegration.identifier,
},
)
sentry_sdk.capture_event(event, hint=hint)


class RayIntegration(Integration):
identifier = "ray"
origin = f"auto.queue.{identifier}"

@staticmethod
def setup_once():
# type: () -> None
version = package_version("ray")

if version is None:
raise DidNotEnable("Unparsable ray version: {}".format(version))

if version < (2, 7, 0):
raise DidNotEnable("Ray 2.7.0 or newer required")

_patch_ray_remote()
3 changes: 3 additions & 0 deletions tests/integrations/ray/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("ray")
205 changes: 205 additions & 0 deletions tests/integrations/ray/test_ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import json
import os
import pytest

import ray

import sentry_sdk
from sentry_sdk.envelope import Envelope
from sentry_sdk.integrations.ray import RayIntegration
from tests.conftest import TestTransport


class RayTestTransport(TestTransport):
def __init__(self):
self.envelopes = []
super().__init__()

def capture_envelope(self, envelope: Envelope) -> None:
self.envelopes.append(envelope)


class RayLoggingTransport(TestTransport):
def __init__(self):
super().__init__()

def capture_envelope(self, envelope: Envelope) -> None:
print(envelope.serialize().decode("utf-8", "replace"))


def setup_sentry_with_logging_transport():
setup_sentry(transport=RayLoggingTransport())


def setup_sentry(transport=None):
sentry_sdk.init(
integrations=[RayIntegration()],
transport=RayTestTransport() if transport is None else transport,
traces_sample_rate=1.0,
)


@pytest.mark.forked
def test_ray_tracing():
setup_sentry()

ray.init(
runtime_env={
"worker_process_setup_hook": setup_sentry,
"working_dir": "./",
}
)

@ray.remote
def example_task():
with sentry_sdk.start_span(op="task", description="example task step"):
...

return sentry_sdk.get_client().transport.envelopes

with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
worker_envelopes = ray.get(example_task.remote())

client_envelope = sentry_sdk.get_client().transport.envelopes[0]
client_transaction = client_envelope.get_transaction_event()
worker_envelope = worker_envelopes[0]
worker_transaction = worker_envelope.get_transaction_event()

assert (
client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)

for span in client_transaction["spans"]:
assert (
span["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)

for span in worker_transaction["spans"]:
assert (
span["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)


@pytest.mark.forked
def test_ray_spans():
setup_sentry()

ray.init(
runtime_env={
"worker_process_setup_hook": setup_sentry,
"working_dir": "./",
}
)

@ray.remote
def example_task():
return sentry_sdk.get_client().transport.envelopes

with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
worker_envelopes = ray.get(example_task.remote())

client_envelope = sentry_sdk.get_client().transport.envelopes[0]
client_transaction = client_envelope.get_transaction_event()
worker_envelope = worker_envelopes[0]
worker_transaction = worker_envelope.get_transaction_event()

for span in client_transaction["spans"]:
assert span["op"] == "queue.submit.ray"
assert span["origin"] == "auto.queue.ray"

for span in worker_transaction["spans"]:
assert span["op"] == "queue.task.ray"
assert span["origin"] == "auto.queue.ray"


@pytest.mark.forked
def test_ray_errors():
setup_sentry_with_logging_transport()

ray.init(
runtime_env={
"worker_process_setup_hook": setup_sentry_with_logging_transport,
"working_dir": "./",
}
)

@ray.remote
def example_task():
1 / 0

with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
with pytest.raises(ZeroDivisionError):
future = example_task.remote()
ray.get(future)

job_id = future.job_id().hex()

# Read the worker log output containing the error
log_dir = "/tmp/ray/session_latest/logs/"
log_file = [
f
for f in os.listdir(log_dir)
if "worker" in f and job_id in f and f.endswith(".out")
][0]
with open(os.path.join(log_dir, log_file), "r") as file:
lines = file.readlines()
# parse error object from log line
error = json.loads(lines[4][:-1])

assert error["level"] == "error"
assert (
error["transaction"]
== "tests.integrations.ray.test_ray.test_ray_errors.<locals>.example_task"
) # its in the worker, not the client thus not "ray test transaction"
assert error["exception"]["values"][0]["mechanism"]["type"] == "ray"
assert not error["exception"]["values"][0]["mechanism"]["handled"]


@pytest.mark.forked
def test_ray_actor():
setup_sentry()

ray.init(
runtime_env={
"worker_process_setup_hook": setup_sentry,
"working_dir": "./",
}
)

@ray.remote
class Counter(object):
def __init__(self):
self.n = 0

def increment(self):
with sentry_sdk.start_span(op="task", description="example task step"):
self.n += 1

return sentry_sdk.get_client().transport.envelopes

with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
counter = Counter.remote()
worker_envelopes = ray.get(counter.increment.remote())

# Currently no transactions/spans are captured in actors
assert worker_envelopes == []

client_envelope = sentry_sdk.get_client().transport.envelopes[0]
client_transaction = client_envelope.get_transaction_event()

assert (
client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)

for span in client_transaction["spans"]:
assert (
span["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)
Loading

0 comments on commit 17a6cf0

Please sign in to comment.