Skip to content

Commit

Permalink
Add distributed_tracing argument to configure and new docs about …
Browse files Browse the repository at this point in the history
…context propagation (#773)

Co-authored-by: David Montague <35119617+dmontagu@users.noreply.github.com>
  • Loading branch information
alexmojaki and dmontagu authored Jan 6, 2025
1 parent 8284b3f commit 1fb4694
Showing 7 changed files with 454 additions and 7 deletions.
81 changes: 81 additions & 0 deletions docs/how-to-guides/distributed-tracing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
**Logfire** builds on OpenTelemetry, which keeps track of *context* to determine the parent trace/span of a new span/log and whether it should be included by sampling. *Context propagation* is when this context is serialized and sent to another process, so that tracing can be distributed across services while allowing the full tree of spans to be cleanly reconstructed and viewed together.

## Manual Context Propagation

**Logfire** provides a thin wrapper around the OpenTelemetry context propagation API to make manual distributed tracing easier. You shouldn't usually need to do this yourself, but it demonstrates the concept nicely. Here's an example:

```python
from logfire.propagate import attach_context, get_context
import logfire

logfire.configure()

with logfire.span('parent'):
ctx = get_context()

print(ctx)

# Attach the context in another execution environment
with attach_context(ctx):
logfire.info('child') # This log will be a child of the parent span.
```

`ctx` will look something like this:

```python
{'traceparent': '00-d1b9e555b056907ee20b0daebf62282c-7dcd821387246e1c-01'}
```

This contains 4 fields:

- `00` is a version number which you can ignore.
- `d1b9e555b056907ee20b0daebf62282c` is the `trace_id`.
- `7dcd821387246e1c` is the `span_id` of the parent span, i.e. the `parent_span_id` of the child log.
- `01` is the `trace_flags` field and indicates that the trace should be included by sampling.

See the [API reference](../reference/api/propagate.md) for more details about these functions.

## Integrations

OpenTelemetry instrumentation libraries (which **Logfire** uses for its integrations) handle context propagation automatically, even across different programming languages. For example:

- Instrumented HTTP clients such as [`requests`](../integrations/http-clients/requests.md) and [`httpx`](../integrations/http-clients/httpx.md) will automatically set the `traceparent` header when making requests.
- Instrumented web servers such as [`flask`](../integrations/web-frameworks/flask.md) and [`fastapi`](../integrations/web-frameworks/fastapi.md) will automatically extract the `traceparent` header and use it to set the context for server spans.
- The [`celery` integration](../integrations/event-streams/celery.md) will automatically propagate the context to child tasks.

## Thread and Pool executors

**Logfire** automatically patches [`ThreadPoolExecutor`][concurrent.futures.ThreadPoolExecutor] and [`ProcessPoolExecutor`][concurrent.futures.ProcessPoolExecutor] to propagate context to child threads and processes. This means that logs and spans created in child threads and processes will be correctly associated with the parent span. Here's an example to demonstrate:

```python
import logfire
from concurrent.futures import ThreadPoolExecutor

logfire.configure()


@logfire.instrument("Doubling {x}")
def double(x: int):
return x * 2


with logfire.span("Doubling everything") as span:
executor = ThreadPoolExecutor()
results = list(executor.map(double, range(3)))
span.set_attribute("results", results)
```

## Unintentional Distributed Tracing

Because instrumented web servers automatically extract the `traceparent` header by default, your spans can accidentally pick up the wrong context from an externally instrumented client, or from your cloud provider such as Google Cloud Run. This can lead to:

- Spans missing their parent.
- Spans being mysteriously grouped together.
- Spans missing entirely because the original trace was excluded by sampling.

By default, **Logfire** warns you when trace context is extracted, e.g. when server instrumentation finds a `traceparent` header. You can deal with this by setting the [`distributed_tracing` argument of `logfire.configure()`][logfire.configure(distributed_tracing)] or by setting the `LOGFIRE_DISTRIBUTED_TRACING` environment variable:

- Setting to `False` will prevent trace context from being extracted. This is recommended for web services exposed to the public internet. You can still attach/inject context to propagate to other services and create distributed traces with the web service as the root.
- Setting to `True` implies that the context propagation is intentional and will silence the warning.

The `distributed_tracing` configuration (including the warning by default) only applies when the raw OpenTelemetry API is used to extract context, as this is typically done by third-party libraries. By default, [`logfire.propagate.attach_context`][logfire.propagate.attach_context] assumes that context propagation is intended by the application. If you are writing a library, use `attach_context(context, third_party=True)` to respect the `distributed_tracing` configuration.
30 changes: 30 additions & 0 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import NoOpMeterProvider, set_meter_provider
from opentelemetry.propagate import get_global_textmap, set_global_textmap
from opentelemetry.sdk.environment_variables import (
OTEL_BSP_SCHEDULE_DELAY,
OTEL_EXPORTER_OTLP_ENDPOINT,
@@ -57,6 +58,7 @@
from logfire.sampling._tail_sampling import TailSamplingProcessor
from logfire.version import VERSION

from ..propagate import NoExtractTraceContextPropagator, WarnOnExtractTraceContextPropagator
from .auth import DEFAULT_FILE, DefaultFile, is_logged_in
from .config_params import ParamManager, PydanticPluginRecordValues
from .constants import (
@@ -236,6 +238,7 @@ def configure( # noqa: D417
inspect_arguments: bool | None = None,
sampling: SamplingOptions | None = None,
code_source: CodeSource | None = None,
distributed_tracing: bool | None = None,
advanced: AdvancedOptions | None = None,
**deprecated_kwargs: Unpack[DeprecatedKwargs],
) -> Logfire:
@@ -287,6 +290,12 @@ def configure( # noqa: D417
sampling: Sampling options. See the [sampling guide](https://logfire.pydantic.dev/docs/guides/advanced/sampling/).
code_source: Settings for the source code of the project.
distributed_tracing: By default, incoming trace context is extracted, but generates a warning.
Set to `True` to disable the warning.
Set to `False` to suppress extraction of incoming trace context.
See [Unintentional Distributed Tracing](https://logfire.pydantic.dev/docs/how-to-guides/distributed-tracing/#unintentional-distributed-tracing)
for more information.
This setting always applies globally, and the last value set is used, including the default value.
advanced: Advanced options primarily used for testing by Logfire developers.
"""
from .. import DEFAULT_LOGFIRE_INSTANCE, Logfire
@@ -417,6 +426,7 @@ def configure( # noqa: D417
inspect_arguments=inspect_arguments,
sampling=sampling,
code_source=code_source,
distributed_tracing=distributed_tracing,
advanced=advanced,
)

@@ -481,6 +491,9 @@ class _LogfireConfigData:
code_source: CodeSource | None
"""Settings for the source code of the project."""

distributed_tracing: bool | None
"""Whether to extract incoming trace context."""

advanced: AdvancedOptions
"""Advanced options primarily used for testing by Logfire developers."""

@@ -503,6 +516,7 @@ def _load_configuration(
inspect_arguments: bool | None,
sampling: SamplingOptions | None,
code_source: CodeSource | None,
distributed_tracing: bool | None,
advanced: AdvancedOptions | None,
) -> None:
"""Merge the given parameters with the environment variables file configurations."""
@@ -515,6 +529,7 @@ def _load_configuration(
self.environment = param_manager.load_param('environment', environment)
self.data_dir = param_manager.load_param('data_dir', data_dir)
self.inspect_arguments = param_manager.load_param('inspect_arguments', inspect_arguments)
self.distributed_tracing = param_manager.load_param('distributed_tracing', distributed_tracing)
self.ignore_no_config = param_manager.load_param('ignore_no_config')
if self.inspect_arguments and sys.version_info[:2] <= (3, 8):
raise LogfireConfigError(
@@ -605,6 +620,7 @@ def __init__(
inspect_arguments: bool | None = None,
sampling: SamplingOptions | None = None,
code_source: CodeSource | None = None,
distributed_tracing: bool | None = None,
advanced: AdvancedOptions | None = None,
) -> None:
"""Create a new LogfireConfig.
@@ -630,6 +646,7 @@ def __init__(
inspect_arguments=inspect_arguments,
sampling=sampling,
code_source=code_source,
distributed_tracing=distributed_tracing,
advanced=advanced,
)
# initialize with no-ops so that we don't impact OTEL's global config just because logfire is installed
@@ -659,6 +676,7 @@ def configure(
inspect_arguments: bool | None,
sampling: SamplingOptions | None,
code_source: CodeSource | None,
distributed_tracing: bool | None,
advanced: AdvancedOptions | None,
) -> None:
with self._lock:
@@ -678,6 +696,7 @@ def configure(
inspect_arguments,
sampling,
code_source,
distributed_tracing,
advanced,
)
self.initialize()
@@ -947,6 +966,17 @@ def _exit_open_spans(): # type: ignore[reportUnusedFunction] # pragma: no cove
# set up context propagation for ThreadPoolExecutor and ProcessPoolExecutor
instrument_executors()

current_textmap = get_global_textmap()
while isinstance(current_textmap, (WarnOnExtractTraceContextPropagator, NoExtractTraceContextPropagator)):
current_textmap = current_textmap.wrapped
if self.distributed_tracing is None:
new_textmap = WarnOnExtractTraceContextPropagator(current_textmap)
elif self.distributed_tracing:
new_textmap = current_textmap
else:
new_textmap = NoExtractTraceContextPropagator(current_textmap)
set_global_textmap(new_textmap)

self._ensure_flush_after_aws_lambda()

def force_flush(self, timeout_millis: int = 30_000) -> bool:
3 changes: 3 additions & 0 deletions logfire/_internal/config_params.py
Original file line number Diff line number Diff line change
@@ -98,6 +98,8 @@ class _DefaultCallback:
"""Whether to show a warning message if logfire if used without calling logfire.configure()"""
BASE_URL = ConfigParam(env_vars=['LOGFIRE_BASE_URL'], allow_file_config=True, default=LOGFIRE_BASE_URL)
"""The base URL of the Logfire backend. Primarily for testing purposes."""
DISTRIBUTED_TRACING = ConfigParam(env_vars=['LOGFIRE_DISTRIBUTED_TRACING'], allow_file_config=True, default=None, tp=bool)
"""Whether to extract incoming trace context. By default, will extract but warn about it."""
# fmt: on

CONFIG_PARAMS = {
@@ -121,6 +123,7 @@ class _DefaultCallback:
'pydantic_plugin_exclude': PYDANTIC_PLUGIN_EXCLUDE,
'inspect_arguments': INSPECT_ARGUMENTS,
'ignore_no_config': IGNORE_NO_CONFIG,
'distributed_tracing': DISTRIBUTED_TRACING,
}


93 changes: 87 additions & 6 deletions logfire/propagate.py
Original file line number Diff line number Diff line change
@@ -10,10 +10,17 @@
[httpx](https://pypi.org/project/opentelemetry-instrumentation-httpx/).
""" # noqa: D205

from __future__ import annotations

from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Iterator, Mapping

from opentelemetry import context, propagate
from opentelemetry import context as otel_context, propagate, trace
from opentelemetry.propagators.textmap import TextMapPropagator

import logfire
from logfire._internal.stack_info import warn_at_user_stacklevel

# anything that can be used to carry context, e.g. Headers or a dict
ContextCarrier = Mapping[str, Any]
@@ -57,16 +64,90 @@ def get_context() -> ContextCarrier:


@contextmanager
def attach_context(carrier: ContextCarrier) -> Iterator[None]:
def attach_context(carrier: ContextCarrier, *, third_party: bool = False) -> Iterator[None]:
"""Attach a context as generated by [`get_context`][logfire.propagate.get_context] to the current execution context.
Since `attach_context` is a context manager, it restores the previous context when exiting.
Set `third_party` to `True` if using this inside a library intended to be used by others.
This will respect the [`distributed_tracing` argument of `logfire.configure()`][logfire.configure(distributed_tracing)],
so users will be warned about unintentional distributed tracing by default and they can suppress it.
See [Unintentional Distributed Tracing](https://logfire.pydantic.dev/docs/how-to-guides/distributed-tracing/#unintentional-distributed-tracing) for more information.
"""
# capture the current context to restore it later
old_context = context.get_current()
new_context = propagate.extract(carrier=carrier)
old_context = otel_context.get_current()
propagator = propagate.get_global_textmap()
if not third_party:
while isinstance(propagator, (WarnOnExtractTraceContextPropagator, NoExtractTraceContextPropagator)):
propagator = propagator.wrapped
new_context = propagator.extract(carrier=carrier)
try:
context.attach(new_context)
otel_context.attach(new_context)
yield
finally:
context.attach(old_context)
otel_context.attach(old_context)


@dataclass
class WrapperPropagator(TextMapPropagator):
"""Helper base class to wrap another propagator."""

wrapped: TextMapPropagator

def extract(self, *args: Any, **kwargs: Any) -> otel_context.Context:
return self.wrapped.extract(*args, **kwargs)

def inject(self, *args: Any, **kwargs: Any):
return self.wrapped.inject(*args, **kwargs)

@property
def fields(self):
return self.wrapped.fields


class NoExtractTraceContextPropagator(WrapperPropagator):
"""A propagator that ignores any trace context that was extracted by the wrapped propagator.
Used when `logfire.configure(distributed_tracing=False)` is called.
"""

def extract(
self,
carrier: Any,
context: otel_context.Context | None = None,
*args: Any,
**kwargs: Any,
) -> otel_context.Context:
result = super().extract(carrier, context, *args, **kwargs)
if result == context: # pragma: no cover
# Optimization: nothing was extracted by the wrapped propagator
return result
return trace.set_span_in_context(trace.get_current_span(context), result)


@dataclass
class WarnOnExtractTraceContextPropagator(WrapperPropagator):
"""A propagator that warns the first time that trace context is extracted by the wrapped propagator.
Used when `logfire.configure(distributed_tracing=None)` is called. This is the default behavior.
"""

warned: bool = False

def extract(
self,
carrier: Any,
context: otel_context.Context | None = None,
*args: Any,
**kwargs: Any,
) -> otel_context.Context:
result = super().extract(carrier, context, *args, **kwargs)
if not self.warned and result != context and trace.get_current_span(context) != trace.get_current_span(result):
self.warned = True
message = (
'Found propagated trace context. See '
'https://logfire.pydantic.dev/docs/how-to-guides/distributed-tracing/#unintentional-distributed-tracing.'
)
warn_at_user_stacklevel(message, RuntimeWarning)
logfire.warn(message)
return result
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
@@ -88,6 +88,7 @@ nav:
- Implement Sampling Strategies: how-to-guides/sampling.md
- Export your Logfire Data: how-to-guides/query-api.md
- Scrub Sensitive Data: how-to-guides/scrubbing.md
- Trace across Multiple Services: how-to-guides/distributed-tracing.md
- Detect Service is Down: how-to-guides/detect-service-is-down.md
- Suppress Spans and Metrics: how-to-guides/suppress.md
- Integrations:
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -74,6 +74,7 @@ def config_kwargs(
# Ensure that inspect_arguments doesn't break things in most versions
# (it's off by default for <3.11) but it's completely forbidden for 3.8.
inspect_arguments=sys.version_info[:2] >= (3, 9),
distributed_tracing=True,
)


Loading

0 comments on commit 1fb4694

Please sign in to comment.