Skip to content

Commit

Permalink
Rearrange span processors to avoid repeating scrubbing and other tweaking (#658)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmojaki authored Dec 18, 2024
1 parent 7e6b140 commit 9e629da
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 119 deletions.
32 changes: 20 additions & 12 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from opentelemetry.sdk.metrics.export import AggregationTemporality, MetricReader, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import SpanProcessor, TracerProvider as SDKTracerProvider
from opentelemetry.sdk.trace import SpanProcessor, SynchronousMultiSpanProcessor, TracerProvider as SDKTracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
from opentelemetry.sdk.trace.id_generator import IdGenerator
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio, Sampler
Expand Down Expand Up @@ -79,7 +79,7 @@
from .exporters.fallback import FallbackSpanExporter
from .exporters.file import FileSpanExporter
from .exporters.otlp import OTLPExporterHttpSession, RetryFewerSpansSpanExporter
from .exporters.processor_wrapper import MainSpanProcessorWrapper
from .exporters.processor_wrapper import CheckSuppressInstrumentationProcessorWrapper, MainSpanProcessorWrapper
from .exporters.quiet_metrics import QuietMetricExporter
from .exporters.remove_pending import RemovePendingSpansExporter
from .exporters.test import TestExporter
Expand Down Expand Up @@ -764,20 +764,22 @@ def _initialize(self) -> None:
self._tracer_provider.set_provider(tracer_provider) # do we need to shut down the existing one???

processors_with_pending_spans: list[SpanProcessor] = []
root_processor = main_multiprocessor = SynchronousMultiSpanProcessor()
if self.sampling.tail:
root_processor = TailSamplingProcessor(root_processor, self.sampling.tail)
tracer_provider.add_span_processor(
CheckSuppressInstrumentationProcessorWrapper(
MainSpanProcessorWrapper(root_processor, self.scrubber),
)
)

def add_span_processor(span_processor: SpanProcessor) -> None:
# Some span processors added to the tracer provider should also be recorded in
# `processors_with_pending_spans` so that they can be used by the final pending span processor.
# This means that `tracer_provider.add_span_processor` should only appear in two places.
main_multiprocessor.add_span_processor(span_processor)

has_pending = isinstance(
getattr(span_processor, 'span_exporter', None),
(TestExporter, RemovePendingSpansExporter, SimpleConsoleSpanExporter),
)

if self.sampling.tail:
span_processor = TailSamplingProcessor(span_processor, self.sampling.tail)
span_processor = MainSpanProcessorWrapper(span_processor, self.scrubber)
tracer_provider.add_span_processor(span_processor)
if has_pending:
processors_with_pending_spans.append(span_processor)

Expand Down Expand Up @@ -877,8 +879,14 @@ def check_token():
]

