Skip to content

Commit

Permalink
Add test_flush_on_shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
scottgerring committed Dec 16, 2024
1 parent 5707bfe commit 6904b97
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "metrics-integration-test"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "test_flush_on_shutdown"
},
"metrics": [
{
"name": "counter_",
"sum": {
"dataPoints": [
{
"startTimeUnixNano": "1734370440803831000",
"timeUnixNano": "1734370440803905000",
"asInt": "123"
}
],
"aggregationTemporality": 2,
"isMonotonic": true
}
}
]
}
]
}
]
}
184 changes: 127 additions & 57 deletions opentelemetry-otlp/tests/integration_test/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,20 @@ static RESULT_PATH: &str = "actual/metrics.json";

/// Initializes the OpenTelemetry metrics pipeline
async fn init_metrics() -> SdkMeterProvider {
let exporter_builder = MetricExporter::builder();

#[cfg(feature = "tonic-client")]
let exporter_builder = exporter_builder.with_tonic();
#[cfg(not(feature = "tonic-client"))]
#[cfg(any(
feature = "hyper-client",
feature = "reqwest-client",
feature = "reqwest-blocking-client"
))]
let exporter_builder = exporter_builder.with_http();

let exporter = exporter_builder
.build()
.expect("Failed to build MetricExporter");
let exporter = create_exporter();

let reader = PeriodicReader::builder(exporter)
.with_interval(Duration::from_millis(100))
.with_timeout(Duration::from_secs(1))
.build();

let resource = Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
"metrics-integration-test",
)]);

let meter_provider = MeterProviderBuilder::default()
.with_resource(Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
"metrics-integration-test",
)]))
.with_resource(resource)
.with_reader(reader)
.build();

Expand All @@ -61,6 +49,28 @@ async fn init_metrics() -> SdkMeterProvider {
meter_provider
}

///
/// Creates an exporter using the appropriate HTTP or gRPC client based on
/// the configured features.
///
fn create_exporter() -> MetricExporter {
let exporter_builder = MetricExporter::builder();

#[cfg(feature = "tonic-client")]
let exporter_builder = exporter_builder.with_tonic();
#[cfg(not(feature = "tonic-client"))]
#[cfg(any(
feature = "hyper-client",
feature = "reqwest-client",
feature = "reqwest-blocking-client"
))]
let exporter_builder = exporter_builder.with_http();

exporter_builder
.build()
.expect("Failed to build MetricExporter")
}

///
/// Retrieves the latest metrics for the given scope. Each test should use
/// its own scope, so that we can easily pull the data for it out from the rest
Expand All @@ -72,55 +82,71 @@ pub fn fetch_latest_metrics_for_scope(scope_name: &str) -> Result<Value> {
// Open the file and fetch the contents
let contents = fs::read_to_string(test_utils::METRICS_FILE)?;

// Find the last complete metrics line. Work backwards until one parses.
// Find the last parseable metrics line that contains the desired scope
let json_line = contents
.lines()
.rev()
.find_map(|line| serde_json::from_str::<Value>(line).ok())
.with_context(|| "No valid JSON line found in the metrics file.")?;

// Parse the JSON and filter metrics strictly by the scope name
let mut filtered_json = json_line;
if let Some(resource_metrics) = filtered_json
.get_mut("resourceMetrics")
.and_then(|v| v.as_array_mut())
{
resource_metrics.retain_mut(|resource| {
if let Some(scope_metrics) = resource
.get_mut("scopeMetrics")
.and_then(|v| v.as_array_mut())
{
// Retain only `ScopeMetrics` that match the specified scope_name
scope_metrics.retain(|scope| {
scope
.get("scope")
.and_then(|s| s.get("name"))
.and_then(|name| name.as_str())
.map_or(false, |n| n == scope_name)
});

// Keep the resource only if it has any matching `ScopeMetrics`
!scope_metrics.is_empty()
} else {
false
}
});
}

Ok(filtered_json)
.find_map(|line| {
// Attempt to parse the line as JSON
serde_json::from_str::<Value>(line)
.ok()
.and_then(|mut json_line| {
// Check if it contains the specified scope
if let Some(resource_metrics) = json_line
.get_mut("resourceMetrics")
.and_then(|v| v.as_array_mut())
{
resource_metrics.retain_mut(|resource| {
if let Some(scope_metrics) = resource
.get_mut("scopeMetrics")
.and_then(|v| v.as_array_mut())
{
scope_metrics.retain(|scope| {
scope
.get("scope")
.and_then(|s| s.get("name"))
.and_then(|name| name.as_str())
.map_or(false, |n| n == scope_name)
});

// Keep the resource only if it has any matching `ScopeMetrics`
!scope_metrics.is_empty()
} else {
false
}
});

// If any resource metrics remain, return this line
if !resource_metrics.is_empty() {
return Some(json_line);
}
}

None
})
})
.with_context(|| {
format!(
"No valid JSON line containing scope `{}` found.",
scope_name
)
})?;

Ok(json_line)
}

/// Performs setup for metrics tests, including environment setup and data seeding.
/// This only needs to be done once for the whole test suite
///
/// Performs setup for metrics tests
///
async fn setup_metrics_test() {
// Make sure the collector container is running
start_collector_container().await;

let mut done = SETUP_DONE.lock().unwrap();
if !*done {
println!("Running setup before any tests...");
*done = true; // Mark setup as done

// Make sure the collector container is running
start_collector_container().await;

// Initialise the metrics subsystem
_ = init_metrics().await;
}
Expand Down Expand Up @@ -158,7 +184,9 @@ pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> {

#[cfg(test)]
mod tests {

use super::*;
use opentelemetry::metrics::MeterProvider;

///
/// JSON doesn't roundtrip through the MetricsData models properly.
Expand Down Expand Up @@ -239,6 +267,47 @@ mod tests {

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_flush_on_shutdown() -> Result<()> {
const METER_NAME: &str = "test_flush_on_shutdown";

// Set everything up by hand, so that we can shutdown() the exporter
// and make sure our data is flushed through.

// Make sure the collector is running
start_collector_container().await;

// Set up the exporter
let exporter = create_exporter();
let reader = PeriodicReader::builder(exporter)
.with_interval(Duration::from_millis(100))
.with_timeout(Duration::from_secs(1))
.build();
let resource = Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
"metrics-integration-test",
)]);
let meter_provider = MeterProviderBuilder::default()
.with_resource(resource)
.with_reader(reader)
.build();

// Send something
let meter = meter_provider.meter(METER_NAME);
let counter = meter.u64_counter("counter_").build();
counter.add(123, &[]);

// Shutdown
meter_provider.shutdown()?;

// We still need to sleep, to give otel-collector a chance to flush to disk
tokio::time::sleep(Duration::from_secs(2)).await;

validate_metrics_against_results(METER_NAME)?;

Ok(())
}
}

///
Expand All @@ -247,5 +316,6 @@ mod tests {
///
#[dtor]
fn shutdown() {
println!("metrics::shutdown");
test_utils::stop_collector_container();
}

0 comments on commit 6904b97

Please sign in to comment.