Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(arq): Use new scopes API #2878

Merged
merged 6 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 12 additions & 20 deletions sentry_sdk/integrations/arq.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import sys

import sentry_sdk
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk import Hub
from sentry_sdk.consts import OP
from sentry_sdk.hub import _should_send_default_pii
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.scope import Scope
from sentry_sdk.scope import Scope, should_send_default_pii
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import (
capture_internal_exceptions,
Expand Down Expand Up @@ -72,12 +71,10 @@ def patch_enqueue_job():

async def _sentry_enqueue_job(self, function, *args, **kwargs):
# type: (ArqRedis, str, *Any, **Any) -> Optional[Job]
hub = Hub.current

if hub.get_integration(ArqIntegration) is None:
if sentry_sdk.get_client().get_integration(ArqIntegration) is None:
return await old_enqueue_job(self, function, *args, **kwargs)
sentrivana marked this conversation as resolved.
Show resolved Hide resolved

with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function):
with sentry_sdk.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function):
return await old_enqueue_job(self, function, *args, **kwargs)

ArqRedis.enqueue_job = _sentry_enqueue_job
Expand All @@ -89,12 +86,10 @@ def patch_run_job():

async def _sentry_run_job(self, job_id, score):
# type: (Worker, str, int) -> None
hub = Hub(Hub.current)

if hub.get_integration(ArqIntegration) is None:
if sentry_sdk.get_client().get_integration(ArqIntegration) is None:
return await old_run_job(self, job_id, score)
sentrivana marked this conversation as resolved.
Show resolved Hide resolved

with hub.push_scope() as scope:
with sentry_sdk.new_scope() as scope:
scope._name = "arq"
scope.clear_breadcrumbs()

Expand All @@ -105,7 +100,7 @@ async def _sentry_run_job(self, job_id, score):
source=TRANSACTION_SOURCE_TASK,
)

with hub.start_transaction(transaction):
with sentry_sdk.start_transaction(transaction):
return await old_run_job(self, job_id, score)

Worker.run_job = _sentry_run_job
Expand Down Expand Up @@ -148,10 +143,10 @@ def event_processor(event, hint):
extra["arq-job"] = {
"task": ctx["job_name"],
"args": (
args if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
args if should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
),
"kwargs": (
kwargs if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
kwargs if should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
),
"retry": ctx["job_try"],
}
Expand All @@ -165,11 +160,10 @@ def _wrap_coroutine(name, coroutine):
# type: (str, WorkerCoroutine) -> WorkerCoroutine
async def _sentry_coroutine(ctx, *args, **kwargs):
# type: (Dict[Any, Any], *Any, **Any) -> Any
hub = Hub.current
if hub.get_integration(ArqIntegration) is None:
if sentry_sdk.get_client().get_integration(ArqIntegration) is None:
return await coroutine(ctx, *args, **kwargs)

hub.scope.add_event_processor(
Scope.get_current_scope().add_event_processor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this event processor belongs on the scope, and not the isolation scope?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This I wasn't sure about. Thinking about it again, I think isolation scope makes more sense. Will change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also another current_scope a bit up. I think this is what confused me. It feels like both should be isolation scopes. What do you think @szokeasaurusrex @antonpirker?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think event processors should be on the isolation scope, because they should be valid for the whole request.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what @antonpirker said makes sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried it out now and changing either of them (or both) to work with the isolation scope makes the tests fail. 🙃 Investigating...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok this works: 9d197a3

  • the current transaction is saved on the current scope, so all scopes in the arq integration that try to update transaction attrs also have to go to the current scope
  • the event processor itself is now on the isolation scope

_make_event_processor({**ctx, "job_name": name}, *args, **kwargs)
)

Expand All @@ -191,9 +185,7 @@ def patch_create_worker():

def _sentry_create_worker(*args, **kwargs):
# type: (*Any, **Any) -> Worker
hub = Hub.current

if hub.get_integration(ArqIntegration) is None:
if sentry_sdk.get_client().get_integration(ArqIntegration) is None:
return old_create_worker(*args, **kwargs)

settings_cls = args[0]
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/arq/test_arq.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import pytest

from sentry_sdk import start_transaction, Hub
from sentry_sdk import get_client, start_transaction
from sentry_sdk.integrations.arq import ArqIntegration

import arq.worker
Expand Down Expand Up @@ -245,7 +245,7 @@ async def dummy_job(_ctx):

pool, worker = init_arq([dummy_job])
# remove the integration to trigger the edge case
Hub.current.client.integrations.pop("arq")
get_client().integrations.pop("arq")

job = await pool.enqueue_job("dummy_job")

Expand Down
Loading