Skip to content

Commit

Permalink
fix(celery): handle upstream celery patch and propagation during an e…
Browse files Browse the repository at this point in the history
…rror

Prior to this change two things weren't working:

1. An upstream Celery patch broke context propagation due
   to the way ddtrace propagates headers. Prior to this upstream patch,
   context had to be propagated under kwargs[headers][headers], but
   after the patch it can be propagated under kwargs[headers].  We now
   look in the right places for propagated information and retrieve from
   the right places.

2. Prior to this patch, If Task-A calls Task-B and Task-A errors out
   before Task-B creates its span, then Task-B would fail to parent
   itself on Task-A (instead going one level up).  This caused errant
   service-entry `celery.apply` spans on workers, which should always
   have `celery.run` service entry spans.
  • Loading branch information
tabgok committed Nov 1, 2024
1 parent 223c261 commit c112d7b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 10 deletions.
3 changes: 2 additions & 1 deletion ddtrace/contrib/internal/celery/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@


# Celery Context key
CTX_KEY = "__dd_task_span"
SPAN_KEY = "__dd_task_span"
CTX_KEY = "__dd_task_context"

# Span names
PRODUCER_ROOT_SPAN = "celery.apply"
Expand Down
29 changes: 24 additions & 5 deletions ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import celery
from celery import registry
from celery.utils import nodenames

Expand All @@ -9,8 +10,10 @@
from ddtrace.contrib import trace_utils
from ddtrace.contrib.internal.celery import constants as c
from ddtrace.contrib.internal.celery.utils import attach_span
from ddtrace.contrib.internal.celery.utils import attach_span_context
from ddtrace.contrib.internal.celery.utils import detach_span
from ddtrace.contrib.internal.celery.utils import retrieve_span
from ddtrace.contrib.internal.celery.utils import retrieve_span_context
from ddtrace.contrib.internal.celery.utils import retrieve_task_id
from ddtrace.contrib.internal.celery.utils import set_tags_from_context
from ddtrace.ext import SpanKind
Expand Down Expand Up @@ -65,6 +68,8 @@ def trace_prerun(*args, **kwargs):

span.set_tag(SPAN_MEASURED_KEY)
attach_span(task, task_id, span)
if config.celery["distributed_tracing"]:
attach_span_context(task, task_id, span)


def trace_postrun(*args, **kwargs):
Expand Down Expand Up @@ -110,6 +115,13 @@ def trace_before_publish(*args, **kwargs):
if pin is None:
return

# The parent span may have closed (if it had an exception, so use distributed headers)
# If Task A calls Task B, and Task A excepts, then Task B may have no parent when apply is called.
# In this case, we don't use the attached span/tracer, we use the attached distributed context.
if config.celery["distributed_tracing"]:
request_headers = retrieve_span_context(task, task_id, is_publish=False)
trace_utils.activate_distributed_headers(pin.tracer, int_config=config.celery, request_headers=request_headers)

# apply some tags here because most of the data is not available
# in the task_after_publish signal
service = config.celery["producer_service_name"]
Expand Down Expand Up @@ -145,11 +157,18 @@ def trace_before_publish(*args, **kwargs):
trace_headers = {}
propagator.inject(span.context, trace_headers)

# put distributed trace headers where celery will propagate them
task_headers = kwargs.get("headers") or {}
task_headers.setdefault("headers", {})
task_headers["headers"].update(trace_headers)
kwargs["headers"] = task_headers
kwargs.setdefault("headers", {})

# This is a hack for other versions, such as https://github.com/celery/celery/issues/4875
# We put always inject it prior to 5.4 because backports have made it hard to track
# exact compatible versions. It should be safe to include for all versions.
kwargs["headers"].setdefault("headers", {})
kwargs["headers"]["headers"].update(trace_headers)

# Kwargs must be updated directly with headers since 4.5
if celery.version_info >= (4, 5, 0):
log.debug("Teague.bick - here")
kwargs["headers"].update(trace_headers)


def trace_after_publish(*args, **kwargs):
Expand Down
36 changes: 32 additions & 4 deletions ddtrace/contrib/internal/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

from ddtrace._trace.span import Span
from ddtrace.contrib.trace_utils import set_flattened_tags
from ddtrace.propagation.http import HTTPPropagator

from .constants import SPAN_KEY
from .constants import CTX_KEY
propagator = HTTPPropagator


TAG_KEYS = frozenset(
Expand Down Expand Up @@ -66,6 +69,31 @@ def set_tags_from_context(span: Span, context: Dict[str, Any]) -> None:
set_flattened_tags(span, context_tags)


def attach_span_context(task, task_id, span, is_publish=False):
trace_headers = {}
propagator.inject(span.context, trace_headers)

# put distributed trace headers where celery will propagate them
context_dict = getattr(task, CTX_KEY, None)
if context_dict is None:
context_dict = dict()
setattr(task, CTX_KEY, context_dict)

context_dict[(task_id, is_publish, "distributed_context")] = trace_headers


def retrieve_span_context(task, task_id, is_publish=False):
"""Helper to retrieve an active `Span` stored in a `Task`
instance
"""
context_dict = getattr(task, CTX_KEY, None)
if context_dict is None:
return
else:
# DEV: See note in `attach_span` for key info
return context_dict.get((task_id, is_publish, "distributed_context"))


def attach_span(task, task_id, span, is_publish=False):
"""Helper to propagate a `Span` for the given `Task` instance. This
function uses a `WeakValueDictionary` that stores a Datadog Span using
Expand All @@ -85,10 +113,10 @@ def attach_span(task, task_id, span, is_publish=False):
NOTE: We cannot test for this well yet, because we do not run a celery worker,
and cannot run `task.apply_async()`
"""
weak_dict = getattr(task, CTX_KEY, None)
weak_dict = getattr(task, SPAN_KEY, None)
if weak_dict is None:
weak_dict = WeakValueDictionary()
setattr(task, CTX_KEY, weak_dict)
setattr(task, SPAN_KEY, weak_dict)

weak_dict[(task_id, is_publish)] = span

Expand All @@ -97,7 +125,7 @@ def detach_span(task, task_id, is_publish=False):
"""Helper to remove a `Span` in a Celery task when it's propagated.
This function handles tasks where the `Span` is not attached.
"""
weak_dict = getattr(task, CTX_KEY, None)
weak_dict = getattr(task, SPAN_KEY, None)
if weak_dict is None:
return

Expand All @@ -112,7 +140,7 @@ def retrieve_span(task, task_id, is_publish=False):
"""Helper to retrieve an active `Span` stored in a `Task`
instance
"""
weak_dict = getattr(task, CTX_KEY, None)
weak_dict = getattr(task, SPAN_KEY, None)
if weak_dict is None:
return
else:
Expand Down

0 comments on commit c112d7b

Please sign in to comment.