diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 1fbe6ff72e..0b7fc8117c 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -388,6 +388,7 @@ class OP: LANGCHAIN_AGENT = "ai.agent.langchain" LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain" QUEUE_PROCESS = "queue.process" + QUEUE_PUBLISH = "queue.publish" QUEUE_SUBMIT_ARQ = "queue.submit.arq" QUEUE_TASK_ARQ = "queue.task.arq" QUEUE_SUBMIT_CELERY = "queue.submit.celery" diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 521d37dc86..d75950392f 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -1,4 +1,5 @@ import sys +from collections.abc import Mapping from functools import wraps import sentry_sdk @@ -47,6 +48,7 @@ Retry, SoftTimeLimitExceeded, ) + from kombu import Producer # type: ignore except ImportError: raise DidNotEnable("Celery not installed") @@ -82,6 +84,7 @@ def setup_once(): _patch_build_tracer() _patch_task_apply_async() _patch_worker_exit() + _patch_producer_publish() # This logger logs every status of every task that ran on the worker. # Meaning that every task's breadcrumbs are full of stuff like "Task @@ -433,3 +436,44 @@ def sentry_workloop(*args, **kwargs): sentry_sdk.flush() Worker.workloop = sentry_workloop + + +def _patch_producer_publish(): + # type: () -> None + original_publish = Producer.publish + + @ensure_integration_enabled(CeleryIntegration, original_publish) + def sentry_publish(self, *args, **kwargs): + # type: (Producer, *Any, **Any) -> Any + kwargs_headers = kwargs.get("headers", {}) + if not isinstance(kwargs_headers, Mapping): + # Ensure kwargs_headers is a Mapping, so we can safely call get() + kwargs_headers = {} + + task_name = kwargs_headers.get("task") + task_id = kwargs_headers.get("id") + retries = kwargs_headers.get("retries") + + routing_key = kwargs.get("routing_key") + exchange = kwargs.get("exchange") + + with sentry_sdk.start_span(op=OP.QUEUE_PUBLISH, description=task_name) as span: + if task_id is not None: + span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id) + + if exchange == "" and routing_key is not None: + # Empty exchange indicates the default exchange, meaning messages are + # routed to the queue with the same name as the routing key. + span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + + if retries is not None: + span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries) + + with capture_internal_exceptions(): + span.set_data( + SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type + ) + + return original_publish(self, *args, **kwargs) + + Producer.publish = sentry_publish diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 4f71d84809..d8308c5978 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -1,4 +1,5 @@ import threading +import kombu from unittest import mock import pytest @@ -722,3 +723,34 @@ def task(): ... (event,) = events (span,) = event["spans"] assert span["data"]["messaging.system"] == system + + +@pytest.mark.parametrize("system", ("amqp", "redis")) +def test_producer_span_data(system, monkeypatch, sentry_init, capture_events): + old_publish = kombu.messaging.Producer._publish + + def publish(*args, **kwargs): + pass + + monkeypatch.setattr(kombu.messaging.Producer, "_publish", publish) + + sentry_init(integrations=[CeleryIntegration()], enable_tracing=True) + celery = Celery(__name__, broker=f"{system}://example.com") # noqa: E231 + events = capture_events() + + @celery.task() + def task(): ... + + with start_transaction(): + task.apply_async() + + (event,) = events + span = next(span for span in event["spans"] if span["op"] == "queue.publish") + + assert span["data"]["messaging.system"] == system + + assert span["data"]["messaging.destination.name"] == "celery" + assert "messaging.message.id" in span["data"] + assert span["data"]["messaging.message.retry.count"] == 0 + + monkeypatch.setattr(kombu.messaging.Producer, "_publish", old_publish)