diff --git a/ddtrace/contrib/internal/celery/constants.py b/ddtrace/contrib/internal/celery/constants.py index 39923a08bc7..3bf25235e88 100644 --- a/ddtrace/contrib/internal/celery/constants.py +++ b/ddtrace/contrib/internal/celery/constants.py @@ -2,7 +2,8 @@ # Celery Context key -CTX_KEY = "__dd_task_span" +SPAN_KEY = "__dd_task_span" +CTX_KEY = "__dd_task_context" # Span names PRODUCER_ROOT_SPAN = "celery.apply" diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index 6341bed9bbf..df3f8ad6864 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -1,3 +1,4 @@ +import celery from celery import registry from celery.utils import nodenames @@ -9,8 +10,10 @@ from ddtrace.contrib import trace_utils from ddtrace.contrib.internal.celery import constants as c from ddtrace.contrib.internal.celery.utils import attach_span +from ddtrace.contrib.internal.celery.utils import attach_span_context from ddtrace.contrib.internal.celery.utils import detach_span from ddtrace.contrib.internal.celery.utils import retrieve_span +from ddtrace.contrib.internal.celery.utils import retrieve_span_context from ddtrace.contrib.internal.celery.utils import retrieve_task_id from ddtrace.contrib.internal.celery.utils import set_tags_from_context from ddtrace.ext import SpanKind @@ -65,6 +68,8 @@ def trace_prerun(*args, **kwargs): span.set_tag(SPAN_MEASURED_KEY) attach_span(task, task_id, span) + if config.celery["distributed_tracing"]: + attach_span_context(task, task_id, span) def trace_postrun(*args, **kwargs): @@ -110,6 +115,13 @@ def trace_before_publish(*args, **kwargs): if pin is None: return + # The parent span may have closed (if it had an exception), so use distributed headers. + # If Task A calls Task B, and Task A raises, then Task B may have no parent when apply is called. + # In this case, we don't use the attached span/tracer; we use the attached distributed context. 
+ if config.celery["distributed_tracing"]: + request_headers = retrieve_span_context(task, task_id, is_publish=False) + trace_utils.activate_distributed_headers(pin.tracer, int_config=config.celery, request_headers=request_headers) + # apply some tags here because most of the data is not available # in the task_after_publish signal service = config.celery["producer_service_name"] @@ -145,11 +157,18 @@ def trace_before_publish(*args, **kwargs): trace_headers = {} propagator.inject(span.context, trace_headers) - # put distributed trace headers where celery will propagate them - task_headers = kwargs.get("headers") or {} - task_headers.setdefault("headers", {}) - task_headers["headers"].update(trace_headers) - kwargs["headers"] = task_headers + kwargs.setdefault("headers", {}) + + # This is a hack for other versions, such as https://github.com/celery/celery/issues/4875 + # We always inject it (not just prior to 5.4) because backports have made it hard to track + # exact compatible versions. It should be safe to include for all versions. 
+ kwargs["headers"].setdefault("headers", {}) + kwargs["headers"]["headers"].update(trace_headers) + + # Kwargs must be updated directly with headers since 4.5 + if celery.version_info >= (4, 5, 0): + log.debug("Teague.bick - here") + kwargs["headers"].update(trace_headers) def trace_after_publish(*args, **kwargs): diff --git a/ddtrace/contrib/internal/celery/utils.py b/ddtrace/contrib/internal/celery/utils.py index e18f83ba0dc..0c1b001a36e 100644 --- a/ddtrace/contrib/internal/celery/utils.py +++ b/ddtrace/contrib/internal/celery/utils.py @@ -4,8 +4,11 @@ from ddtrace._trace.span import Span from ddtrace.contrib.trace_utils import set_flattened_tags +from ddtrace.propagation.http import HTTPPropagator +from .constants import SPAN_KEY from .constants import CTX_KEY +propagator = HTTPPropagator TAG_KEYS = frozenset( @@ -66,6 +69,31 @@ def set_tags_from_context(span: Span, context: Dict[str, Any]) -> None: set_flattened_tags(span, context_tags) +def attach_span_context(task, task_id, span, is_publish=False): + trace_headers = {} + propagator.inject(span.context, trace_headers) + + # store the distributed trace headers on the task so they can be retrieved later + context_dict = getattr(task, CTX_KEY, None) + if context_dict is None: + context_dict = dict() + setattr(task, CTX_KEY, context_dict) + + context_dict[(task_id, is_publish, "distributed_context")] = trace_headers + + +def retrieve_span_context(task, task_id, is_publish=False): + """Helper to retrieve the distributed trace headers stored in a `Task` + instance + """ + context_dict = getattr(task, CTX_KEY, None) + if context_dict is None: + return + else: + # DEV: See note in `attach_span` for key info + return context_dict.get((task_id, is_publish, "distributed_context")) + + def attach_span(task, task_id, span, is_publish=False): """Helper to propagate a `Span` for the given `Task` instance. 
This function uses a `WeakValueDictionary` that stores a Datadog Span using @@ -85,10 +113,10 @@ def attach_span(task, task_id, span, is_publish=False): NOTE: We cannot test for this well yet, because we do not run a celery worker, and cannot run `task.apply_async()` """ - weak_dict = getattr(task, CTX_KEY, None) + weak_dict = getattr(task, SPAN_KEY, None) if weak_dict is None: weak_dict = WeakValueDictionary() - setattr(task, CTX_KEY, weak_dict) + setattr(task, SPAN_KEY, weak_dict) weak_dict[(task_id, is_publish)] = span @@ -97,7 +125,7 @@ def detach_span(task, task_id, is_publish=False): """Helper to remove a `Span` in a Celery task when it's propagated. This function handles tasks where the `Span` is not attached. """ - weak_dict = getattr(task, CTX_KEY, None) + weak_dict = getattr(task, SPAN_KEY, None) if weak_dict is None: return @@ -112,7 +140,7 @@ def retrieve_span(task, task_id, is_publish=False): """Helper to retrieve an active `Span` stored in a `Task` instance """ - weak_dict = getattr(task, CTX_KEY, None) + weak_dict = getattr(task, SPAN_KEY, None) if weak_dict is None: return else: