Skip to content

Commit

Permalink
ref(rq): Use new scopes API (#2881)
Browse files Browse the repository at this point in the history
  • Loading branch information
sentrivana authored Mar 22, 2024
1 parent f228f70 commit f416845
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 39 deletions.
41 changes: 14 additions & 27 deletions sentry_sdk/integrations/rq.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import weakref

import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.api import continue_trace
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
from sentry_sdk.scope import Scope
from sentry_sdk.utils import (
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
format_timestamp,
parse_version,
Expand Down Expand Up @@ -51,18 +52,10 @@ def setup_once():

old_perform_job = Worker.perform_job

@ensure_integration_enabled(RqIntegration, old_perform_job)
def sentry_patched_perform_job(self, job, *args, **kwargs):
# type: (Any, Job, *Queue, **Any) -> bool
hub = Hub.current
integration = hub.get_integration(RqIntegration)

if integration is None:
return old_perform_job(self, job, *args, **kwargs)

client = hub.client
assert client is not None

with hub.push_scope() as scope:
with sentry_sdk.new_scope() as scope:
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(weakref.ref(job)))

Expand All @@ -76,7 +69,7 @@ def sentry_patched_perform_job(self, job, *args, **kwargs):
with capture_internal_exceptions():
transaction.name = job.func_name

with hub.start_transaction(
with sentry_sdk.start_transaction(
transaction, custom_sampling_context={"rq_job": job}
):
rv = old_perform_job(self, job, *args, **kwargs)
Expand All @@ -85,7 +78,7 @@ def sentry_patched_perform_job(self, job, *args, **kwargs):
# We're inside of a forked process and RQ is
# about to call `os._exit`. Make sure that our
# events get sent out.
client.flush()
sentry_sdk.get_client().flush()

return rv

Expand All @@ -106,15 +99,14 @@ def sentry_patched_handle_exception(self, job, *exc_info, **kwargs):

old_enqueue_job = Queue.enqueue_job

@ensure_integration_enabled(RqIntegration, old_enqueue_job)
def sentry_patched_enqueue_job(self, job, **kwargs):
# type: (Queue, Any, **Any) -> Any
hub = Hub.current
if hub.get_integration(RqIntegration) is not None:
scope = Scope.get_current_scope()
if scope.span is not None:
job.meta["_sentry_trace_headers"] = dict(
scope.iter_trace_propagation_headers()
)
scope = Scope.get_current_scope()
if scope.span is not None:
job.meta["_sentry_trace_headers"] = dict(
scope.iter_trace_propagation_headers()
)

return old_enqueue_job(self, job, **kwargs)

Expand Down Expand Up @@ -158,17 +150,12 @@ def event_processor(event, hint):

def _capture_exception(exc_info, **kwargs):
# type: (ExcInfo, **Any) -> None
hub = Hub.current
if hub.get_integration(RqIntegration) is None:
return

# If an integration is there, a client has to be there.
client = hub.client # type: Any
client = sentry_sdk.get_client()

event, hint = event_from_exception(
exc_info,
client_options=client.options,
mechanism={"type": "rq", "handled": False},
)

hub.capture_event(event, hint=hint)
sentry_sdk.capture_event(event, hint=hint)
23 changes: 11 additions & 12 deletions tests/integrations/rq/test_rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import rq
from fakeredis import FakeStrictRedis

from sentry_sdk import configure_scope, start_transaction
from sentry_sdk import start_transaction
from sentry_sdk.integrations.rq import RqIntegration
from sentry_sdk.scope import Scope
from sentry_sdk.utils import parse_version


Expand Down Expand Up @@ -178,19 +179,17 @@ def test_tracing_disabled(
queue = rq.Queue(connection=FakeStrictRedis())
worker = rq.SimpleWorker([queue], connection=queue.connection)

with configure_scope() as scope:
queue.enqueue(crashing_job, foo=None)
worker.work(burst=True)
scope = Scope.get_isolation_scope()
queue.enqueue(crashing_job, foo=None)
worker.work(burst=True)

(error_event,) = events
(error_event,) = events

assert (
error_event["transaction"] == "tests.integrations.rq.test_rq.crashing_job"
)
assert (
error_event["contexts"]["trace"]["trace_id"]
== scope._propagation_context["trace_id"]
)
assert error_event["transaction"] == "tests.integrations.rq.test_rq.crashing_job"
assert (
error_event["contexts"]["trace"]["trace_id"]
== scope._propagation_context["trace_id"]
)


def test_transaction_no_error(
Expand Down

0 comments on commit f416845

Please sign in to comment.