From 1f354674d64af419a06668bd556005b3f280fc5e Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 24 Dec 2024 00:45:48 +0530 Subject: [PATCH] BatchSpanProcessor with dedicated thread. (#2456) --- examples/tracing-grpc/src/client.rs | 2 +- examples/tracing-grpc/src/server.rs | 2 +- examples/tracing-jaeger/src/main.rs | 2 +- .../examples/basic-otlp-http/src/main.rs | 5 +- .../examples/basic-otlp/src/main.rs | 2 +- .../tests/integration_test/tests/traces.rs | 46 +- opentelemetry-otlp/tests/smoke.rs | 1 - opentelemetry-sdk/CHANGELOG.md | 52 + opentelemetry-sdk/Cargo.toml | 2 + .../benches/batch_span_processor.rs | 23 +- .../src/testing/trace/in_memory_exporter.rs | 2 +- opentelemetry-sdk/src/trace/mod.rs | 5 + opentelemetry-sdk/src/trace/provider.rs | 9 +- opentelemetry-sdk/src/trace/runtime_tests.rs | 6 +- opentelemetry-sdk/src/trace/span_processor.rs | 1080 +++++++++-------- .../span_processor_with_async_runtime.rs | 619 ++++++++++ opentelemetry-zipkin/src/exporter/mod.rs | 6 +- scripts/integration_tests.sh | 15 +- 18 files changed, 1324 insertions(+), 555 deletions(-) create mode 100644 opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs diff --git a/examples/tracing-grpc/src/client.rs b/examples/tracing-grpc/src/client.rs index 29a9353621..18c42cec6f 100644 --- a/examples/tracing-grpc/src/client.rs +++ b/examples/tracing-grpc/src/client.rs @@ -13,7 +13,7 @@ fn init_tracer() -> sdktrace::TracerProvider { global::set_text_map_propagator(TraceContextPropagator::new()); // Install stdout exporter pipeline to be able to retrieve the collected spans. let provider = sdktrace::TracerProvider::builder() - .with_batch_exporter(SpanExporter::default(), Tokio) + .with_batch_exporter(SpanExporter::default()) .build(); global::set_tracer_provider(provider.clone()); diff --git a/examples/tracing-grpc/src/server.rs b/examples/tracing-grpc/src/server.rs index 24a9e09481..3dbb012321 100644 --- a/examples/tracing-grpc/src/server.rs +++ b/examples/tracing-grpc/src/server.rs @@ -15,7 +15,7 @@ fn init_tracer() -> TracerProvider { global::set_text_map_propagator(TraceContextPropagator::new()); // Install stdout exporter pipeline to be able to retrieve the collected spans. let provider = TracerProvider::builder() - .with_batch_exporter(SpanExporter::default(), Tokio) + .with_batch_exporter(SpanExporter::default()) .build(); global::set_tracer_provider(provider.clone()); diff --git a/examples/tracing-jaeger/src/main.rs b/examples/tracing-jaeger/src/main.rs index e015f9ab9f..7e06d8e921 100644 --- a/examples/tracing-jaeger/src/main.rs +++ b/examples/tracing-jaeger/src/main.rs @@ -14,7 +14,7 @@ fn init_tracer_provider() -> Result Result { .build()?; Ok(TracerProvider::builder() - // TODO: Enable BatchExporter after - // https://github.com/open-telemetry/opentelemetry-rust/pull/2456 - .with_simple_exporter(exporter) + .with_batch_exporter(exporter) .with_resource(RESOURCE.clone()) .build()) } @@ -73,7 +71,6 @@ fn init_metrics() -> Result Result<(), Box> { let logger_provider = init_logs()?; diff --git a/opentelemetry-otlp/examples/basic-otlp/src/main.rs b/opentelemetry-otlp/examples/basic-otlp/src/main.rs index c5425f8a9b..c75ed2eeb7 100644 --- a/opentelemetry-otlp/examples/basic-otlp/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp/src/main.rs @@ -27,7 +27,7 @@ fn init_traces() -> Result { .build()?; Ok(sdktrace::TracerProvider::builder() .with_resource(RESOURCE.clone()) - .with_batch_exporter(exporter, runtime::Tokio) + .with_batch_exporter(exporter) .build()) } diff --git a/opentelemetry-otlp/tests/integration_test/tests/traces.rs b/opentelemetry-otlp/tests/integration_test/tests/traces.rs index 1601e04132..29bc93d77f 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/traces.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/traces.rs @@ -35,7 +35,7 @@ fn init_tracer_provider() -> Result { let exporter = exporter_builder.build()?; Ok(opentelemetry_sdk::trace::TracerProvider::builder() - .with_batch_exporter(exporter, runtime::Tokio) + .with_batch_exporter(exporter) .with_resource( Resource::builder_empty() .with_service_name("basic-otlp-tracing-example") @@ -141,6 +141,50 @@ pub fn test_serde() -> Result<()> { Ok(()) } +#[test] +#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] +pub fn span_batch_non_tokio_main() -> Result<()> { + // Initialize the tracer provider inside a tokio runtime + // as this allows tonic client to capture the runtime, + // but actual export occurs from the dedicated std::thread + // created by BatchSpanProcessor. + + use anyhow::Ok; + let rt = tokio::runtime::Runtime::new()?; + let tracer_provider = rt.block_on(async { + // While we're here setup our collector container too, as this needs tokio to run + let _ = test_utils::start_collector_container().await; + init_tracer_provider() + })?; + + let tracer = global::tracer("ex.com/basic"); + + tracer.in_span("operation", |cx| { + let span = cx.span(); + span.add_event( + "Nice operation!".to_string(), + vec![KeyValue::new("bogons", 100)], + ); + span.set_attribute(KeyValue::new(ANOTHER_KEY, "yes")); + + tracer.in_span("Sub operation...", |cx| { + let span = cx.span(); + span.set_attribute(KeyValue::new(LEMONS_KEY, "five")); + + span.add_event("Sub span event", vec![]); + }); + }); + + tracer_provider.shutdown()?; + + // Give it a second to flush + std::thread::sleep(Duration::from_secs(2)); + + // Validate results + assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?; + Ok(()) +} + /// /// Make sure we stop the collector container, otherwise it will sit around hogging our /// ports and subsequent test runs will fail. diff --git a/opentelemetry-otlp/tests/smoke.rs b/opentelemetry-otlp/tests/smoke.rs index ba09407e1e..e9cd0da165 100644 --- a/opentelemetry-otlp/tests/smoke.rs +++ b/opentelemetry-otlp/tests/smoke.rs @@ -100,7 +100,6 @@ async fn smoke_tracer() { .with_metadata(metadata) .build() .expect("NON gzip-tonic SpanExporter failed to build"), - opentelemetry_sdk::runtime::Tokio, ) .build(); diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index df8a7aa2f0..140f41ed1f 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -159,6 +159,58 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope - Continue enabling one of the async runtime feature flags: `rt-tokio`, `rt-tokio-current-thread`, or `rt-async-std`. +- **Breaking** [#2456](https://github.com/open-telemetry/opentelemetry-rust/pull/2456) + + `BatchSpanProcessor` no longer requires an async runtime by default. Instead, a dedicated + background thread is created to do the batch processing and exporting. + + For users who prefer the previous behavior of relying on a specific + `Runtime`, they can do so by enabling the feature flag + **`experimental_trace_batch_span_processor_with_async_runtime`**. + + 1. *Default Implementation, requires no async runtime* (**Recommended**) The + new default implementation does not require a runtime argument. Replace the + builder method accordingly: + - *Before:* + ```rust + let tracer_provider = TracerProvider::builder() + .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let tracer_provider = TracerProvider::builder() + .with_span_processor(BatchSpanProcessor::builder(exporter).build()) + .build(); + ``` + + 2. *Async Runtime Support* + If your application cannot spin up new threads or you prefer using async + runtimes, enable the + "experimental_trace_batch_span_processor_with_async_runtime" feature flag and + adjust code as below. + + - *Before:* + ```rust + let tracer_provider = TracerProvider::builder() + .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let tracer_provider = TracerProvider::builder() + .with_span_processor(span_processor_with_async_runtime::BatchSpanProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + *Requirements:* + - Enable the feature flag: + `experimental_trace_batch_span_processor_with_async_runtime`. + - Continue enabling one of the async runtime feature flags: `rt-tokio`, + `rt-tokio-current-thread`, or `rt-async-std`. + ## 0.27.1 Released 2024-Nov-27 diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 1310189f89..167846dce1 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -56,6 +56,8 @@ internal-logs = ["tracing"] experimental_metrics_periodicreader_with_async_runtime = ["metrics"] spec_unstable_metrics_views = ["metrics"] experimental_logs_batch_log_processor_with_async_runtime = ["logs"] +experimental_trace_batch_span_processor_with_async_runtime = ["trace"] + [[bench]] name = "context" diff --git a/opentelemetry-sdk/benches/batch_span_processor.rs b/opentelemetry-sdk/benches/batch_span_processor.rs index ed20c45a06..d57ef26157 100644 --- a/opentelemetry-sdk/benches/batch_span_processor.rs +++ b/opentelemetry-sdk/benches/batch_span_processor.rs @@ -3,7 +3,6 @@ use opentelemetry::trace::{ SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, }; use opentelemetry_sdk::export::trace::SpanData; -use opentelemetry_sdk::runtime::Tokio; use opentelemetry_sdk::testing::trace::NoopSpanExporter; use opentelemetry_sdk::trace::{ BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor, @@ -49,14 +48,13 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { let rt = Runtime::new().unwrap(); rt.block_on(async move { - let span_processor = - BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio) - .with_batch_config( - BatchConfigBuilder::default() - .with_max_queue_size(10_000) - .build(), - ) - .build(); + let span_processor = BatchSpanProcessor::builder(NoopSpanExporter::new()) + .with_batch_config( + BatchConfigBuilder::default() + .with_max_queue_size(10_000) + .build(), + ) + .build(); let mut shared_span_processor = Arc::new(span_processor); let mut handles = Vec::with_capacity(10); for _ in 0..task_num { @@ -70,10 +68,9 @@ fn criterion_benchmark(c: &mut Criterion) { })); } futures_util::future::join_all(handles).await; - let _ = - Arc::>::get_mut(&mut shared_span_processor) - .unwrap() - .shutdown(); + let _ = Arc::::get_mut(&mut shared_span_processor) + .unwrap() + .shutdown(); }); }) }, diff --git a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs index 0ae261916a..3645d9f6c2 100644 --- a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs @@ -22,7 +22,7 @@ use std::sync::{Arc, Mutex}; ///# async fn main() { /// let exporter = InMemorySpanExporterBuilder::new().build(); /// let provider = TracerProvider::builder() -/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone(), runtime::Tokio).build()) +/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone()).build()) /// .build(); /// /// global::set_tracer_provider(provider.clone()); diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 43445c4a4c..4acf809022 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -15,6 +15,9 @@ mod sampler; mod span; mod span_limit; mod span_processor; +#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] +/// Experimental feature to use async runtime with batch span processor. +pub mod span_processor_with_async_runtime; mod tracer; pub use config::{config, Config}; @@ -30,11 +33,13 @@ pub use span_processor::{ BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, SpanProcessor, }; + pub use tracer::Tracer; #[cfg(feature = "jaeger_remote_sampler")] pub use sampler::{JaegerRemoteSampler, JaegerRemoteSamplerBuilder}; +#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] #[cfg(test)] mod runtime_tests; diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 4820d7e929..141850a759 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -62,7 +62,6 @@ /// provider.shutdown(); /// } /// ``` -use crate::runtime::RuntimeChannel; use crate::trace::{ BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer, }; @@ -296,12 +295,8 @@ impl Builder { } /// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use. - pub fn with_batch_exporter( - self, - exporter: T, - runtime: R, - ) -> Self { - let batch = BatchSpanProcessor::builder(exporter, runtime).build(); + pub fn with_batch_exporter(self, exporter: T) -> Self { + let batch = BatchSpanProcessor::builder(exporter).build(); self.with_span_processor(batch) } diff --git a/opentelemetry-sdk/src/trace/runtime_tests.rs b/opentelemetry-sdk/src/trace/runtime_tests.rs index 75cb1b4475..ed1a8325d1 100644 --- a/opentelemetry-sdk/src/trace/runtime_tests.rs +++ b/opentelemetry-sdk/src/trace/runtime_tests.rs @@ -52,8 +52,12 @@ fn build_batch_tracer_provider( runtime: R, ) -> crate::trace::TracerProvider { use crate::trace::TracerProvider; + let processor = crate::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder( + exporter, runtime, + ) + .build(); TracerProvider::builder() - .with_batch_exporter(exporter, runtime) + .with_span_processor(processor) .build() } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 5023ca2bc5..3972ac65ee 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -34,18 +34,11 @@ //! [`is_recording`]: opentelemetry::trace::Span::is_recording() //! [`TracerProvider`]: opentelemetry::trace::TracerProvider -use crate::export::trace::{ExportResult, SpanData, SpanExporter}; +use crate::export::trace::{SpanData, SpanExporter}; use crate::resource::Resource; -use crate::runtime::{RuntimeChannel, TrySend}; use crate::trace::Span; -use futures_channel::oneshot; -use futures_util::{ - future::{self, BoxFuture, Either}, - select, - stream::{self, FusedStream, FuturesUnordered}, - StreamExt as _, -}; -use opentelemetry::{otel_debug, otel_error, otel_warn}; +use opentelemetry::otel_error; +use opentelemetry::{otel_debug, otel_warn}; use opentelemetry::{ trace::{TraceError, TraceResult}, Context, @@ -53,29 +46,33 @@ use opentelemetry::{ use std::cmp::min; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use std::{env, fmt, str::FromStr, time::Duration}; +use std::{env, str::FromStr, time::Duration}; + +use std::sync::atomic::AtomicBool; +use std::thread; +use std::time::Instant; /// Delay interval between two consecutive exports. -const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; +pub(crate) const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; /// Default delay interval between two consecutive exports. -const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000; +pub(crate) const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000; /// Maximum queue size -const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE"; +pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE"; /// Default maximum queue size -const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048; +pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048; /// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE -const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; +pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size -const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; +pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; /// Maximum allowed time to export data. -const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT"; +pub(crate) const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT"; /// Default maximum allowed time to export data. -const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000; +pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000; /// Environment variable to configure max concurrent exports for batch span /// processor. -const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS"; +pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS"; /// Default max concurrent exports for BSP -const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1; +pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1; /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. The span processors are invoked only when is_recording @@ -96,7 +93,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// /// Implementation should make sure shutdown can be called multiple times. fn shutdown(&self) -> TraceResult<()>; - /// Set the resource for the log processor. + /// Set the resource for the span processor. fn set_resource(&mut self, _resource: &Resource) {} } @@ -166,140 +163,265 @@ impl SpanProcessor for SimpleSpanProcessor { } } -/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports -/// them at a preconfigured interval. -/// -/// Batch span processors need to run a background task to collect and send -/// spans. Different runtimes need different ways to handle the background task. -/// -/// Note: Configuring an opentelemetry `Runtime` that's not compatible with the -/// underlying runtime can cause deadlocks (see tokio section). +/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them +/// in batches to the configured `SpanExporter`. This processor is ideal for +/// high-throughput environments, as it minimizes the overhead of exporting spans +/// individually. It uses a **dedicated background thread** to manage and export spans +/// asynchronously, ensuring that the application's main execution flow is not blocked. /// -/// ### Use with Tokio +/// /// # Example /// -/// Tokio currently offers two different schedulers. One is -/// `current_thread_scheduler`, the other is `multiple_thread_scheduler`. Both -/// of them default to use batch span processors to install span exporters. +/// This example demonstrates how to configure and use the `BatchSpanProcessor` +/// with a custom configuration. Note that a dedicated thread is used internally +/// to manage the export process. /// -/// Tokio's `current_thread_scheduler` can cause the program to hang forever if -/// blocking work is scheduled with other tasks in the same runtime. To avoid -/// this, be sure to enable the `rt-tokio-current-thread` feature in this crate -/// if you are using that runtime (e.g. users of actix-web), and blocking tasks -/// will then be scheduled on a different thread. -/// -/// # Examples -/// -/// This processor can be configured with an [`executor`] of your choice to -/// batch and upload spans asynchronously when they end. If you have added a -/// library like [`tokio`] or [`async-std`], you can pass in their respective -/// `spawn` and `interval` functions to have batching performed in those -/// contexts. -/// -/// ``` -/// # #[cfg(feature="tokio")] -/// # { +/// ```rust /// use opentelemetry::global; -/// use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace}; -/// use opentelemetry_sdk::trace::BatchConfigBuilder; +/// use opentelemetry_sdk::{ +/// trace::{BatchSpanProcessor, BatchConfigBuilder, TracerProvider}, +/// runtime, +/// testing::trace::NoopSpanExporter, +/// }; +/// use opentelemetry::trace::Tracer as _; +/// use opentelemetry::trace::Span; /// use std::time::Duration; /// -/// #[tokio::main] -/// async fn main() { -/// // Configure your preferred exporter +/// fn main() { +/// // Step 1: Create an exporter (e.g., a No-Op Exporter for demonstration). /// let exporter = NoopSpanExporter::new(); /// -/// // Create a batch span processor using an exporter and a runtime -/// let batch = trace::BatchSpanProcessor::builder(exporter, runtime::Tokio) -/// .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build()) +/// // Step 2: Configure the BatchSpanProcessor. +/// let batch_processor = BatchSpanProcessor::builder(exporter) +/// .with_batch_config( +/// BatchConfigBuilder::default() +/// .with_max_queue_size(1024) // Buffer up to 1024 spans. +/// .with_max_export_batch_size(256) // Export in batches of up to 256 spans. +/// .with_scheduled_delay(Duration::from_secs(5)) // Export every 5 seconds. +/// .with_max_export_timeout(Duration::from_secs(10)) // Timeout after 10 seconds. +/// .build(), +/// ) /// .build(); /// -/// // Then use the `with_batch_exporter` method to have the provider export spans in batches. -/// let provider = trace::TracerProvider::builder() -/// .with_span_processor(batch) +/// // Step 3: Set up a TracerProvider with the configured processor. +/// let provider = TracerProvider::builder() +/// .with_span_processor(batch_processor) /// .build(); +/// global::set_tracer_provider(provider.clone()); /// -/// let _ = global::set_tracer_provider(provider); +/// // Step 4: Create spans and record operations. +/// let tracer = global::tracer("example-tracer"); +/// let mut span = tracer.start("example-span"); +/// span.end(); // Mark the span as completed. +/// +/// // Step 5: Ensure all spans are flushed before exiting. +/// provider.shutdown(); /// } -/// # } /// ``` -/// -/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html -/// [`tokio`]: https://tokio.rs -/// [`async-std`]: https://async.rs -pub struct BatchSpanProcessor { - message_sender: R::Sender, +use futures_executor::block_on; +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::RecvTimeoutError; +use std::sync::mpsc::SyncSender; - // Track dropped spans - dropped_spans_count: AtomicUsize, +/// Messages exchanged between the main thread and the background thread. +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum BatchMessage { + ExportSpan(SpanData), + ForceFlush(SyncSender>), + Shutdown(SyncSender>), + SetResource(Arc), +} - // Track the maximum queue size that was configured for this processor - max_queue_size: usize, +/// A batch span processor with a dedicated background thread. +#[derive(Debug)] +pub struct BatchSpanProcessor { + message_sender: SyncSender, + handle: Mutex>>, + forceflush_timeout: Duration, + shutdown_timeout: Duration, + is_shutdown: AtomicBool, + dropped_span_count: Arc, } -impl fmt::Debug for BatchSpanProcessor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BatchSpanProcessor") - .field("message_sender", &self.message_sender) - .finish() +impl BatchSpanProcessor { + /// Creates a new instance of `BatchSpanProcessor`. + pub fn new( + mut exporter: E, + config: BatchConfig, + //max_queue_size: usize, + //scheduled_delay: Duration, + //shutdown_timeout: Duration, + ) -> Self + where + E: SpanExporter + Send + 'static, + { + let (message_sender, message_receiver) = sync_channel(config.max_queue_size); + + let handle = thread::Builder::new() + .name("BatchSpanProcessorThread".to_string()) + .spawn(move || { + let mut spans = Vec::with_capacity(config.max_export_batch_size); + let mut last_export_time = Instant::now(); + + loop { + let remaining_time_option = config + .scheduled_delay + .checked_sub(last_export_time.elapsed()); + let remaining_time = match remaining_time_option { + Some(remaining_time) => remaining_time, + None => config.scheduled_delay, + }; + match message_receiver.recv_timeout(remaining_time) { + Ok(message) => match message { + BatchMessage::ExportSpan(span) => { + spans.push(span); + if spans.len() >= config.max_queue_size + || last_export_time.elapsed() >= config.scheduled_delay + { + if let Err(err) = block_on(exporter.export(spans.split_off(0))) + { + otel_error!( + name: "BatchSpanProcessor.ExportError", + error = format!("{}", err) + ); + } + last_export_time = Instant::now(); + } + } + BatchMessage::ForceFlush(sender) => { + let result = block_on(exporter.export(spans.split_off(0))); + let _ = sender.send(result); + } + BatchMessage::Shutdown(sender) => { + let result = block_on(exporter.export(spans.split_off(0))); + let _ = sender.send(result); + break; + } + BatchMessage::SetResource(resource) => { + exporter.set_resource(&resource); + } + }, + Err(RecvTimeoutError::Timeout) => { + if last_export_time.elapsed() >= config.scheduled_delay { + if let Err(err) = block_on(exporter.export(spans.split_off(0))) { + otel_error!( + name: "BatchSpanProcessor.ExportError", + error = format!("{}", err) + ); + } + last_export_time = Instant::now(); + } + } + Err(RecvTimeoutError::Disconnected) => { + otel_error!( + name: "BatchSpanProcessor.InternalError.ChannelDisconnected", + message = "Channel disconnected, shutting down processor thread." + ); + break; + } + } + } + }) + .expect("Failed to spawn thread"); //TODO: Handle thread spawn failure + + Self { + message_sender, + handle: Mutex::new(Some(handle)), + forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable + shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable + is_shutdown: AtomicBool::new(false), + dropped_span_count: Arc::new(AtomicUsize::new(0)), + } + } + + /// builder + pub fn builder(exporter: E) -> BatchSpanProcessorBuilder + where + E: SpanExporter + Send + 'static, + { + BatchSpanProcessorBuilder { + exporter, + config: BatchConfig::default(), + } } } -impl SpanProcessor for BatchSpanProcessor { +impl SpanProcessor for BatchSpanProcessor { + /// Handles span start. fn on_start(&self, _span: &mut Span, _cx: &Context) { // Ignored } + /// Handles span end. fn on_end(&self, span: SpanData) { - if !span.span_context.is_sampled() { + if self.is_shutdown.load(Ordering::Relaxed) { + // this is a warning, as the user is trying to emit after the processor has been shutdown + otel_warn!( + name: "BatchSpanProcessor.Emit.ProcessorShutdown", + ); return; } - let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); - // If the queue is full, and we can't buffer a span + // TODO - Implement throttling to prevent error flooding when the queue is full or closed. if result.is_err() { - // Increment the number of dropped spans. If this is the first time we've had to drop, + // Increment dropped span count. The first time we have to drop a span, // emit a warning. - if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 { - otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted", - message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped."); + if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted", + message = "BatchSpanProcessorDedicatedThread dropped a Span due to queue full/internal errors. No further span will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped."); } } } + /// Flushes all pending spans. fn force_flush(&self) -> TraceResult<()> { - let (res_sender, res_receiver) = oneshot::channel(); + if self.is_shutdown.load(Ordering::Relaxed) { + return Err(TraceError::Other("Processor already shutdown".into())); + } + let (sender, receiver) = sync_channel(1); self.message_sender - .try_send(BatchMessage::Flush(Some(res_sender))) - .map_err(|err| TraceError::Other(err.into()))?; + .try_send(BatchMessage::ForceFlush(sender)) + .map_err(|_| TraceError::Other("Failed to send ForceFlush message".into()))?; - futures_executor::block_on(res_receiver) - .map_err(|err| TraceError::Other(err.into())) - .and_then(|identity| identity) + receiver + .recv_timeout(self.forceflush_timeout) + .map_err(|_| TraceError::ExportTimedOut(self.forceflush_timeout))? } + /// Shuts down the processor. fn shutdown(&self) -> TraceResult<()> { - let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed); - let max_queue_size = self.max_queue_size; + let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed); if dropped_spans > 0 { otel_warn!( - name: "BatchSpanProcessor.Shutdown", - dropped_spans = dropped_spans, - max_queue_size = max_queue_size, - message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." + name: "BatchSpanProcessor.LogsDropped", + dropped_span_count = dropped_spans, + message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals." ); } - - let (res_sender, res_receiver) = oneshot::channel(); + if self.is_shutdown.swap(true, Ordering::Relaxed) { + return Err(TraceError::Other("Processor already shutdown".into())); + } + let (sender, receiver) = sync_channel(1); self.message_sender - .try_send(BatchMessage::Shutdown(res_sender)) - .map_err(|err| TraceError::Other(err.into()))?; - - futures_executor::block_on(res_receiver) - .map_err(|err| TraceError::Other(err.into())) - .and_then(|identity| identity) + .try_send(BatchMessage::Shutdown(sender)) + .map_err(|_| TraceError::Other("Failed to send Shutdown message".into()))?; + + let result = receiver + .recv_timeout(self.shutdown_timeout) + .map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))?; + if let Some(handle) = self.handle.lock().unwrap().take() { + if let Err(err) = handle.join() { + return Err(TraceError::Other(format!( + "Background thread failed to join during shutdown. This may indicate a panic or unexpected termination: {:?}", + err + ).into())); + } + } + result } + /// Set the resource for the processor. fn set_resource(&mut self, resource: &Resource) { let resource = Arc::new(resource.clone()); let _ = self @@ -308,231 +430,28 @@ impl SpanProcessor for BatchSpanProcessor { } } -/// Messages sent between application thread and batch span processor's work thread. -// In this enum the size difference is not a concern because: -// 1. If we wrap SpanData into a pointer, it will add overhead when processing. -// 2. Most of the messages will be ExportSpan. -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -enum BatchMessage { - /// Export spans, usually called when span ends - ExportSpan(SpanData), - /// Flush the current buffer to the backend, it can be triggered by - /// pre configured interval or a call to `force_push` function. - Flush(Option>), - /// Shut down the worker thread, push all spans in buffer to the backend. - Shutdown(oneshot::Sender), - /// Set the resource for the exporter. - SetResource(Arc), -} - -struct BatchSpanProcessorInternal { - spans: Vec, - export_tasks: FuturesUnordered>, - runtime: R, - exporter: Box, +/// Builder for `BatchSpanProcessorDedicatedThread`. +#[derive(Debug, Default)] +pub struct BatchSpanProcessorBuilder +where + E: SpanExporter + Send + 'static, +{ + exporter: E, config: BatchConfig, } -impl BatchSpanProcessorInternal { - async fn flush(&mut self, res_channel: Option>) { - let export_task = self.export(); - let task = Box::pin(async move { - let result = export_task.await; - - if let Some(channel) = res_channel { - // If a response channel is provided, attempt to send the export result through it. - if let Err(result) = channel.send(result) { - otel_debug!( - name: "BatchSpanProcessor.Flush.SendResultError", - reason = format!("{:?}", result) - ); - } - } else if let Err(err) = result { - // If no channel is provided and the export operation encountered an error, - // log the error directly here. - // TODO: Consider returning the status instead of logging it. - otel_error!( - name: "BatchSpanProcessor.Flush.ExportError", - reason = format!("{:?}", err), - message = "Failed during the export process" - ); - } - - Ok(()) - }); - - if self.config.max_concurrent_exports == 1 { - let _ = task.await; - } else { - self.export_tasks.push(task); - while self.export_tasks.next().await.is_some() {} - } - } - - /// Process a single message - /// - /// A return value of false indicates shutdown - async fn process_message(&mut self, message: BatchMessage) -> bool { - match message { - // Span has finished, add to buffer of pending spans. - BatchMessage::ExportSpan(span) => { - self.spans.push(span); - - if self.spans.len() == self.config.max_export_batch_size { - // If concurrent exports are saturated, wait for one to complete. - if !self.export_tasks.is_empty() - && self.export_tasks.len() == self.config.max_concurrent_exports - { - self.export_tasks.next().await; - } - - let export_task = self.export(); - let task = async move { - if let Err(err) = export_task.await { - otel_error!( - name: "BatchSpanProcessor.Export.Error", - reason = format!("{}", err) - ); - } - - Ok(()) - }; - // Special case when not using concurrent exports - if self.config.max_concurrent_exports == 1 { - let _ = task.await; - } else { - self.export_tasks.push(Box::pin(task)); - } - } - } - // Span batch interval time reached or a force flush has been invoked, export - // current spans. - // - // This is a hint to ensure that any tasks associated with Spans for which the - // SpanProcessor had already received events prior to the call to ForceFlush - // SHOULD be completed as soon as possible, preferably before returning from - // this method. - // - // In particular, if any SpanProcessor has any associated exporter, it SHOULD - // try to call the exporter's Export with all spans for which this was not - // already done and then invoke ForceFlush on it. The built-in SpanProcessors - // MUST do so. If a timeout is specified (see below), the SpanProcessor MUST - // prioritize honoring the timeout over finishing all calls. It MAY skip or - // abort some or all Export or ForceFlush calls it has made to achieve this - // goal. - // - // NB: `force_flush` is not currently implemented on exporters; the equivalent - // would be waiting for exporter tasks to complete. In the case of - // channel-coupled exporters, they will need a `force_flush` implementation to - // properly block. - BatchMessage::Flush(res_channel) => { - self.flush(res_channel).await; - } - // Stream has terminated or processor is shutdown, return to finish execution. - BatchMessage::Shutdown(ch) => { - self.flush(Some(ch)).await; - self.exporter.shutdown(); - return false; - } - // propagate the resource - BatchMessage::SetResource(resource) => { - self.exporter.set_resource(&resource); - } - } - true - } - - fn export(&mut self) -> BoxFuture<'static, ExportResult> { - // Batch size check for flush / shutdown. Those methods may be called - // when there's no work to do. - if self.spans.is_empty() { - return Box::pin(future::ready(Ok(()))); - } - - let export = self.exporter.export(self.spans.split_off(0)); - let timeout = self.runtime.delay(self.config.max_export_timeout); - let time_out = self.config.max_export_timeout; - - Box::pin(async move { - match future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)), - } - }) - } - - async fn run(mut self, mut messages: impl FusedStream + Unpin) { - loop { - select! { - // FuturesUnordered implements Fuse intelligently such that it - // will become eligible again once new tasks are added to it. - _ = self.export_tasks.next() => { - // An export task completed; do we need to do anything with it? - }, - message = messages.next() => { - match message { - Some(message) => { - if !self.process_message(message).await { - break; - } - }, - None => break, - } - }, - } - } - } -} - -impl BatchSpanProcessor { - pub(crate) fn new(exporter: Box, config: BatchConfig, runtime: R) -> Self { - let (message_sender, message_receiver) = - runtime.batch_message_channel(config.max_queue_size); - - let max_queue_size = config.max_queue_size; - - let inner_runtime = runtime.clone(); - // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(async move { - // Timer will take a reference to the current runtime, so its important we do this within the - // runtime.spawn() - let ticker = inner_runtime - .interval(config.scheduled_delay) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| BatchMessage::Flush(None)); - let timeout_runtime = inner_runtime.clone(); - - let messages = Box::pin(stream::select(message_receiver, ticker)); - let processor = BatchSpanProcessorInternal { - spans: Vec::new(), - export_tasks: FuturesUnordered::new(), - runtime: timeout_runtime, - config, - exporter, - }; - - processor.run(messages).await - })); - - // Return batch processor with link to worker - BatchSpanProcessor { - message_sender, - dropped_spans_count: AtomicUsize::new(0), - max_queue_size, - } +impl BatchSpanProcessorBuilder +where + E: SpanExporter + Send + 'static, +{ + /// Set the BatchConfig for [BatchSpanProcessorBuilder] + pub fn with_batch_config(self, config: BatchConfig) -> Self { + BatchSpanProcessorBuilder { config, ..self } } - /// Create a new batch processor builder - pub fn builder(exporter: E, runtime: R) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - { - BatchSpanProcessorBuilder { - exporter, - config: Default::default(), - runtime, - } + /// Build a new instance of `BatchSpanProcessor`. + pub fn build(self) -> BatchSpanProcessor { + BatchSpanProcessor::new(self.exporter, self.config) } } @@ -542,27 +461,30 @@ impl BatchSpanProcessor { pub struct BatchConfig { /// The maximum queue size to buffer spans for delayed processing. If the /// queue gets full it drops the spans. The default value of is 2048. - max_queue_size: usize, + pub(crate) max_queue_size: usize, /// The delay interval in milliseconds between two consecutive processing /// of batches. The default value is 5 seconds. - scheduled_delay: Duration, + pub(crate) scheduled_delay: Duration, + #[allow(dead_code)] /// The maximum number of spans to process in a single batch. If there are /// more than one batch worth of spans then it processes multiple batches /// of spans one batch after the other without any delay. The default value /// is 512. - max_export_batch_size: usize, + pub(crate) max_export_batch_size: usize, + #[allow(dead_code)] /// The maximum duration to export a batch of data. - max_export_timeout: Duration, + pub(crate) max_export_timeout: Duration, + #[allow(dead_code)] /// Maximum number of concurrent exports /// /// Limits the number of spawned tasks for exports and thus memory consumed /// by an exporter. A value of 1 will cause exports to be performed /// synchronously on the BatchSpanProcessor task. - max_concurrent_exports: usize, + pub(crate) max_concurrent_exports: usize, } impl Default for BatchConfig { @@ -711,31 +633,6 @@ impl BatchConfigBuilder { } } -/// A builder for creating [`BatchSpanProcessor`] instances. -/// -#[derive(Debug)] -pub struct BatchSpanProcessorBuilder { - exporter: E, - config: BatchConfig, - runtime: R, -} - -impl BatchSpanProcessorBuilder -where - E: SpanExporter + 'static, - R: RuntimeChannel, -{ - /// Set the BatchConfig for [BatchSpanProcessorBuilder] - pub fn with_batch_config(self, config: BatchConfig) -> Self { - BatchSpanProcessorBuilder { config, ..self } - } - - /// Build a batch processor - pub fn build(self) -> BatchSpanProcessor { - BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime) - } -} - #[cfg(all(test, feature = "testing", feature = "trace"))] mod tests { // cargo test trace::span_processor::tests:: --features=testing @@ -745,10 +642,7 @@ mod tests { OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, }; use crate::export::trace::{ExportResult, SpanData, SpanExporter}; - use crate::runtime; - use crate::testing::trace::{ - new_test_export_span_data, new_tokio_test_exporter, InMemorySpanExporterBuilder, - }; + use crate::testing::trace::{new_test_export_span_data, InMemorySpanExporterBuilder}; use crate::trace::span_processor::{ OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS, OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, @@ -756,7 +650,6 @@ mod tests { use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks}; use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status}; use std::fmt::Debug; - use std::future::Future; use std::time::Duration; #[test] @@ -903,187 +796,344 @@ mod tests { assert_eq!(batch.max_queue_size, 10); } - #[test] - fn test_build_batch_span_processor_builder() { - let mut env_vars = vec![ - (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")), - (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")), - (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")), - ]; - temp_env::with_vars(env_vars.clone(), || { - let builder = BatchSpanProcessor::builder( - InMemorySpanExporterBuilder::new().build(), - runtime::Tokio, - ); - // export batch size cannot exceed max queue size - assert_eq!(builder.config.max_export_batch_size, 500); - assert_eq!( - builder.config.scheduled_delay, - Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT) - ); - assert_eq!( - builder.config.max_queue_size, - OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT - ); - assert_eq!( - builder.config.max_export_timeout, - Duration::from_millis(2046) - ); - }); + // Helper function to create a default test span + fn create_test_span(name: &str) -> SpanData { + SpanData { + span_context: SpanContext::empty_context(), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: name.to_string().into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + } + } - env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120"))); + use crate::Resource; + use futures_util::future::BoxFuture; + use futures_util::FutureExt; + use opentelemetry::{Key, KeyValue, Value}; + use std::sync::{atomic::Ordering, Arc, Mutex}; + + // Mock exporter to test functionality + #[derive(Debug)] + struct MockSpanExporter { + exported_spans: Arc>>, + exported_resource: Arc>>, + } - temp_env::with_vars(env_vars, || { - let builder = BatchSpanProcessor::builder( - InMemorySpanExporterBuilder::new().build(), - runtime::Tokio, - ); - assert_eq!(builder.config.max_export_batch_size, 120); - assert_eq!(builder.config.max_queue_size, 120); - }); + impl MockSpanExporter { + fn new() -> Self { + Self { + exported_spans: Arc::new(Mutex::new(Vec::new())), + exported_resource: Arc::new(Mutex::new(None)), + } + } } - #[tokio::test] - async fn test_batch_span_processor() { - let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter(); - let config = BatchConfig { - scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush - ..Default::default() - }; - let processor = - BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread); - let handle = tokio::spawn(async move { - loop { - if let Some(span) = export_receiver.recv().await { - assert_eq!(span.span_context, new_test_export_span_data().span_context); - break; - } + impl SpanExporter for MockSpanExporter { + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { + let exported_spans = self.exported_spans.clone(); + async move { + exported_spans.lock().unwrap().extend(batch); + Ok(()) } - }); - tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); - let flush_res = processor.force_flush(); - assert!(flush_res.is_ok()); - let _shutdown_result = processor.shutdown(); + .boxed() + } - assert!( - tokio::time::timeout(Duration::from_secs(5), handle) - .await - .is_ok(), - "timed out in 5 seconds. force_flush may not export any data when called" - ); + fn shutdown(&mut self) {} + fn set_resource(&mut self, resource: &Resource) { + let mut exported_resource = self.exported_resource.lock().unwrap(); + *exported_resource = Some(resource.clone()); + } } - struct BlockingExporter { - delay_for: Duration, - delay_fn: D, - } + #[test] + fn batchspanprocessor_handles_on_end() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let config = BatchConfigBuilder::default() + .with_max_queue_size(10) + .with_max_export_batch_size(10) + .with_scheduled_delay(Duration::from_secs(5)) + .with_max_export_timeout(Duration::from_secs(2)) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); - impl Debug for BlockingExporter - where - D: Fn(Duration) -> DS + 'static + Send + Sync, - DS: Future + Send + Sync + 'static, - { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("blocking exporter for testing") - } + let test_span = create_test_span("test_span"); + processor.on_end(test_span.clone()); + + // Wait for flush interval to ensure the span is processed + std::thread::sleep(Duration::from_secs(6)); + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 1); + assert_eq!(exported_spans[0].name, "test_span"); } - impl SpanExporter for BlockingExporter - where - D: Fn(Duration) -> DS + 'static + Send + Sync, - DS: Future + Send + Sync + 'static, - { - fn export( - &mut self, - _batch: Vec, - ) -> futures_util::future::BoxFuture<'static, ExportResult> { - use futures_util::FutureExt; - Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(()))) - } + #[test] + fn batchspanprocessor_force_flush() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans + let config = BatchConfigBuilder::default() + .with_max_queue_size(10) + .with_max_export_batch_size(10) + .with_scheduled_delay(Duration::from_secs(5)) + .with_max_export_timeout(Duration::from_secs(2)) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Create a test span and send it to the processor + let test_span = create_test_span("force_flush_span"); + processor.on_end(test_span.clone()); + + // Call force_flush to immediately export the spans + let flush_result = processor.force_flush(); + assert!(flush_result.is_ok(), "Force flush failed unexpectedly"); + + // Verify the exported spans in the mock exporter + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!( + exported_spans.len(), + 1, + "Unexpected number of exported spans" + ); + assert_eq!(exported_spans[0].name, "force_flush_span"); } #[test] - fn test_timeout_tokio_timeout() { - // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s. - // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s. - // Either way, the test should be finished within 5s. - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - runtime.block_on(timeout_test_tokio(true)); + fn batchspanprocessor_shutdown() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans + let config = BatchConfigBuilder::default() + .with_max_queue_size(10) + .with_max_export_batch_size(10) + .with_scheduled_delay(Duration::from_secs(5)) + .with_max_export_timeout(Duration::from_secs(2)) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Create a test span and send it to the processor + let test_span = create_test_span("shutdown_span"); + processor.on_end(test_span.clone()); + + // Call shutdown to flush and export all pending spans + let shutdown_result = processor.shutdown(); + assert!(shutdown_result.is_ok(), "Shutdown failed unexpectedly"); + + // Verify the exported spans in the mock exporter + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!( + exported_spans.len(), + 1, + "Unexpected number of exported spans" + ); + assert_eq!(exported_spans[0].name, "shutdown_span"); + + // Ensure further calls to shutdown are idempotent + let second_shutdown_result = processor.shutdown(); + assert!( + second_shutdown_result.is_err(), + "Shutdown should fail when called a second time" + ); } #[test] - fn test_timeout_tokio_not_timeout() { - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - runtime.block_on(timeout_test_tokio(false)); + fn batchspanprocessor_handles_dropped_spans() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans + let config = BatchConfigBuilder::default() + .with_max_queue_size(2) // Small queue size to test span dropping + .with_scheduled_delay(Duration::from_secs(5)) + .with_max_export_timeout(Duration::from_secs(2)) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Create test spans and send them to the processor + let span1 = create_test_span("span1"); + let span2 = create_test_span("span2"); + let span3 = create_test_span("span3"); // This span should be dropped + + processor.on_end(span1.clone()); + processor.on_end(span2.clone()); + processor.on_end(span3.clone()); // This span exceeds the queue size + + // Wait for the scheduled delay to expire + std::thread::sleep(Duration::from_secs(3)); + + let exported_spans = exporter_shared.lock().unwrap(); + + // Verify that only the first two spans are exported + assert_eq!( + exported_spans.len(), + 2, + "Unexpected number of exported spans" + ); + assert!(exported_spans.iter().any(|s| s.name == "span1")); + assert!(exported_spans.iter().any(|s| s.name == "span2")); + + // Ensure the third span is dropped + assert!( + !exported_spans.iter().any(|s| s.name == "span3"), + "Span3 should have been dropped" + ); + + // Verify dropped spans count (if accessible in your implementation) + let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed); + assert_eq!(dropped_count, 1, "Unexpected number of dropped spans"); } #[test] - #[cfg(feature = "rt-async-std")] - fn test_timeout_async_std_timeout() { - async_std::task::block_on(timeout_test_std_async(true)); + fn validate_span_attributes_exported_correctly() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let config = BatchConfigBuilder::default().build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Create a span with attributes + let mut span_data = create_test_span("attribute_validation"); + span_data.attributes = vec![ + KeyValue::new("key1", "value1"), + KeyValue::new("key2", "value2"), + ]; + processor.on_end(span_data.clone()); + + // Force flush to export the span + let _ = processor.force_flush(); + + // Validate the exported attributes + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 1); + let exported_span = &exported_spans[0]; + assert!(exported_span + .attributes + .contains(&KeyValue::new("key1", "value1"))); + assert!(exported_span + .attributes + .contains(&KeyValue::new("key2", "value2"))); } #[test] - #[cfg(feature = "rt-async-std")] - fn test_timeout_async_std_not_timeout() { - async_std::task::block_on(timeout_test_std_async(false)); + fn batchspanprocessor_sets_and_exports_with_resource() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let resource_shared = exporter.exported_resource.clone(); + let config = BatchConfigBuilder::default().build(); + let mut processor = BatchSpanProcessor::new(exporter, config); + + // Set a resource for the processor + let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]); + processor.set_resource(&resource); + + // Create a span and send it to the processor + let test_span = create_test_span("resource_test"); + processor.on_end(test_span.clone()); + + // Force flush to ensure the span is exported + let _ = processor.force_flush(); + + // Validate spans are exported + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 1); + + // Validate the resource is correctly set in the exporter + let exported_resource = resource_shared.lock().unwrap(); + assert!(exported_resource.is_some()); + assert_eq!( + exported_resource + .as_ref() + .unwrap() + .get(Key::new("service.name")), + Some(Value::from("test_service")) + ); } - // If the time_out is true, then the result suppose to ended with timeout. - // otherwise the exporter should be able to export within time out duration. - #[cfg(feature = "rt-async-std")] - async fn timeout_test_std_async(time_out: bool) { - let config = BatchConfig { - max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), - scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush - ..Default::default() - }; - let exporter = BlockingExporter { - delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), - delay_fn: async_std::task::sleep, - }; - let processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd); - processor.on_end(new_test_export_span_data()); - let flush_res = processor.force_flush(); - if time_out { - assert!(flush_res.is_err()); - } else { - assert!(flush_res.is_ok()); + #[tokio::test(flavor = "current_thread")] + async fn test_batch_processor_current_thread_runtime() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + + let config = BatchConfigBuilder::default() + .with_max_queue_size(5) + .with_max_export_batch_size(3) + .with_scheduled_delay(Duration::from_millis(50)) + .build(); + + let processor = BatchSpanProcessor::new(exporter, config); + + for _ in 0..4 { + let span = new_test_export_span_data(); + processor.on_end(span); } - let shutdown_res = processor.shutdown(); - assert!(shutdown_res.is_ok()); + + tokio::time::sleep(Duration::from_millis(200)).await; + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 4); } - // If the time_out is true, then the result suppose to ended with timeout. - // otherwise the exporter should be able to export within time out duration. - async fn timeout_test_tokio(time_out: bool) { - let config = BatchConfig { - max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), - scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush, - ..Default::default() - }; - let exporter = BlockingExporter { - delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), - delay_fn: tokio::time::sleep, - }; - let processor = - BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread); - tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); - let flush_res = processor.force_flush(); - if time_out { - assert!(flush_res.is_err()); - } else { - assert!(flush_res.is_ok()); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_batch_processor_multi_thread_count_1_runtime() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + + let config = BatchConfigBuilder::default() + .with_max_queue_size(5) + .with_max_export_batch_size(3) + .with_scheduled_delay(Duration::from_millis(50)) + .build(); + + let processor = BatchSpanProcessor::new(exporter, config); + + for _ in 0..4 { + let span = new_test_export_span_data(); + processor.on_end(span); } - let shutdown_res = processor.shutdown(); - assert!(shutdown_res.is_ok()); + + tokio::time::sleep(Duration::from_millis(200)).await; + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 4); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_batch_processor_multi_thread() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + + let config = BatchConfigBuilder::default() + .with_max_queue_size(20) + .with_max_export_batch_size(5) + .with_scheduled_delay(Duration::from_millis(50)) + .build(); + + // Create the processor with the thread-safe exporter + let processor = Arc::new(BatchSpanProcessor::new(exporter, config)); + + let mut handles = vec![]; + for _ in 0..10 { + let processor_clone = Arc::clone(&processor); + let handle = tokio::spawn(async move { + let span = new_test_export_span_data(); + processor_clone.on_end(span); + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } + + // Allow time for batching and export + tokio::time::sleep(Duration::from_millis(200)).await; + + // Verify exported spans + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 10); } } diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs new file mode 100644 index 0000000000..c3c241c776 --- /dev/null +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -0,0 +1,619 @@ +use crate::export::trace::{ExportResult, SpanData, SpanExporter}; +use crate::resource::Resource; +use crate::runtime::{RuntimeChannel, TrySend}; +use crate::trace::BatchConfig; +use crate::trace::Span; +use crate::trace::SpanProcessor; +use futures_channel::oneshot; +use futures_util::{ + future::{self, BoxFuture, Either}, + select, + stream::{self, FusedStream, FuturesUnordered}, + StreamExt as _, +}; +use opentelemetry::{otel_debug, otel_error, otel_warn}; +use opentelemetry::{ + trace::{TraceError, TraceResult}, + Context, +}; +use std::fmt; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports +/// them at a preconfigured interval. +/// +/// Batch span processors need to run a background task to collect and send +/// spans. Different runtimes need different ways to handle the background task. +/// +/// Note: Configuring an opentelemetry `Runtime` that's not compatible with the +/// underlying runtime can cause deadlocks (see tokio section). +/// +/// ### Use with Tokio +/// +/// Tokio currently offers two different schedulers. One is +/// `current_thread_scheduler`, the other is `multiple_thread_scheduler`. Both +/// of them default to use batch span processors to install span exporters. +/// +/// Tokio's `current_thread_scheduler` can cause the program to hang forever if +/// blocking work is scheduled with other tasks in the same runtime. To avoid +/// this, be sure to enable the `rt-tokio-current-thread` feature in this crate +/// if you are using that runtime (e.g. users of actix-web), and blocking tasks +/// will then be scheduled on a different thread. +/// +/// # Examples +/// +/// This processor can be configured with an [`executor`] of your choice to +/// batch and upload spans asynchronously when they end. If you have added a +/// library like [`tokio`] or [`async-std`], you can pass in their respective +/// `spawn` and `interval` functions to have batching performed in those +/// contexts. +/// +/// ``` +/// # #[cfg(feature="tokio")] +/// # { +/// use opentelemetry::global; +/// use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace}; +/// use opentelemetry_sdk::trace::BatchConfigBuilder; +/// use std::time::Duration; +/// use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor; +/// +/// #[tokio::main] +/// async fn main() { +/// // Configure your preferred exporter +/// let exporter = NoopSpanExporter::new(); +/// +/// // Create a batch span processor using an exporter and a runtime +/// let batch = BatchSpanProcessor::builder(exporter, runtime::Tokio) +/// .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build()) +/// .build(); +/// +/// // Then use the `with_batch_exporter` method to have the provider export spans in batches. +/// let provider = trace::TracerProvider::builder() +/// .with_span_processor(batch) +/// .build(); +/// +/// let _ = global::set_tracer_provider(provider); +/// } +/// # } +/// ``` +/// +/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html +/// [`tokio`]: https://tokio.rs +/// [`async-std`]: https://async.rs +pub struct BatchSpanProcessor { + message_sender: R::Sender, + + // Track dropped spans + dropped_spans_count: AtomicUsize, + + // Track the maximum queue size that was configured for this processor + max_queue_size: usize, +} + +impl fmt::Debug for BatchSpanProcessor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BatchSpanProcessor") + .field("message_sender", &self.message_sender) + .finish() + } +} + +impl SpanProcessor for BatchSpanProcessor { + fn on_start(&self, _span: &mut Span, _cx: &Context) { + // Ignored + } + + fn on_end(&self, span: SpanData) { + if !span.span_context.is_sampled() { + return; + } + + let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); + + // If the queue is full, and we can't buffer a span + if result.is_err() { + // Increment the number of dropped spans. If this is the first time we've had to drop, + // emit a warning. + if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted", + message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped."); + } + } + } + + fn force_flush(&self) -> TraceResult<()> { + let (res_sender, res_receiver) = oneshot::channel(); + self.message_sender + .try_send(BatchMessage::Flush(Some(res_sender))) + .map_err(|err| TraceError::Other(err.into()))?; + + futures_executor::block_on(res_receiver) + .map_err(|err| TraceError::Other(err.into())) + .and_then(|identity| identity) + } + + fn shutdown(&self) -> TraceResult<()> { + let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed); + let max_queue_size = self.max_queue_size; + if dropped_spans > 0 { + otel_warn!( + name: "BatchSpanProcessor.Shutdown", + dropped_spans = dropped_spans, + max_queue_size = max_queue_size, + message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals." + ); + } + + let (res_sender, res_receiver) = oneshot::channel(); + self.message_sender + .try_send(BatchMessage::Shutdown(res_sender)) + .map_err(|err| TraceError::Other(err.into()))?; + + futures_executor::block_on(res_receiver) + .map_err(|err| TraceError::Other(err.into())) + .and_then(|identity| identity) + } + + fn set_resource(&mut self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessage::SetResource(resource)); + } +} + +/// Messages sent between application thread and batch span processor's work thread. +// In this enum the size difference is not a concern because: +// 1. If we wrap SpanData into a pointer, it will add overhead when processing. +// 2. Most of the messages will be ExportSpan. +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum BatchMessage { + /// Export spans, usually called when span ends + ExportSpan(SpanData), + /// Flush the current buffer to the backend, it can be triggered by + /// pre configured interval or a call to `force_push` function. + Flush(Option>), + /// Shut down the worker thread, push all spans in buffer to the backend. + Shutdown(oneshot::Sender), + /// Set the resource for the exporter. + SetResource(Arc), +} + +struct BatchSpanProcessorInternal { + spans: Vec, + export_tasks: FuturesUnordered>, + runtime: R, + exporter: Box, + config: BatchConfig, +} + +impl BatchSpanProcessorInternal { + async fn flush(&mut self, res_channel: Option>) { + let export_task = self.export(); + let task = Box::pin(async move { + let result = export_task.await; + + if let Some(channel) = res_channel { + // If a response channel is provided, attempt to send the export result through it. + if let Err(result) = channel.send(result) { + otel_debug!( + name: "BatchSpanProcessor.Flush.SendResultError", + reason = format!("{:?}", result) + ); + } + } else if let Err(err) = result { + // If no channel is provided and the export operation encountered an error, + // log the error directly here. + // TODO: Consider returning the status instead of logging it. + otel_error!( + name: "BatchSpanProcessor.Flush.ExportError", + reason = format!("{:?}", err), + message = "Failed during the export process" + ); + } + + Ok(()) + }); + + if self.config.max_concurrent_exports == 1 { + let _ = task.await; + } else { + self.export_tasks.push(task); + while self.export_tasks.next().await.is_some() {} + } + } + + /// Process a single message + /// + /// A return value of false indicates shutdown + async fn process_message(&mut self, message: BatchMessage) -> bool { + match message { + // Span has finished, add to buffer of pending spans. + BatchMessage::ExportSpan(span) => { + self.spans.push(span); + + if self.spans.len() == self.config.max_export_batch_size { + // If concurrent exports are saturated, wait for one to complete. + if !self.export_tasks.is_empty() + && self.export_tasks.len() == self.config.max_concurrent_exports + { + self.export_tasks.next().await; + } + + let export_task = self.export(); + let task = async move { + if let Err(err) = export_task.await { + otel_error!( + name: "BatchSpanProcessor.Export.Error", + reason = format!("{}", err) + ); + } + + Ok(()) + }; + // Special case when not using concurrent exports + if self.config.max_concurrent_exports == 1 { + let _ = task.await; + } else { + self.export_tasks.push(Box::pin(task)); + } + } + } + // Span batch interval time reached or a force flush has been invoked, export + // current spans. + // + // This is a hint to ensure that any tasks associated with Spans for which the + // SpanProcessor had already received events prior to the call to ForceFlush + // SHOULD be completed as soon as possible, preferably before returning from + // this method. + // + // In particular, if any SpanProcessor has any associated exporter, it SHOULD + // try to call the exporter's Export with all spans for which this was not + // already done and then invoke ForceFlush on it. The built-in SpanProcessors + // MUST do so. If a timeout is specified (see below), the SpanProcessor MUST + // prioritize honoring the timeout over finishing all calls. It MAY skip or + // abort some or all Export or ForceFlush calls it has made to achieve this + // goal. + // + // NB: `force_flush` is not currently implemented on exporters; the equivalent + // would be waiting for exporter tasks to complete. In the case of + // channel-coupled exporters, they will need a `force_flush` implementation to + // properly block. + BatchMessage::Flush(res_channel) => { + self.flush(res_channel).await; + } + // Stream has terminated or processor is shutdown, return to finish execution. + BatchMessage::Shutdown(ch) => { + self.flush(Some(ch)).await; + self.exporter.shutdown(); + return false; + } + // propagate the resource + BatchMessage::SetResource(resource) => { + self.exporter.set_resource(&resource); + } + } + true + } + + fn export(&mut self) -> BoxFuture<'static, ExportResult> { + // Batch size check for flush / shutdown. Those methods may be called + // when there's no work to do. + if self.spans.is_empty() { + return Box::pin(future::ready(Ok(()))); + } + + let export = self.exporter.export(self.spans.split_off(0)); + let timeout = self.runtime.delay(self.config.max_export_timeout); + let time_out = self.config.max_export_timeout; + + Box::pin(async move { + match future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)), + } + }) + } + + async fn run(mut self, mut messages: impl FusedStream + Unpin) { + loop { + select! { + // FuturesUnordered implements Fuse intelligently such that it + // will become eligible again once new tasks are added to it. + _ = self.export_tasks.next() => { + // An export task completed; do we need to do anything with it? + }, + message = messages.next() => { + match message { + Some(message) => { + if !self.process_message(message).await { + break; + } + }, + None => break, + } + }, + } + } + } +} + +impl BatchSpanProcessor { + pub(crate) fn new(exporter: Box, config: BatchConfig, runtime: R) -> Self { + let (message_sender, message_receiver) = + runtime.batch_message_channel(config.max_queue_size); + + let max_queue_size = config.max_queue_size; + + let inner_runtime = runtime.clone(); + // Spawn worker process via user-defined spawn function. + runtime.spawn(Box::pin(async move { + // Timer will take a reference to the current runtime, so its important we do this within the + // runtime.spawn() + let ticker = inner_runtime + .interval(config.scheduled_delay) + .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. + .map(|_| BatchMessage::Flush(None)); + let timeout_runtime = inner_runtime.clone(); + + let messages = Box::pin(stream::select(message_receiver, ticker)); + let processor = BatchSpanProcessorInternal { + spans: Vec::new(), + export_tasks: FuturesUnordered::new(), + runtime: timeout_runtime, + config, + exporter, + }; + + processor.run(messages).await + })); + + // Return batch processor with link to worker + BatchSpanProcessor { + message_sender, + dropped_spans_count: AtomicUsize::new(0), + max_queue_size, + } + } + + /// Create a new batch processor builder + pub fn builder(exporter: E, runtime: R) -> BatchSpanProcessorBuilder + where + E: SpanExporter, + { + BatchSpanProcessorBuilder { + exporter, + config: Default::default(), + runtime, + } + } +} + +/// A builder for creating [`BatchSpanProcessor`] instances. +/// +#[derive(Debug)] +pub struct BatchSpanProcessorBuilder { + exporter: E, + config: BatchConfig, + runtime: R, +} + +impl BatchSpanProcessorBuilder +where + E: SpanExporter + 'static, + R: RuntimeChannel, +{ + /// Set the BatchConfig for [BatchSpanProcessorBuilder] + pub fn with_batch_config(self, config: BatchConfig) -> Self { + BatchSpanProcessorBuilder { config, ..self } + } + + /// Build a batch processor + pub fn build(self) -> BatchSpanProcessor { + BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime) + } +} + +#[cfg(all(test, feature = "testing", feature = "trace"))] +mod tests { + // cargo test trace::span_processor::tests:: --features=testing + use super::{BatchSpanProcessor, SpanProcessor}; + use crate::export::trace::{ExportResult, SpanData, SpanExporter}; + use crate::runtime; + use crate::testing::trace::{ + new_test_export_span_data, new_tokio_test_exporter, InMemorySpanExporterBuilder, + }; + use crate::trace::span_processor::{ + OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, + OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, + }; + use crate::trace::{BatchConfig, BatchConfigBuilder}; + use futures_util::Future; + use std::fmt::Debug; + use std::time::Duration; + + struct BlockingExporter { + delay_for: Duration, + delay_fn: D, + } + + impl Debug for BlockingExporter + where + D: Fn(Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static, + { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("blocking exporter for testing") + } + } + + impl SpanExporter for BlockingExporter + where + D: Fn(Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static, + { + fn export( + &mut self, + _batch: Vec, + ) -> futures_util::future::BoxFuture<'static, ExportResult> { + use futures_util::FutureExt; + Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(()))) + } + } + + #[test] + fn test_build_batch_span_processor_builder() { + let mut env_vars = vec![ + (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")), + (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")), + (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")), + ]; + temp_env::with_vars(env_vars.clone(), || { + let builder = BatchSpanProcessor::builder( + InMemorySpanExporterBuilder::new().build(), + runtime::Tokio, + ); + // export batch size cannot exceed max queue size + assert_eq!(builder.config.max_export_batch_size, 500); + assert_eq!( + builder.config.scheduled_delay, + Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT) + ); + assert_eq!( + builder.config.max_queue_size, + OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT + ); + assert_eq!( + builder.config.max_export_timeout, + Duration::from_millis(2046) + ); + }); + + env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120"))); + + temp_env::with_vars(env_vars, || { + let builder = BatchSpanProcessor::builder( + InMemorySpanExporterBuilder::new().build(), + runtime::Tokio, + ); + assert_eq!(builder.config.max_export_batch_size, 120); + assert_eq!(builder.config.max_queue_size, 120); + }); + } + + #[tokio::test] + async fn test_batch_span_processor() { + let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter(); + let config = BatchConfigBuilder::default() + .with_scheduled_delay(Duration::from_secs(60 * 60 * 24)) // set the tick to 24 hours so we know the span must be exported via force_flush + .build(); + let processor = + BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread); + let handle = tokio::spawn(async move { + loop { + if let Some(span) = export_receiver.recv().await { + assert_eq!(span.span_context, new_test_export_span_data().span_context); + break; + } + } + }); + tokio::time::sleep(Duration::from_secs(1)).await; // skip the first + processor.on_end(new_test_export_span_data()); + let flush_res = processor.force_flush(); + assert!(flush_res.is_ok()); + let _shutdown_result = processor.shutdown(); + + assert!( + tokio::time::timeout(Duration::from_secs(5), handle) + .await + .is_ok(), + "timed out in 5 seconds. force_flush may not export any data when called" + ); + } + + // If the time_out is true, then the result suppose to ended with timeout. + // otherwise the exporter should be able to export within time out duration. + async fn timeout_test_tokio(time_out: bool) { + let config = BatchConfig { + max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), + scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush, + ..Default::default() + }; + let exporter = BlockingExporter { + delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), + delay_fn: tokio::time::sleep, + }; + let processor = + BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread); + tokio::time::sleep(Duration::from_secs(1)).await; // skip the first + processor.on_end(new_test_export_span_data()); + let flush_res = processor.force_flush(); + if time_out { + assert!(flush_res.is_err()); + } else { + assert!(flush_res.is_ok()); + } + let shutdown_res = processor.shutdown(); + assert!(shutdown_res.is_ok()); + } + + #[test] + fn test_timeout_tokio_timeout() { + // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s. + // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s. + // Either way, the test should be finished within 5s. + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + runtime.block_on(timeout_test_tokio(true)); + } + + #[test] + fn test_timeout_tokio_not_timeout() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + runtime.block_on(timeout_test_tokio(false)); + } + + #[test] + #[cfg(feature = "rt-async-std")] + fn test_timeout_async_std_timeout() { + async_std::task::block_on(timeout_test_std_async(true)); + } + + #[test] + #[cfg(feature = "rt-async-std")] + fn test_timeout_async_std_not_timeout() { + async_std::task::block_on(timeout_test_std_async(false)); + } + + // If the time_out is true, then the result suppose to ended with timeout. + // otherwise the exporter should be able to export within time out duration. + #[cfg(feature = "rt-async-std")] + async fn timeout_test_std_async(time_out: bool) { + let config = BatchConfig { + max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), + scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush + ..Default::default() + }; + let exporter = BlockingExporter { + delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), + delay_fn: async_std::task::sleep, + }; + let processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd); + processor.on_end(new_test_export_span_data()); + let flush_res = processor.force_flush(); + if time_out { + assert!(flush_res.is_err()); + } else { + assert!(flush_res.is_ok()); + } + let shutdown_res = processor.shutdown(); + assert!(shutdown_res.is_ok()); + } +} diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index bb18c1473b..53890c02f0 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -11,7 +11,6 @@ use opentelemetry_http::HttpClient; use opentelemetry_sdk::{ export::{trace, ExportError}, resource::{ResourceDetector, SdkProvidedResourceDetector}, - runtime::RuntimeChannel, trace::{Config, Tracer, TracerProvider}, Resource, }; @@ -165,13 +164,12 @@ impl ZipkinPipelineBuilder { /// Install the Zipkin trace exporter pipeline with a batch span processor using the specified /// runtime. #[allow(deprecated)] - pub fn install_batch( + pub fn install_batch( mut self, - runtime: R, ) -> Result<(Tracer, opentelemetry_sdk::trace::TracerProvider), TraceError> { let (config, endpoint) = self.init_config_and_endpoint(); let exporter = self.init_exporter_with_endpoint(endpoint)?; - let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter, runtime); + let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter); provider_builder = provider_builder.with_config(config); let provider = provider_builder.build(); let scope = InstrumentationScope::builder("opentelemetry-zipkin") diff --git a/scripts/integration_tests.sh b/scripts/integration_tests.sh index b984cc023f..9835ffe5b8 100755 --- a/scripts/integration_tests.sh +++ b/scripts/integration_tests.sh @@ -19,10 +19,16 @@ if [ -d "$TEST_DIR" ]; then echo Integration Tests: Reqwest Client echo #### echo - cargo test --no-default-features --features "reqwest-client","internal-logs" + # TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported. + #cargo test --no-default-features --features "reqwest-client","internal-logs" - # TODO - Uncomment the following lines once the reqwest-blocking-client feature is working. - # cargo test --no-default-features --features "reqwest-blocking-client" + # Run tests with the reqwest-client feature + echo + echo #### + echo Integration Tests: Reqwest Blocking Client + echo #### + echo + cargo test --no-default-features --features "reqwest-blocking-client" # Run tests with the hyper-client feature echo @@ -30,7 +36,8 @@ if [ -d "$TEST_DIR" ]; then echo Integration Tests: Hyper Client echo #### echo - cargo test --no-default-features --features "hyper-client","internal-logs" + # TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported. + #cargo test --no-default-features --features "hyper-client","internal-logs" else echo "Directory $TEST_DIR does not exist. Skipping tests." exit 1