diff --git a/opentelemetry-otlp/tests/integration_test/expected/metrics/test_flush_on_shutdown.json b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_flush_on_shutdown.json new file mode 100644 index 0000000000..c390a70664 --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_flush_on_shutdown.json @@ -0,0 +1,39 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "metrics-integration-test" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": { + "name": "test_flush_on_shutdown" + }, + "metrics": [ + { + "name": "counter_", + "sum": { + "dataPoints": [ + { + "startTimeUnixNano": "1734370440803831000", + "timeUnixNano": "1734370440803905000", + "asInt": "123" + } + ], + "aggregationTemporality": 2, + "isMonotonic": true + } + } + ] + } + ] + } + ] +} diff --git a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs index fc8ecb1d56..68fc423ec5 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs @@ -27,32 +27,20 @@ static RESULT_PATH: &str = "actual/metrics.json"; /// Initializes the OpenTelemetry metrics pipeline async fn init_metrics() -> SdkMeterProvider { - let exporter_builder = MetricExporter::builder(); - - #[cfg(feature = "tonic-client")] - let exporter_builder = exporter_builder.with_tonic(); - #[cfg(not(feature = "tonic-client"))] - #[cfg(any( - feature = "hyper-client", - feature = "reqwest-client", - feature = "reqwest-blocking-client" - ))] - let exporter_builder = exporter_builder.with_http(); - - let exporter = exporter_builder - .build() - .expect("Failed to build MetricExporter"); + let exporter = create_exporter(); let reader = PeriodicReader::builder(exporter) .with_interval(Duration::from_millis(100)) .with_timeout(Duration::from_secs(1)) .build(); + let resource = Resource::new(vec![KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + "metrics-integration-test", + )]); + let meter_provider = MeterProviderBuilder::default() - .with_resource(Resource::new(vec![KeyValue::new( - opentelemetry_semantic_conventions::resource::SERVICE_NAME, - "metrics-integration-test", - )])) + .with_resource(resource) .with_reader(reader) .build(); @@ -61,6 +49,28 @@ async fn init_metrics() -> SdkMeterProvider { meter_provider } +/// +/// Creates an exporter using the appropriate HTTP or gRPC client based on +/// the configured features. +/// +fn create_exporter() -> MetricExporter { + let exporter_builder = MetricExporter::builder(); + + #[cfg(feature = "tonic-client")] + let exporter_builder = exporter_builder.with_tonic(); + #[cfg(not(feature = "tonic-client"))] + #[cfg(any( + feature = "hyper-client", + feature = "reqwest-client", + feature = "reqwest-blocking-client" + ))] + let exporter_builder = exporter_builder.with_http(); + + exporter_builder + .build() + .expect("Failed to build MetricExporter") +} + /// /// Retrieves the latest metrics for the given scope. Each test should use /// its own scope, so that we can easily pull the data for it out from the rest @@ -72,55 +82,71 @@ pub fn fetch_latest_metrics_for_scope(scope_name: &str) -> Result { // Open the file and fetch the contents let contents = fs::read_to_string(test_utils::METRICS_FILE)?; - // Find the last complete metrics line. Work backwards until one parses. + // Find the last parseable metrics line that contains the desired scope let json_line = contents .lines() .rev() - .find_map(|line| serde_json::from_str::(line).ok()) - .with_context(|| "No valid JSON line found in the metrics file.")?; - - // Parse the JSON and filter metrics strictly by the scope name - let mut filtered_json = json_line; - if let Some(resource_metrics) = filtered_json - .get_mut("resourceMetrics") - .and_then(|v| v.as_array_mut()) - { - resource_metrics.retain_mut(|resource| { - if let Some(scope_metrics) = resource - .get_mut("scopeMetrics") - .and_then(|v| v.as_array_mut()) - { - // Retain only `ScopeMetrics` that match the specified scope_name - scope_metrics.retain(|scope| { - scope - .get("scope") - .and_then(|s| s.get("name")) - .and_then(|name| name.as_str()) - .map_or(false, |n| n == scope_name) - }); - - // Keep the resource only if it has any matching `ScopeMetrics` - !scope_metrics.is_empty() - } else { - false - } - }); - } - - Ok(filtered_json) + .find_map(|line| { + // Attempt to parse the line as JSON + serde_json::from_str::(line) + .ok() + .and_then(|mut json_line| { + // Check if it contains the specified scope + if let Some(resource_metrics) = json_line + .get_mut("resourceMetrics") + .and_then(|v| v.as_array_mut()) + { + resource_metrics.retain_mut(|resource| { + if let Some(scope_metrics) = resource + .get_mut("scopeMetrics") + .and_then(|v| v.as_array_mut()) + { + scope_metrics.retain(|scope| { + scope + .get("scope") + .and_then(|s| s.get("name")) + .and_then(|name| name.as_str()) + .map_or(false, |n| n == scope_name) + }); + + // Keep the resource only if it has any matching `ScopeMetrics` + !scope_metrics.is_empty() + } else { + false + } + }); + + // If any resource metrics remain, return this line + if !resource_metrics.is_empty() { + return Some(json_line); + } + } + + None + }) + }) + .with_context(|| { + format!( + "No valid JSON line containing scope `{}` found.", + scope_name + ) + })?; + + Ok(json_line) } -/// Performs setup for metrics tests, including environment setup and data seeding. -/// This only needs to be done once for the whole test suite +/// +/// Performs setup for metrics tests +/// async fn setup_metrics_test() { + // Make sure the collector container is running + start_collector_container().await; + let mut done = SETUP_DONE.lock().unwrap(); if !*done { println!("Running setup before any tests..."); *done = true; // Mark setup as done - // Make sure the collector container is running - start_collector_container().await; - // Initialise the metrics subsystem _ = init_metrics().await; } @@ -158,7 +184,9 @@ pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> { #[cfg(test)] mod tests { + use super::*; + use opentelemetry::metrics::MeterProvider; /// /// JSON doesn't roundtrip through the MetricsData models properly. @@ -239,6 +267,47 @@ mod tests { Ok(()) } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_flush_on_shutdown() -> Result<()> { + const METER_NAME: &str = "test_flush_on_shutdown"; + + // Set everything up by hand, so that we can shutdown() the exporter + // and make sure our data is flushed through. + + // Make sure the collector is running + start_collector_container().await; + + // Set up the exporter + let exporter = create_exporter(); + let reader = PeriodicReader::builder(exporter) + .with_interval(Duration::from_millis(100)) + .with_timeout(Duration::from_secs(1)) + .build(); + let resource = Resource::new(vec![KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + "metrics-integration-test", + )]); + let meter_provider = MeterProviderBuilder::default() + .with_resource(resource) + .with_reader(reader) + .build(); + + // Send something + let meter = meter_provider.meter(METER_NAME); + let counter = meter.u64_counter("counter_").build(); + counter.add(123, &[]); + + // Shutdown + meter_provider.shutdown()?; + + // We still need to sleep, to give otel-collector a chance to flush to disk + tokio::time::sleep(Duration::from_secs(2)).await; + + validate_metrics_against_results(METER_NAME)?; + + Ok(()) + } } /// @@ -247,5 +316,6 @@ mod tests { /// #[dtor] fn shutdown() { + println!("metrics::shutdown"); test_utils::stop_collector_container(); }