Skip to content

Commit

Permalink
BatchSpanProcessor with dedicated thread. (#2456)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Dec 23, 2024
1 parent 6209c06 commit 1f35467
Show file tree
Hide file tree
Showing 18 changed files with 1,324 additions and 555 deletions.
2 changes: 1 addition & 1 deletion examples/tracing-grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn init_tracer() -> sdktrace::TracerProvider {
global::set_text_map_propagator(TraceContextPropagator::new());
// Install stdout exporter pipeline to be able to retrieve the collected spans.
let provider = sdktrace::TracerProvider::builder()
.with_batch_exporter(SpanExporter::default(), Tokio)
.with_batch_exporter(SpanExporter::default())
.build();

global::set_tracer_provider(provider.clone());
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing-grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn init_tracer() -> TracerProvider {
global::set_text_map_propagator(TraceContextPropagator::new());
// Install stdout exporter pipeline to be able to retrieve the collected spans.
let provider = TracerProvider::builder()
.with_batch_exporter(SpanExporter::default(), Tokio)
.with_batch_exporter(SpanExporter::default())
.build();

global::set_tracer_provider(provider.clone());
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing-jaeger/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn init_tracer_provider() -> Result<opentelemetry_sdk::trace::TracerProvider, Tr
.build()?;

Ok(TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.with_resource(
Resource::builder()
.with_service_name("tracing-jaeger")
Expand Down
5 changes: 1 addition & 4 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
.build()?;

Ok(TracerProvider::builder()
// TODO: Enable BatchExporter after
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
.with_simple_exporter(exporter)
.with_batch_exporter(exporter)
.with_resource(RESOURCE.clone())
.build())
}
Expand All @@ -73,7 +71,6 @@ fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, Metric

// #[tokio::main]
// TODO: Re-enable tokio::main, if needed, after
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let logger_provider = init_logs()?;

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
.build()?;
Ok(sdktrace::TracerProvider::builder()
.with_resource(RESOURCE.clone())
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.build())
}

Expand Down
46 changes: 45 additions & 1 deletion opentelemetry-otlp/tests/integration_test/tests/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn init_tracer_provider() -> Result<sdktrace::TracerProvider, TraceError> {
let exporter = exporter_builder.build()?;

Ok(opentelemetry_sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.with_resource(
Resource::builder_empty()
.with_service_name("basic-otlp-tracing-example")
Expand Down Expand Up @@ -141,6 +141,50 @@ pub fn test_serde() -> Result<()> {
Ok(())
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn span_batch_non_tokio_main() -> Result<()> {
// Initialize the tracer provider inside a tokio runtime
// as this allows tonic client to capture the runtime,
// but actual export occurs from the dedicated std::thread
// created by BatchSpanProcessor.

use anyhow::Ok;
let rt = tokio::runtime::Runtime::new()?;
let tracer_provider = rt.block_on(async {
// While we're here setup our collector container too, as this needs tokio to run
let _ = test_utils::start_collector_container().await;
init_tracer_provider()
})?;

let tracer = global::tracer("ex.com/basic");

tracer.in_span("operation", |cx| {
let span = cx.span();
span.add_event(
"Nice operation!".to_string(),
vec![KeyValue::new("bogons", 100)],
);
span.set_attribute(KeyValue::new(ANOTHER_KEY, "yes"));

tracer.in_span("Sub operation...", |cx| {
let span = cx.span();
span.set_attribute(KeyValue::new(LEMONS_KEY, "five"));

span.add_event("Sub span event", vec![]);
});
});

tracer_provider.shutdown()?;

// Give it a second to flush
std::thread::sleep(Duration::from_secs(2));

// Validate results
assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?;
Ok(())
}

///
/// Make sure we stop the collector container, otherwise it will sit around hogging our
/// ports and subsequent test runs will fail.
Expand Down
1 change: 0 additions & 1 deletion opentelemetry-otlp/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ async fn smoke_tracer() {
.with_metadata(metadata)
.build()
.expect("NON gzip-tonic SpanExporter failed to build"),
opentelemetry_sdk::runtime::Tokio,
)
.build();

Expand Down
52 changes: 52 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,58 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
- Continue enabling one of the async runtime feature flags: `rt-tokio`,
`rt-tokio-current-thread`, or `rt-async-std`.

- **Breaking** [#2456](https://github.com/open-telemetry/opentelemetry-rust/pull/2456)

`BatchSpanProcessor` no longer requires an async runtime by default. Instead, a dedicated
background thread is created to do the batch processing and exporting.

For users who prefer the previous behavior of relying on a specific
`Runtime`, they can do so by enabling the feature flag
**`experimental_trace_batch_span_processor_with_async_runtime`**.

1. *Default Implementation, requires no async runtime* (**Recommended**) The
new default implementation does not require a runtime argument. Replace the
builder method accordingly:
- *Before:*
```rust
let tracer_provider = TracerProvider::builder()
.with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
.build();
```

- *After:*
```rust
let tracer_provider = TracerProvider::builder()
.with_span_processor(BatchSpanProcessor::builder(exporter).build())
.build();
```

2. *Async Runtime Support*
If your application cannot spin up new threads or you prefer using async
runtimes, enable the
"experimental_trace_batch_span_processor_with_async_runtime" feature flag and
adjust code as below.

- *Before:*
```rust
let tracer_provider = TracerProvider::builder()
.with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
.build();
```

- *After:*
```rust
let tracer_provider = TracerProvider::builder()
.with_span_processor(span_processor_with_async_runtime::BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
.build();
```

*Requirements:*
- Enable the feature flag:
`experimental_trace_batch_span_processor_with_async_runtime`.
- Continue enabling one of the async runtime feature flags: `rt-tokio`,
`rt-tokio-current-thread`, or `rt-async-std`.

## 0.27.1

Released 2024-Nov-27
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ internal-logs = ["tracing"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
spec_unstable_metrics_views = ["metrics"]
experimental_logs_batch_log_processor_with_async_runtime = ["logs"]
experimental_trace_batch_span_processor_with_async_runtime = ["trace"]


[[bench]]
name = "context"
Expand Down
23 changes: 10 additions & 13 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
Expand Down Expand Up @@ -49,14 +48,13 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let span_processor =
BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio)
.with_batch_config(
BatchConfigBuilder::default()
.with_max_queue_size(10_000)
.build(),
)
.build();
let span_processor = BatchSpanProcessor::builder(NoopSpanExporter::new())
.with_batch_config(
BatchConfigBuilder::default()
.with_max_queue_size(10_000)
.build(),
)
.build();
let mut shared_span_processor = Arc::new(span_processor);
let mut handles = Vec::with_capacity(10);
for _ in 0..task_num {
Expand All @@ -70,10 +68,9 @@ fn criterion_benchmark(c: &mut Criterion) {
}));
}
futures_util::future::join_all(handles).await;
let _ =
Arc::<BatchSpanProcessor<Tokio>>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
let _ = Arc::<BatchSpanProcessor>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
});
})
},
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::{Arc, Mutex};
///# async fn main() {
/// let exporter = InMemorySpanExporterBuilder::new().build();
/// let provider = TracerProvider::builder()
/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone(), runtime::Tokio).build())
/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone()).build())
/// .build();
///
/// global::set_tracer_provider(provider.clone());
Expand Down
5 changes: 5 additions & 0 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ mod sampler;
mod span;
mod span_limit;
mod span_processor;
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
/// Experimental feature to use async runtime with batch span processor.
pub mod span_processor_with_async_runtime;
mod tracer;

pub use config::{config, Config};
Expand All @@ -30,11 +33,13 @@ pub use span_processor::{
BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder,
SimpleSpanProcessor, SpanProcessor,
};

pub use tracer::Tracer;

#[cfg(feature = "jaeger_remote_sampler")]
pub use sampler::{JaegerRemoteSampler, JaegerRemoteSamplerBuilder};

#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
#[cfg(test)]
mod runtime_tests;

Expand Down
9 changes: 2 additions & 7 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
/// provider.shutdown();
/// }
/// ```
use crate::runtime::RuntimeChannel;
use crate::trace::{
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
};
Expand Down Expand Up @@ -296,12 +295,8 @@ impl Builder {
}

/// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use.
pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(
self,
exporter: T,
runtime: R,
) -> Self {
let batch = BatchSpanProcessor::builder(exporter, runtime).build();
pub fn with_batch_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
let batch = BatchSpanProcessor::builder(exporter).build();
self.with_span_processor(batch)
}

Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-sdk/src/trace/runtime_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ fn build_batch_tracer_provider<R: RuntimeChannel>(
runtime: R,
) -> crate::trace::TracerProvider {
use crate::trace::TracerProvider;
let processor = crate::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder(
exporter, runtime,
)
.build();
TracerProvider::builder()
.with_batch_exporter(exporter, runtime)
.with_span_processor(processor)
.build()
}

Expand Down
Loading

0 comments on commit 1f35467

Please sign in to comment.