if processors_with_pending_spans:
tracer_provider.add_span_processor(
PendingSpanProcessor(self.advanced.id_generator, tuple(processors_with_pending_spans))
pending_multiprocessor = SynchronousMultiSpanProcessor()
for processor in processors_with_pending_spans:
pending_multiprocessor.add_span_processor(processor)

main_multiprocessor.add_span_processor(
PendingSpanProcessor(
self.advanced.id_generator, MainSpanProcessorWrapper(pending_multiprocessor, self.scrubber)
)
)

otlp_endpoint = os.getenv(OTEL_EXPORTER_OTLP_ENDPOINT)
Expand Down
37 changes: 25 additions & 12 deletions logfire/_internal/exporters/processor_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

from dataclasses import dataclass
from urllib.parse import parse_qs, urlparse

from opentelemetry import context
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Status, StatusCode
Expand Down Expand Up @@ -31,31 +32,44 @@
from .wrapper import WrapperSpanProcessor


class CheckSuppressInstrumentationProcessorWrapper(WrapperSpanProcessor):
    """Root-level guard that respects, and then enforces, instrumentation suppression.

    Each hook first asks `is_instrumentation_suppressed()`. If suppression is already
    active, the span is dropped and the wrapped processor is never called. Otherwise
    the call is forwarded to the wrapped processor inside
    `logfire.suppress_instrumentation()`.

    Placed at the root of the tree of processors.
    """

    def on_start(self, span: Span, parent_context: context.Context | None = None) -> None:
        # Only forward when suppression is not already active.
        if not is_instrumentation_suppressed():
            with logfire.suppress_instrumentation():
                super().on_start(span, parent_context)

    def on_end(self, span: ReadableSpan) -> None:
        # Same rule as `on_start`: skip entirely while suppressed.
        if not is_instrumentation_suppressed():
            with logfire.suppress_instrumentation():
                super().on_end(span)


@dataclass
class MainSpanProcessorWrapper(WrapperSpanProcessor):
"""Wrapper around other processors to intercept starting and ending spans with our own global logic.
Does not check `suppress_instrumentation` itself; that check is done by `CheckSuppressInstrumentationProcessorWrapper`, which wraps this processor.
Tweaks the send/receive span names generated by the ASGI middleware.
"""

def __init__(self, processor: SpanProcessor, scrubber: BaseScrubber) -> None:
super().__init__(processor)
self.scrubber = scrubber
scrubber: BaseScrubber

def on_start(
self,
span: Span,
parent_context: context.Context | None = None,
) -> None:
if is_instrumentation_suppressed():
return
_set_log_level_on_asgi_send_receive_spans(span)
with logfire.suppress_instrumentation():
super().on_start(span, parent_context)
super().on_start(span, parent_context)

def on_end(self, span: ReadableSpan) -> None:
if is_instrumentation_suppressed():
return
span_dict = span_to_dict(span)
_tweak_asgi_send_receive_spans(span_dict)
_tweak_sqlalchemy_connect_spans(span_dict)
Expand All @@ -64,8 +78,7 @@ def on_end(self, span: ReadableSpan) -> None:
_set_error_level_and_status(span_dict)
self.scrubber.scrub_span(span_dict)
span = ReadableSpan(**span_dict)
with logfire.suppress_instrumentation():
super().on_end(span)
super().on_end(span)


def _set_error_level_and_status(span: ReadableSpanDict) -> None:
Expand Down
5 changes: 3 additions & 2 deletions logfire/_internal/exporters/wrapper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Sequence

from opentelemetry import context
Expand Down Expand Up @@ -49,11 +50,11 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs: Any) -> None:
self.wrapped_exporter.shutdown(timeout_millis, **kwargs) # type: ignore


@dataclass
class WrapperSpanProcessor(SpanProcessor):
"""A base class for SpanProcessors that wrap another processor."""

def __init__(self, processor: SpanProcessor) -> None:
self.processor = processor
processor: SpanProcessor

def on_start(self, span: Span, parent_context: context.Context | None = None) -> None:
self.processor.on_start(span, parent_context)
Expand Down
12 changes: 7 additions & 5 deletions logfire/_internal/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,14 @@ def start_span(self, name: str, context: Context | None = None, *args: Any, **kw
class PendingSpanProcessor(SpanProcessor):
"""Span processor that emits an extra pending span for each span as it starts.
The pending span is emitted by calling `on_end` on all other processors.
The pending span is emitted by calling `on_end` on the inner `processor`.
This is intentionally not a `WrapperSpanProcessor` to avoid the default implementations of `on_end`
and `shutdown`. This processor is expected to contain processors which are already included
elsewhere in the pipeline where `on_end` and `shutdown` are called normally.
"""

id_generator: IdGenerator
other_processors: tuple[SpanProcessor, ...]
processor: SpanProcessor

def on_start(
self,
Expand All @@ -250,7 +253,7 @@ def on_start(
) -> None:
assert isinstance(span, ReadableSpan) and isinstance(span, Span)
if not span.is_recording(): # pragma: no cover
# Span was sampled out
# Span was sampled out, or has finished already (happens with tail sampling)
return

attributes = span.attributes
Expand Down Expand Up @@ -295,8 +298,7 @@ def on_start(
end_time=start_and_end_time,
instrumentation_scope=span.instrumentation_scope,
)
for processor in self.other_processors:
processor.on_end(pending_span)
self.processor.on_end(pending_span)


def should_sample(span_context: SpanContext, attributes: Mapping[str, otel_types.AttributeValue]) -> bool:
Expand Down
65 changes: 34 additions & 31 deletions tests/test_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from opentelemetry.metrics import NoOpMeterProvider, get_meter_provider
from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, SynchronousMultiSpanProcessor
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
Expand All @@ -46,7 +46,10 @@
from logfire._internal.exporters.console import ShowParentsConsoleSpanExporter
from logfire._internal.exporters.fallback import FallbackSpanExporter
from logfire._internal.exporters.file import WritingFallbackWarning
from logfire._internal.exporters.processor_wrapper import MainSpanProcessorWrapper
from logfire._internal.exporters.processor_wrapper import (
CheckSuppressInstrumentationProcessorWrapper,
MainSpanProcessorWrapper,
)
from logfire._internal.exporters.quiet_metrics import QuietMetricExporter
from logfire._internal.exporters.remove_pending import RemovePendingSpansExporter
from logfire._internal.exporters.wrapper import WrapperSpanExporter
Expand Down Expand Up @@ -570,11 +573,9 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
)
wait_for_check_token_thread()

send_to_logfire_processor, *_ = get_span_processors()
batch_span_processor, *_ = get_span_processors()
# It's OK if these processor/exporter types change.
# We just need access to the FallbackSpanExporter either way to swap out its underlying exporter.
assert isinstance(send_to_logfire_processor, MainSpanProcessorWrapper)
batch_span_processor = send_to_logfire_processor.processor
assert isinstance(batch_span_processor, BatchSpanProcessor)
exporter = batch_span_processor.span_exporter
assert isinstance(exporter, WrapperSpanExporter)
Expand Down Expand Up @@ -622,9 +623,7 @@ def configure_tracking_exporter():
)
wait_for_check_token_thread()

send_to_logfire_processor, *_ = get_span_processors()
assert isinstance(send_to_logfire_processor, MainSpanProcessorWrapper)
batch_span_processor = send_to_logfire_processor.processor
batch_span_processor, *_ = get_span_processors()
assert isinstance(batch_span_processor, BatchSpanProcessor)

batch_span_processor.span_exporter = TrackingExporter()
Expand Down Expand Up @@ -1460,16 +1459,19 @@ def test_default_exporters(monkeypatch: pytest.MonkeyPatch):

[console_processor, send_to_logfire_processor, pending_span_processor] = get_span_processors()

assert isinstance(console_processor, MainSpanProcessorWrapper)
assert isinstance(console_processor.processor, SimpleSpanProcessor)
assert isinstance(console_processor.processor.span_exporter, ShowParentsConsoleSpanExporter)
assert isinstance(console_processor, SimpleSpanProcessor)
assert isinstance(console_processor.span_exporter, ShowParentsConsoleSpanExporter)

assert isinstance(send_to_logfire_processor, MainSpanProcessorWrapper)
assert isinstance(send_to_logfire_processor.processor, BatchSpanProcessor)
assert isinstance(send_to_logfire_processor.processor.span_exporter, RemovePendingSpansExporter)
assert isinstance(send_to_logfire_processor, BatchSpanProcessor)
assert isinstance(send_to_logfire_processor.span_exporter, RemovePendingSpansExporter)

assert isinstance(pending_span_processor, PendingSpanProcessor)
assert pending_span_processor.other_processors == (console_processor, send_to_logfire_processor)
assert isinstance(pending_span_processor.processor, MainSpanProcessorWrapper)
assert isinstance(pending_span_processor.processor.processor, SynchronousMultiSpanProcessor)
assert pending_span_processor.processor.processor._span_processors == ( # type: ignore
console_processor,
send_to_logfire_processor,
)

[logfire_metric_reader] = get_metric_readers()
assert isinstance(logfire_metric_reader, PeriodicExportingMetricReader)
Expand All @@ -1487,9 +1489,8 @@ def test_custom_exporters():
metrics=logfire.MetricsOptions(additional_readers=[custom_metric_reader]),
)

[custom_processor_wrapper] = get_span_processors()
assert isinstance(custom_processor_wrapper, MainSpanProcessorWrapper)
assert custom_processor_wrapper.processor is custom_span_processor
[custom_span_processor2] = get_span_processors()
assert custom_span_processor2 is custom_span_processor

[custom_metric_reader2] = get_metric_readers()
assert custom_metric_reader2 is custom_metric_reader
Expand All @@ -1501,10 +1502,9 @@ def test_otel_exporter_otlp_endpoint_env_var():
logfire.configure(send_to_logfire=False, console=False)

[otel_processor] = get_span_processors()
assert isinstance(otel_processor, MainSpanProcessorWrapper)
assert isinstance(otel_processor.processor, BatchSpanProcessor)
assert isinstance(otel_processor.processor.span_exporter, OTLPSpanExporter)
assert otel_processor.processor.span_exporter._endpoint == 'otel_endpoint/v1/traces' # type: ignore
assert isinstance(otel_processor, BatchSpanProcessor)
assert isinstance(otel_processor.span_exporter, OTLPSpanExporter)
assert otel_processor.span_exporter._endpoint == 'otel_endpoint/v1/traces' # type: ignore

[otel_metric_reader] = get_metric_readers()
assert isinstance(otel_metric_reader, PeriodicExportingMetricReader)
Expand All @@ -1531,10 +1531,9 @@ def test_otel_metrics_exporter_env_var():
logfire.configure(send_to_logfire=False, console=False)

[otel_processor] = get_span_processors()
assert isinstance(otel_processor, MainSpanProcessorWrapper)
assert isinstance(otel_processor.processor, BatchSpanProcessor)
assert isinstance(otel_processor.processor.span_exporter, OTLPSpanExporter)
assert otel_processor.processor.span_exporter._endpoint == 'otel_endpoint3/v1/traces' # type: ignore
assert isinstance(otel_processor, BatchSpanProcessor)
assert isinstance(otel_processor.span_exporter, OTLPSpanExporter)
assert otel_processor.span_exporter._endpoint == 'otel_endpoint3/v1/traces' # type: ignore

assert len(list(get_metric_readers())) == 0

Expand All @@ -1545,10 +1544,9 @@ def test_otel_exporter_otlp_traces_endpoint_env_var():
logfire.configure(send_to_logfire=False, console=False)

[otel_processor] = get_span_processors()
assert isinstance(otel_processor, MainSpanProcessorWrapper)
assert isinstance(otel_processor.processor, BatchSpanProcessor)
assert isinstance(otel_processor.processor.span_exporter, OTLPSpanExporter)
assert otel_processor.processor.span_exporter._endpoint == 'otel_traces_endpoint' # type: ignore
assert isinstance(otel_processor, BatchSpanProcessor)
assert isinstance(otel_processor.span_exporter, OTLPSpanExporter)
assert otel_processor.span_exporter._endpoint == 'otel_traces_endpoint' # type: ignore

assert len(list(get_metric_readers())) == 0

Expand Down Expand Up @@ -1576,7 +1574,12 @@ def test_metrics_false(monkeypatch: pytest.MonkeyPatch):


def get_span_processors() -> Iterable[SpanProcessor]:
return get_tracer_provider().provider._active_span_processor._span_processors # type: ignore
[first, *rest] = get_tracer_provider().provider._active_span_processor._span_processors # type: ignore
assert isinstance(first, CheckSuppressInstrumentationProcessorWrapper)
assert isinstance(first.processor, MainSpanProcessorWrapper)
assert isinstance(first.processor.processor, SynchronousMultiSpanProcessor)

return [*first.processor.processor._span_processors, *rest] # type: ignore


def get_metric_readers() -> Iterable[SpanProcessor]:
Expand Down
Loading

0 comments on commit 9e629da

Please sign in to comment.