-
Notifications
You must be signed in to change notification settings - Fork 510
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add ray integration support (#2400)
- Loading branch information
Showing
4 changed files
with
139 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
from sentry_sdk.integrations import DidNotEnable, Integration | ||
|
||
try: | ||
import ray | ||
except ImportError: | ||
raise DidNotEnable("Ray not installed.") | ||
import functools | ||
|
||
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK | ||
import logging | ||
import sentry_sdk | ||
from importlib.metadata import version | ||
|
||
|
||
def _check_sentry_initialized(): | ||
if sentry_sdk.Hub.current.client: | ||
return | ||
# we cannot use sentry sdk logging facilities because it wasn't initialized | ||
logger = logging.getLogger("sentry_sdk.errors") | ||
logger.warning( | ||
"[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded." | ||
) | ||
|
||
|
||
def _patch_ray_remote(): | ||
old_remote = ray.remote | ||
|
||
@functools.wraps(old_remote) | ||
def new_remote(f, *args, **kwargs): | ||
def _f(*f_args, _tracing=None, **f_kwargs): | ||
_check_sentry_initialized() | ||
with sentry_sdk.start_transaction( | ||
sentry_sdk.continue_trace( | ||
_tracing, | ||
op="ray.remote.receive", | ||
source=TRANSACTION_SOURCE_TASK, | ||
name="Ray worker transaction", | ||
) | ||
) as tx: | ||
result = f(*f_args, **f_kwargs) | ||
tx.set_status("ok") | ||
return result | ||
|
||
_f = old_remote(_f, *args, *kwargs) | ||
old_remote_method = _f.remote | ||
|
||
def _remote_method_with_header_propagation(*args, **kwargs): | ||
with sentry_sdk.start_span( | ||
op="ray.remote.send", description="Sending task to ray cluster." | ||
): | ||
tracing = { | ||
k: v | ||
for k, v in sentry_sdk.Hub.current.iter_trace_propagation_headers() | ||
} | ||
return old_remote_method(*args, **kwargs, _tracing=tracing) | ||
|
||
_f.remote = _remote_method_with_header_propagation | ||
|
||
return _f | ||
|
||
ray.remote = new_remote | ||
return | ||
|
||
|
||
class RayIntegration(Integration): | ||
identifier = "ray" | ||
|
||
@staticmethod | ||
def setup_once(): | ||
if tuple(int(x) for x in version("ray").split(".")) < (2, 7, 0): | ||
raise DidNotEnable("Ray 2.7.0 or newer required") | ||
_patch_ray_remote() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
import pytest | ||
|
||
pytest.importorskip("ray") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import time | ||
|
||
import ray | ||
|
||
import sentry_sdk | ||
from sentry_sdk.envelope import Envelope | ||
from sentry_sdk.integrations.ray import RayIntegration | ||
from tests.conftest import TestTransport | ||
|
||
|
||
class RayTestTransport(TestTransport): | ||
def __init__(self): | ||
self.events = [] | ||
self.envelopes = [] | ||
super().__init__(self.events.append, self.envelopes.append) | ||
|
||
|
||
def _setup_ray_sentry(): | ||
sentry_sdk.init( | ||
traces_sample_rate=1.0, | ||
integrations=[RayIntegration()], | ||
transport=RayTestTransport(), | ||
) | ||
|
||
|
||
def test_ray(): | ||
_setup_ray_sentry() | ||
|
||
@ray.remote | ||
def _task(): | ||
with sentry_sdk.start_span(op="task", description="example task step"): | ||
time.sleep(0.1) | ||
return sentry_sdk.Hub.current.client.transport.envelopes | ||
|
||
ray.init( | ||
runtime_env=dict(worker_process_setup_hook=_setup_ray_sentry, working_dir="./") | ||
) | ||
|
||
with sentry_sdk.start_transaction(op="task", name="ray test transaction"): | ||
worker_envelopes = ray.get(_task.remote()) | ||
|
||
_assert_envelopes_are_associated_with_same_trace_id( | ||
sentry_sdk.Hub.current.client.transport.envelopes[0], worker_envelopes[0] | ||
) | ||
|
||
|
||
def _assert_envelopes_are_associated_with_same_trace_id( | ||
client_side_envelope: Envelope, worker_envelope: Envelope | ||
): | ||
client_side_envelope_dict = client_side_envelope.get_transaction_event() | ||
worker_envelope_dict = worker_envelope.get_transaction_event() | ||
trace_id = client_side_envelope_dict["contexts"]["trace"]["trace_id"] | ||
for span in client_side_envelope_dict["spans"]: | ||
assert span["trace_id"] == trace_id | ||
for span in worker_envelope_dict["spans"]: | ||
assert span["trace_id"] == trace_id | ||
assert worker_envelope_dict["contexts"]["trace"]["trace_id"] == trace_id |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters