Skip to content

Commit

Permalink
feat(celery): Queues module producer implementation
Browse files Browse the repository at this point in the history
Fixes GH-3078
  • Loading branch information
szokeasaurusrex committed May 17, 2024
1 parent b922f40 commit 0daa1b4
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 0 deletions.
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ class OP:
LANGCHAIN_AGENT = "ai.agent.langchain"
LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain"
QUEUE_PROCESS = "queue.process"
QUEUE_PUBLISH = "queue.publish"
QUEUE_SUBMIT_ARQ = "queue.submit.arq"
QUEUE_TASK_ARQ = "queue.task.arq"
QUEUE_SUBMIT_CELERY = "queue.submit.celery"
Expand Down
44 changes: 44 additions & 0 deletions sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
from collections.abc import Mapping
from functools import wraps

import sentry_sdk
Expand Down Expand Up @@ -47,6 +48,7 @@
Retry,
SoftTimeLimitExceeded,
)
from kombu import Producer
except ImportError:
raise DidNotEnable("Celery not installed")

Expand Down Expand Up @@ -82,6 +84,7 @@ def setup_once():
_patch_build_tracer()
_patch_task_apply_async()
_patch_worker_exit()
_patch_producer_publish()

# This logger logs every status of every task that ran on the worker.
# Meaning that every task's breadcrumbs are full of stuff like "Task
Expand Down Expand Up @@ -433,3 +436,44 @@ def sentry_workloop(*args, **kwargs):
sentry_sdk.flush()

Worker.workloop = sentry_workloop


def _patch_producer_publish():
# type: () -> None
original_publish = Producer.publish

@ensure_integration_enabled(CeleryIntegration, original_publish)
def sentry_publish(self, *args, **kwargs):
# type: (Producer, *Any, **Any) -> Any
kwargs_headers = kwargs.get("headers", {})
if not isinstance(kwargs_headers, Mapping):
# Ensure kwargs_headers is a Mapping, so we can safely call get()
kwargs_headers = {}

task_name = kwargs_headers.get("task")
task_id = kwargs_headers.get("id")
retries = kwargs_headers.get("retries")

routing_key = kwargs.get("routing_key")
exchange = kwargs.get("exchange")

with sentry_sdk.start_span(op=OP.QUEUE_PUBLISH, description=task_name) as span:
if task_id is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)

if exchange == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning messages are
# routed to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)

if retries is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)

with capture_internal_exceptions():
span.set_data(
SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
)

return original_publish(self, *args, **kwargs)

Producer.publish = sentry_publish
32 changes: 32 additions & 0 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import threading
import kombu
from unittest import mock

import pytest
Expand Down Expand Up @@ -722,3 +723,34 @@ def task(): ...
(event,) = events
(span,) = event["spans"]
assert span["data"]["messaging.system"] == system


@pytest.mark.parametrize("system", ("amqp", "redis"))
def test_producer_span_data(system, monkeypatch, sentry_init, capture_events):
old_publish = kombu.messaging.Producer._publish

def publish(*args, **kwargs):
pass

monkeypatch.setattr(kombu.messaging.Producer, "_publish", publish)

sentry_init(integrations=[CeleryIntegration()], enable_tracing=True)
celery = Celery(__name__, broker=f"{system}://example.com") # noqa: E231
events = capture_events()

@celery.task()
def task(): ...

with start_transaction():
task.apply_async()

(event,) = events
span = next(span for span in event["spans"] if span["op"] == "queue.publish")

assert span["data"]["messaging.system"] == system

assert span["data"]["messaging.destination.name"] == "celery"
assert "messaging.message.id" in span["data"]
assert span["data"]["messaging.message.retry.count"] == 0

monkeypatch.setattr(kombu.messaging.Producer, "_publish", old_publish)

0 comments on commit 0daa1b4

Please sign in to comment.