From 9e629dacfcebbc8c6ac1378072439783c59d8467 Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Wed, 18 Dec 2024 13:05:16 +0200 Subject: [PATCH] Rearrange span processors to avoid repeating scrubbing and other tweaking (#658) --- logfire/_internal/config.py | 32 ++-- .../_internal/exporters/processor_wrapper.py | 37 +++-- logfire/_internal/exporters/wrapper.py | 5 +- logfire/_internal/tracer.py | 12 +- tests/test_configure.py | 65 ++++---- tests/test_tail_sampling.py | 140 +++++++++++------- 6 files changed, 172 insertions(+), 119 deletions(-) diff --git a/logfire/_internal/config.py b/logfire/_internal/config.py index 753b8b70b..fbe70dd29 100644 --- a/logfire/_internal/config.py +++ b/logfire/_internal/config.py @@ -44,7 +44,7 @@ from opentelemetry.sdk.metrics.export import AggregationTemporality, MetricReader, PeriodicExportingMetricReader from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation, View from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import SpanProcessor, TracerProvider as SDKTracerProvider +from opentelemetry.sdk.trace import SpanProcessor, SynchronousMultiSpanProcessor, TracerProvider as SDKTracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor from opentelemetry.sdk.trace.id_generator import IdGenerator from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio, Sampler @@ -79,7 +79,7 @@ from .exporters.fallback import FallbackSpanExporter from .exporters.file import FileSpanExporter from .exporters.otlp import OTLPExporterHttpSession, RetryFewerSpansSpanExporter -from .exporters.processor_wrapper import MainSpanProcessorWrapper +from .exporters.processor_wrapper import CheckSuppressInstrumentationProcessorWrapper, MainSpanProcessorWrapper from .exporters.quiet_metrics import QuietMetricExporter from .exporters.remove_pending import RemovePendingSpansExporter from .exporters.test import TestExporter @@ -764,20 +764,22 @@ def _initialize(self) -> None: self._tracer_provider.set_provider(tracer_provider) # do we need to shut down the existing one??? processors_with_pending_spans: list[SpanProcessor] = [] + root_processor = main_multiprocessor = SynchronousMultiSpanProcessor() + if self.sampling.tail: + root_processor = TailSamplingProcessor(root_processor, self.sampling.tail) + tracer_provider.add_span_processor( + CheckSuppressInstrumentationProcessorWrapper( + MainSpanProcessorWrapper(root_processor, self.scrubber), + ) + ) def add_span_processor(span_processor: SpanProcessor) -> None: - # Some span processors added to the tracer provider should also be recorded in - # `processors_with_pending_spans` so that they can be used by the final pending span processor. - # This means that `tracer_provider.add_span_processor` should only appear in two places. 
+ main_multiprocessor.add_span_processor(span_processor) + has_pending = isinstance( getattr(span_processor, 'span_exporter', None), (TestExporter, RemovePendingSpansExporter, SimpleConsoleSpanExporter), ) - - if self.sampling.tail: - span_processor = TailSamplingProcessor(span_processor, self.sampling.tail) - span_processor = MainSpanProcessorWrapper(span_processor, self.scrubber) - tracer_provider.add_span_processor(span_processor) if has_pending: processors_with_pending_spans.append(span_processor) @@ -877,8 +879,14 @@ def check_token(): ] if processors_with_pending_spans: - tracer_provider.add_span_processor( - PendingSpanProcessor(self.advanced.id_generator, tuple(processors_with_pending_spans)) + pending_multiprocessor = SynchronousMultiSpanProcessor() + for processor in processors_with_pending_spans: + pending_multiprocessor.add_span_processor(processor) + + main_multiprocessor.add_span_processor( + PendingSpanProcessor( + self.advanced.id_generator, MainSpanProcessorWrapper(pending_multiprocessor, self.scrubber) + ) ) otlp_endpoint = os.getenv(OTEL_EXPORTER_OTLP_ENDPOINT) diff --git a/logfire/_internal/exporters/processor_wrapper.py b/logfire/_internal/exporters/processor_wrapper.py index 2d77e7fe3..2c0fe4160 100644 --- a/logfire/_internal/exporters/processor_wrapper.py +++ b/logfire/_internal/exporters/processor_wrapper.py @@ -1,9 +1,10 @@ from __future__ import annotations +from dataclasses import dataclass from urllib.parse import parse_qs, urlparse from opentelemetry import context -from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor +from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import Status, StatusCode @@ -31,6 +32,26 @@ from .wrapper import WrapperSpanProcessor +class CheckSuppressInstrumentationProcessorWrapper(WrapperSpanProcessor): + """Checks if instrumentation is suppressed, then suppresses instrumentation itself. + + Placed at the root of the tree of processors. + """ + + def on_start(self, span: Span, parent_context: context.Context | None = None) -> None: + if is_instrumentation_suppressed(): + return + with logfire.suppress_instrumentation(): + super().on_start(span, parent_context) + + def on_end(self, span: ReadableSpan) -> None: + if is_instrumentation_suppressed(): + return + with logfire.suppress_instrumentation(): + super().on_end(span) + + +@dataclass class MainSpanProcessorWrapper(WrapperSpanProcessor): """Wrapper around other processors to intercept starting and ending spans with our own global logic. @@ -38,24 +59,17 @@ class MainSpanProcessorWrapper(WrapperSpanProcessor): Tweaks the send/receive span names generated by the ASGI middleware. 
""" - def __init__(self, processor: SpanProcessor, scrubber: BaseScrubber) -> None: - super().__init__(processor) - self.scrubber = scrubber + scrubber: BaseScrubber def on_start( self, span: Span, parent_context: context.Context | None = None, ) -> None: - if is_instrumentation_suppressed(): - return _set_log_level_on_asgi_send_receive_spans(span) - with logfire.suppress_instrumentation(): - super().on_start(span, parent_context) + super().on_start(span, parent_context) def on_end(self, span: ReadableSpan) -> None: - if is_instrumentation_suppressed(): - return span_dict = span_to_dict(span) _tweak_asgi_send_receive_spans(span_dict) _tweak_sqlalchemy_connect_spans(span_dict) @@ -64,8 +78,7 @@ def on_end(self, span: ReadableSpan) -> None: _set_error_level_and_status(span_dict) self.scrubber.scrub_span(span_dict) span = ReadableSpan(**span_dict) - with logfire.suppress_instrumentation(): - super().on_end(span) + super().on_end(span) def _set_error_level_and_status(span: ReadableSpanDict) -> None: diff --git a/logfire/_internal/exporters/wrapper.py b/logfire/_internal/exporters/wrapper.py index 0f95cece6..83cf31a4f 100644 --- a/logfire/_internal/exporters/wrapper.py +++ b/logfire/_internal/exporters/wrapper.py @@ -1,5 +1,6 @@ from __future__ import annotations +from dataclasses import dataclass from typing import Any, Sequence from opentelemetry import context @@ -49,11 +50,11 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs: Any) -> None: self.wrapped_exporter.shutdown(timeout_millis, **kwargs) # type: ignore +@dataclass class WrapperSpanProcessor(SpanProcessor): """A base class for SpanProcessors that wrap another processor.""" - def __init__(self, processor: SpanProcessor) -> None: - self.processor = processor + processor: SpanProcessor def on_start(self, span: Span, parent_context: context.Context | None = None) -> None: self.processor.on_start(span, parent_context) diff --git a/logfire/_internal/tracer.py b/logfire/_internal/tracer.py index 5d86c66a4..d4b86979f 100644 --- a/logfire/_internal/tracer.py +++ b/logfire/_internal/tracer.py @@ -237,11 +237,14 @@ def start_span(self, name: str, context: Context | None = None, *args: Any, **kw class PendingSpanProcessor(SpanProcessor): """Span processor that emits an extra pending span for each span as it starts. - The pending span is emitted by calling `on_end` on all other processors. + The pending span is emitted by calling `on_end` on the inner `processor`. + This is intentionally not a `WrapperSpanProcessor` to avoid the default implementations of `on_end` + and `shutdown`. This processor is expected to contain processors which are already included + elsewhere in the pipeline where `on_end` and `shutdown` are called normally. """ id_generator: IdGenerator - other_processors: tuple[SpanProcessor, ...] 
+ processor: SpanProcessor def on_start( self, @@ -250,7 +253,7 @@ def on_start( ) -> None: assert isinstance(span, ReadableSpan) and isinstance(span, Span) if not span.is_recording(): # pragma: no cover - # Span was sampled out + # Span was sampled out, or has finished already (happens with tail sampling) return attributes = span.attributes @@ -295,8 +298,7 @@ def on_start( end_time=start_and_end_time, instrumentation_scope=span.instrumentation_scope, ) - for processor in self.other_processors: - processor.on_end(pending_span) + self.processor.on_end(pending_span) def should_sample(span_context: SpanContext, attributes: Mapping[str, otel_types.AttributeValue]) -> bool: diff --git a/tests/test_configure.py b/tests/test_configure.py index 40e6276a2..ae105afdb 100644 --- a/tests/test_configure.py +++ b/tests/test_configure.py @@ -21,7 +21,7 @@ from opentelemetry.metrics import NoOpMeterProvider, get_meter_provider from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader from opentelemetry.sdk.metrics.export import InMemoryMetricReader -from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, SynchronousMultiSpanProcessor from opentelemetry.sdk.trace.export import ( BatchSpanProcessor, ConsoleSpanExporter, @@ -46,7 +46,10 @@ from logfire._internal.exporters.console import ShowParentsConsoleSpanExporter from logfire._internal.exporters.fallback import FallbackSpanExporter from logfire._internal.exporters.file import WritingFallbackWarning -from logfire._internal.exporters.processor_wrapper import MainSpanProcessorWrapper +from logfire._internal.exporters.processor_wrapper import ( + CheckSuppressInstrumentationProcessorWrapper, + MainSpanProcessorWrapper, +) from logfire._internal.exporters.quiet_metrics import QuietMetricExporter from logfire._internal.exporters.remove_pending import RemovePendingSpansExporter from logfire._internal.exporters.wrapper import WrapperSpanExporter @@ -570,11 +573,9 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: ) wait_for_check_token_thread() - send_to_logfire_processor, *_ = get_span_processors() + batch_span_processor, *_ = get_span_processors() # It's OK if these processor/exporter types change. # We just need access to the FallbackSpanExporter either way to swap out its underlying exporter. 
- assert isinstance(send_to_logfire_processor, MainSpanProcessorWrapper) - batch_span_processor = send_to_logfire_processor.processor assert isinstance(batch_span_processor, BatchSpanProcessor) exporter = batch_span_processor.span_exporter assert isinstance(exporter, WrapperSpanExporter) @@ -622,9 +623,7 @@ def configure_tracking_exporter(): ) wait_for_check_token_thread() - send_to_logfire_processor, *_ = get_span_processors() - assert isinstance(send_to_logfire_processor, MainSpanProcessorWrapper) - batch_span_processor = send_to_logfire_processor.processor + batch_span_processor, *_ = get_span_processors() assert isinstance(batch_span_processor, BatchSpanProcessor) batch_span_processor.span_exporter = TrackingExporter() @@ -1460,16 +1459,19 @@ def test_default_exporters(monkeypatch: pytest.MonkeyPatch): [console_processor, send_to_logfire_processor, pending_span_processor] = get_span_processors() - assert isinstance(console_processor, MainSpanProcessorWrapper) - assert isinstance(console_processor.processor, SimpleSpanProcessor) - assert isinstance(console_processor.processor.span_exporter, ShowParentsConsoleSpanExporter) + assert isinstance(console_processor, SimpleSpanProcessor) + assert isinstance(console_processor.span_exporter, ShowParentsConsoleSpanExporter) - assert isinstance(send_to_logfire_processor, MainSpanProcessorWrapper) - assert isinstance(send_to_logfire_processor.processor, BatchSpanProcessor) - assert isinstance(send_to_logfire_processor.processor.span_exporter, RemovePendingSpansExporter) + assert isinstance(send_to_logfire_processor, BatchSpanProcessor) + assert isinstance(send_to_logfire_processor.span_exporter, RemovePendingSpansExporter) assert isinstance(pending_span_processor, PendingSpanProcessor) - assert pending_span_processor.other_processors == (console_processor, send_to_logfire_processor) + assert isinstance(pending_span_processor.processor, MainSpanProcessorWrapper) + assert isinstance(pending_span_processor.processor.processor, SynchronousMultiSpanProcessor) + assert pending_span_processor.processor.processor._span_processors == ( # type: ignore + console_processor, + send_to_logfire_processor, + ) [logfire_metric_reader] = get_metric_readers() assert isinstance(logfire_metric_reader, PeriodicExportingMetricReader) @@ -1487,9 +1489,8 @@ def test_custom_exporters(): metrics=logfire.MetricsOptions(additional_readers=[custom_metric_reader]), ) - [custom_processor_wrapper] = get_span_processors() - assert isinstance(custom_processor_wrapper, MainSpanProcessorWrapper) - assert custom_processor_wrapper.processor is custom_span_processor + [custom_span_processor2] = get_span_processors() + assert custom_span_processor2 is custom_span_processor [custom_metric_reader2] = get_metric_readers() assert custom_metric_reader2 is custom_metric_reader @@ -1501,10 +1502,9 @@ def test_otel_exporter_otlp_endpoint_env_var(): logfire.configure(send_to_logfire=False, console=False) [otel_processor] = get_span_processors() - assert isinstance(otel_processor, MainSpanProcessorWrapper) - assert isinstance(otel_processor.processor, BatchSpanProcessor) - assert isinstance(otel_processor.processor.span_exporter, OTLPSpanExporter) - assert otel_processor.processor.span_exporter._endpoint == 'otel_endpoint/v1/traces' # type: ignore + assert isinstance(otel_processor, BatchSpanProcessor) + assert isinstance(otel_processor.span_exporter, OTLPSpanExporter) + assert otel_processor.span_exporter._endpoint == 'otel_endpoint/v1/traces' # type: ignore [otel_metric_reader] = 
get_metric_readers() assert isinstance(otel_metric_reader, PeriodicExportingMetricReader) @@ -1531,10 +1531,9 @@ def test_otel_metrics_exporter_env_var(): logfire.configure(send_to_logfire=False, console=False) [otel_processor] = get_span_processors() - assert isinstance(otel_processor, MainSpanProcessorWrapper) - assert isinstance(otel_processor.processor, BatchSpanProcessor) - assert isinstance(otel_processor.processor.span_exporter, OTLPSpanExporter) - assert otel_processor.processor.span_exporter._endpoint == 'otel_endpoint3/v1/traces' # type: ignore + assert isinstance(otel_processor, BatchSpanProcessor) + assert isinstance(otel_processor.span_exporter, OTLPSpanExporter) + assert otel_processor.span_exporter._endpoint == 'otel_endpoint3/v1/traces' # type: ignore assert len(list(get_metric_readers())) == 0 @@ -1545,10 +1544,9 @@ def test_otel_exporter_otlp_traces_endpoint_env_var(): logfire.configure(send_to_logfire=False, console=False) [otel_processor] = get_span_processors() - assert isinstance(otel_processor, MainSpanProcessorWrapper) - assert isinstance(otel_processor.processor, BatchSpanProcessor) - assert isinstance(otel_processor.processor.span_exporter, OTLPSpanExporter) - assert otel_processor.processor.span_exporter._endpoint == 'otel_traces_endpoint' # type: ignore + assert isinstance(otel_processor, BatchSpanProcessor) + assert isinstance(otel_processor.span_exporter, OTLPSpanExporter) + assert otel_processor.span_exporter._endpoint == 'otel_traces_endpoint' # type: ignore assert len(list(get_metric_readers())) == 0 @@ -1576,7 +1574,12 @@ def test_metrics_false(monkeypatch: pytest.MonkeyPatch): def get_span_processors() -> Iterable[SpanProcessor]: - return get_tracer_provider().provider._active_span_processor._span_processors # type: ignore + [first, *rest] = get_tracer_provider().provider._active_span_processor._span_processors # type: ignore + assert isinstance(first, CheckSuppressInstrumentationProcessorWrapper) + assert isinstance(first.processor, MainSpanProcessorWrapper) + assert isinstance(first.processor.processor, SynchronousMultiSpanProcessor) + + return [*first.processor.processor._span_processors, *rest] # type: ignore def get_metric_readers() -> Iterable[SpanProcessor]: diff --git a/tests/test_tail_sampling.py b/tests/test_tail_sampling.py index 8a4f0015b..a57c1f689 100644 --- a/tests/test_tail_sampling.py +++ b/tests/test_tail_sampling.py @@ -11,7 +11,7 @@ import logfire from logfire._internal.constants import LEVEL_NUMBERS from logfire.sampling import SpanLevel, TailSamplingSpanInfo -from logfire.testing import SeededRandomIdGenerator, TestExporter +from logfire.testing import SeededRandomIdGenerator, TestExporter, TimeGenerator def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): @@ -39,7 +39,7 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): [ { 'name': 'notice', - 'context': {'trace_id': 2, 'span_id': 4, 'is_remote': False}, + 'context': {'trace_id': 2, 'span_id': 3, 'is_remote': False}, 'parent': None, 'start_time': 4000000000, 'end_time': 4000000000, @@ -55,7 +55,7 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): }, { 'name': 'warn', - 'context': {'trace_id': 4, 'span_id': 8, 'is_remote': False}, + 'context': {'trace_id': 4, 'span_id': 6, 'is_remote': False}, 'parent': None, 'start_time': 8000000000, 'end_time': 8000000000, @@ -72,7 +72,7 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): { 'name': 'span (pending)', 
'context': {'trace_id': 5, 'span_id': 10, 'is_remote': False}, - 'parent': {'trace_id': 5, 'span_id': 9, 'is_remote': False}, + 'parent': {'trace_id': 5, 'span_id': 7, 'is_remote': False}, 'start_time': 9000000000, 'end_time': 9000000000, 'attributes': { @@ -87,8 +87,8 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): }, { 'name': 'span2 (pending)', - 'context': {'trace_id': 5, 'span_id': 12, 'is_remote': False}, - 'parent': {'trace_id': 5, 'span_id': 11, 'is_remote': False}, + 'context': {'trace_id': 5, 'span_id': 11, 'is_remote': False}, + 'parent': {'trace_id': 5, 'span_id': 8, 'is_remote': False}, 'start_time': 10000000000, 'end_time': 10000000000, 'attributes': { @@ -98,13 +98,13 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): 'logfire.msg_template': 'span2', 'logfire.msg': 'span2', 'logfire.span_type': 'pending_span', - 'logfire.pending_parent_id': '0000000000000009', + 'logfire.pending_parent_id': '0000000000000007', }, }, { 'name': 'error', - 'context': {'trace_id': 5, 'span_id': 13, 'is_remote': False}, - 'parent': {'trace_id': 5, 'span_id': 11, 'is_remote': False}, + 'context': {'trace_id': 5, 'span_id': 9, 'is_remote': False}, + 'parent': {'trace_id': 5, 'span_id': 8, 'is_remote': False}, 'start_time': 11000000000, 'end_time': 11000000000, 'attributes': { @@ -119,8 +119,8 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): }, { 'name': 'span2', - 'context': {'trace_id': 5, 'span_id': 11, 'is_remote': False}, - 'parent': {'trace_id': 5, 'span_id': 9, 'is_remote': False}, + 'context': {'trace_id': 5, 'span_id': 8, 'is_remote': False}, + 'parent': {'trace_id': 5, 'span_id': 7, 'is_remote': False}, 'start_time': 10000000000, 'end_time': 12000000000, 'attributes': { @@ -134,7 +134,7 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): }, { 'name': 'span', - 'context': {'trace_id': 5, 'span_id': 9, 'is_remote': False}, + 'context': {'trace_id': 5, 'span_id': 7, 'is_remote': False}, 'parent': None, 'start_time': 9000000000, 'end_time': 13000000000, @@ -149,8 +149,8 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): }, { 'name': 'span3 (pending)', - 'context': {'trace_id': 6, 'span_id': 15, 'is_remote': False}, - 'parent': {'trace_id': 6, 'span_id': 14, 'is_remote': False}, + 'context': {'trace_id': 6, 'span_id': 13, 'is_remote': False}, + 'parent': {'trace_id': 6, 'span_id': 12, 'is_remote': False}, 'start_time': 14000000000, 'end_time': 14000000000, 'attributes': { @@ -166,8 +166,8 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): }, { 'name': 'trace', - 'context': {'trace_id': 6, 'span_id': 16, 'is_remote': False}, - 'parent': {'trace_id': 6, 'span_id': 14, 'is_remote': False}, + 'context': {'trace_id': 6, 'span_id': 14, 'is_remote': False}, + 'parent': {'trace_id': 6, 'span_id': 12, 'is_remote': False}, 'start_time': 15000000000, 'end_time': 15000000000, 'attributes': { @@ -182,7 +182,7 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): }, { 'name': 'span3', - 'context': {'trace_id': 6, 'span_id': 14, 'is_remote': False}, + 'context': {'trace_id': 6, 'span_id': 12, 'is_remote': False}, 'parent': None, 'start_time': 14000000000, 'end_time': 16000000000, @@ -200,7 +200,11 @@ def test_level_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): ) -def test_duration_threshold(config_kwargs: dict[str, Any], exporter: TestExporter): +def 
test_duration_threshold( + config_kwargs: dict[str, Any], + exporter: TestExporter, + time_generator: TimeGenerator, +): # Set level to None to not include spans merely based on a high level. logfire.configure( **config_kwargs, sampling=logfire.SamplingOptions.level_or_duration(level_threshold=None, duration_threshold=3) @@ -220,44 +224,20 @@ def test_duration_threshold(config_kwargs: dict[str, Any], exporter: TestExporte with logfire.span('span5'): logfire.info('long1') + # This reaches the duration threshold when span7 ends but before span6 ends. + # This means that a pending span is created for span6 but not for span7, + # because PendingSpanProcessor doesn't create pending spans for spans that have finished. + with logfire.span('span6'): + time_generator() + with logfire.span('span7'): + time_generator() + assert exporter.exported_spans_as_dict(_include_pending_spans=True) == snapshot( [ - { - 'name': 'span4 (pending)', - 'context': {'trace_id': 4, 'span_id': 10, 'is_remote': False}, - 'parent': {'trace_id': 4, 'span_id': 9, 'is_remote': False}, - 'start_time': 9000000000, - 'end_time': 9000000000, - 'attributes': { - 'code.filepath': 'test_tail_sampling.py', - 'code.function': 'test_duration_threshold', - 'code.lineno': 123, - 'logfire.msg_template': 'span4', - 'logfire.msg': 'span4', - 'logfire.span_type': 'pending_span', - 'logfire.pending_parent_id': '0000000000000000', - }, - }, - { - 'name': 'span5 (pending)', - 'context': {'trace_id': 4, 'span_id': 12, 'is_remote': False}, - 'parent': {'trace_id': 4, 'span_id': 11, 'is_remote': False}, - 'start_time': 10000000000, - 'end_time': 10000000000, - 'attributes': { - 'code.filepath': 'test_tail_sampling.py', - 'code.function': 'test_duration_threshold', - 'code.lineno': 123, - 'logfire.msg_template': 'span5', - 'logfire.msg': 'span5', - 'logfire.span_type': 'pending_span', - 'logfire.pending_parent_id': '0000000000000009', - }, - }, { 'name': 'long1', - 'context': {'trace_id': 4, 'span_id': 13, 'is_remote': False}, - 'parent': {'trace_id': 4, 'span_id': 11, 'is_remote': False}, + 'context': {'trace_id': 4, 'span_id': 8, 'is_remote': False}, + 'parent': {'trace_id': 4, 'span_id': 7, 'is_remote': False}, 'start_time': 11000000000, 'end_time': 11000000000, 'attributes': { @@ -272,8 +252,8 @@ def test_duration_threshold(config_kwargs: dict[str, Any], exporter: TestExporte }, { 'name': 'span5', - 'context': {'trace_id': 4, 'span_id': 11, 'is_remote': False}, - 'parent': {'trace_id': 4, 'span_id': 9, 'is_remote': False}, + 'context': {'trace_id': 4, 'span_id': 7, 'is_remote': False}, + 'parent': {'trace_id': 4, 'span_id': 6, 'is_remote': False}, 'start_time': 10000000000, 'end_time': 12000000000, 'attributes': { @@ -287,7 +267,7 @@ def test_duration_threshold(config_kwargs: dict[str, Any], exporter: TestExporte }, { 'name': 'span4', - 'context': {'trace_id': 4, 'span_id': 9, 'is_remote': False}, + 'context': {'trace_id': 4, 'span_id': 6, 'is_remote': False}, 'parent': None, 'start_time': 9000000000, 'end_time': 13000000000, @@ -300,6 +280,52 @@ def test_duration_threshold(config_kwargs: dict[str, Any], exporter: TestExporte 'logfire.span_type': 'span', }, }, + { + 'name': 'span6 (pending)', + 'context': {'trace_id': 5, 'span_id': 11, 'is_remote': False}, + 'parent': {'trace_id': 5, 'span_id': 9, 'is_remote': False}, + 'start_time': 14000000000, + 'end_time': 14000000000, + 'attributes': { + 'code.filepath': 'test_tail_sampling.py', + 'code.function': 'test_duration_threshold', + 'code.lineno': 123, + 'logfire.msg_template': 'span6', + 
'logfire.msg': 'span6', + 'logfire.span_type': 'pending_span', + 'logfire.pending_parent_id': '0000000000000000', + }, + }, + { + 'name': 'span7', + 'context': {'trace_id': 5, 'span_id': 10, 'is_remote': False}, + 'parent': {'trace_id': 5, 'span_id': 9, 'is_remote': False}, + 'start_time': 16000000000, + 'end_time': 18000000000, + 'attributes': { + 'code.filepath': 'test_tail_sampling.py', + 'code.function': 'test_duration_threshold', + 'code.lineno': 123, + 'logfire.msg_template': 'span7', + 'logfire.msg': 'span7', + 'logfire.span_type': 'span', + }, + }, + { + 'name': 'span6', + 'context': {'trace_id': 5, 'span_id': 9, 'is_remote': False}, + 'parent': None, + 'start_time': 14000000000, + 'end_time': 19000000000, + 'attributes': { + 'code.filepath': 'test_tail_sampling.py', + 'code.function': 'test_duration_threshold', + 'code.lineno': 123, + 'logfire.msg_template': 'span6', + 'logfire.msg': 'span6', + 'logfire.span_type': 'span', + }, + }, ] ) @@ -414,12 +440,12 @@ def get_tail_sample_rate(span_info: TailSamplingSpanInfo) -> float: for _ in range(1000): with logfire.span('span'): pass - assert len(exporter.exported_spans_as_dict()) == snapshot(506) + assert len(exporter.exported_spans_as_dict()) == snapshot(505) exporter.clear() for _ in range(1000): logfire.error('error') - assert len(exporter.exported_spans) == snapshot(291) + assert len(exporter.exported_spans) == snapshot(282) def test_span_levels():
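
Note for reviewers: the net effect of the rearrangement in `_initialize` is easier to see as a small sketch than from the hunks alone. The wiring below mirrors the patch: a single `CheckSuppressInstrumentationProcessorWrapper` -> `MainSpanProcessorWrapper` pair now sits at the root, so the suppress-instrumentation check, scrubbing and span tweaking run exactly once per span, while the individual processors are fanned out through a `SynchronousMultiSpanProcessor`. This is an illustrative sketch, not the real configuration code: `build_root_processor`, `user_processors`, `pending_capable`, `scrubber` and `id_generator` are stand-in names, the internal import paths are taken from the files touched above, and tail sampling (`TailSamplingProcessor`) is only noted in a comment.

    # Sketch of the processor tree built by this patch (assumed helper, not real config code).
    from __future__ import annotations

    from opentelemetry.sdk.trace import SpanProcessor, SynchronousMultiSpanProcessor

    from logfire._internal.exporters.processor_wrapper import (
        CheckSuppressInstrumentationProcessorWrapper,
        MainSpanProcessorWrapper,
    )
    from logfire._internal.tracer import PendingSpanProcessor


    def build_root_processor(
        user_processors: list[SpanProcessor],
        pending_capable: list[SpanProcessor],
        scrubber,      # the configured scrubber (BaseScrubber)
        id_generator,  # the configured IdGenerator
    ) -> SpanProcessor:
        # All registered processors hang off one multiprocessor ...
        main_multiprocessor = SynchronousMultiSpanProcessor()
        for processor in user_processors:
            main_multiprocessor.add_span_processor(processor)

        # ... including the pending-span processor, which re-sends the synthetic
        # pending spans through its own MainSpanProcessorWrapper (so they are
        # scrubbed/tweaked too), but only to processors that handle pending spans.
        if pending_capable:
            pending_multiprocessor = SynchronousMultiSpanProcessor()
            for processor in pending_capable:
                pending_multiprocessor.add_span_processor(processor)
            main_multiprocessor.add_span_processor(
                PendingSpanProcessor(
                    id_generator,
                    MainSpanProcessorWrapper(pending_multiprocessor, scrubber),
                )
            )

        # The suppression check and the scrubbing/tweaking wrapper now appear once,
        # at the root, instead of once per registered processor.
        # (The real code inserts TailSamplingProcessor between the wrapper and the
        # multiprocessor when tail sampling is configured.)
        return CheckSuppressInstrumentationProcessorWrapper(
            MainSpanProcessorWrapper(main_multiprocessor, scrubber)
        )

Constructed this way, an ordinary span is scrubbed once at the root, and the extra pending span emitted by `PendingSpanProcessor.on_start` is scrubbed once by the pending wrapper, which is why the test helper `get_span_processors` now unwraps the root wrapper before inspecting the per-exporter processors.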