From bd56070fb1757c19e8a7ac97094f29eafa938dd0 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Fri, 16 Sep 2022 13:29:35 -0700 Subject: [PATCH] opentelemetry: update otel to 0.18.0 (#2303) ## Motivation Support the latest OpenTelemetry specification. ## Solution Update `opentelemetry` to the latest `0.18.x` release. Breaking changes in the metrics spec have removed value recorders and added histograms so the metrics layer's `value.` prefix has been changed to `histogram.` and behaves accordingly. Additionally the `PushController` configuration for the metrics layer has been simplified to accept a `BasicController` that can act in either push or pull modes. Finally trace sampling in the sdk's `PreSampledTracer` impl has been updated to match the sampling logic in https://github.com/open-telemetry/opentelemetry-rust/pull/839. * Update MSRV to 1.56 * Update examples * Fix async-trait dep * Update msrv action Co-authored-by: Eliza Weisman --- .github/workflows/CI.yml | 1 + examples/Cargo.toml | 4 +- examples/examples/opentelemetry.rs | 2 +- tracing-opentelemetry/Cargo.toml | 8 +- tracing-opentelemetry/README.md | 4 +- tracing-opentelemetry/src/layer.rs | 140 +++++----- tracing-opentelemetry/src/lib.rs | 4 +- tracing-opentelemetry/src/metrics.rs | 93 ++++--- tracing-opentelemetry/src/tracer.rs | 26 +- .../tests/metrics_publishing.rs | 248 +++++++++--------- .../tests/trace_state_propagation.rs | 21 +- 11 files changed, 288 insertions(+), 263 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 43fd26fbc1..b7532f0639 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -214,6 +214,7 @@ jobs: --exclude=tracing-appender --exclude=tracing-examples --exclude=tracing-futures + --exclude=tracing-opentelemetry toolchain: ${{ env.MSRV }} # TODO: remove this once tracing's MSRV is bumped. diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 28a12f4456..c08515bc79 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -52,8 +52,8 @@ inferno = "0.11.6" tempfile = "3" # opentelemetry example -opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] } -opentelemetry-jaeger = "0.16.0" +opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] } +opentelemetry-jaeger = "0.17.0" # fmt examples snafu = "0.6.10" diff --git a/examples/examples/opentelemetry.rs b/examples/examples/opentelemetry.rs index b453c77cad..e00cd3a500 100644 --- a/examples/examples/opentelemetry.rs +++ b/examples/examples/opentelemetry.rs @@ -19,7 +19,7 @@ fn main() -> Result<(), Box> { // Install an otel pipeline with a simple span processor that exports data one at a time when // spans end. See the `install_batch` option on each exporter's pipeline builder to see how to // export in batches. - let tracer = opentelemetry_jaeger::new_pipeline() + let tracer = opentelemetry_jaeger::new_agent_pipeline() .with_service_name("report_example") .install_simple()?; let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); diff --git a/tracing-opentelemetry/Cargo.toml b/tracing-opentelemetry/Cargo.toml index efe2de0ab9..b9142f1e4e 100644 --- a/tracing-opentelemetry/Cargo.toml +++ b/tracing-opentelemetry/Cargo.toml @@ -17,7 +17,7 @@ categories = [ keywords = ["tracing", "opentelemetry", "jaeger", "zipkin", "async"] license = "MIT" edition = "2018" -rust-version = "1.46.0" +rust-version = "1.56.0" [features] default = ["tracing-log", "metrics"] @@ -25,7 +25,7 @@ default = ["tracing-log", "metrics"] metrics = ["opentelemetry/metrics"] [dependencies] -opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] } +opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] } tracing = { path = "../tracing", version = "0.1.35", default-features = false, features = ["std"] } tracing-core = { path = "../tracing-core", version = "0.1.28" } tracing-subscriber = { path = "../tracing-subscriber", version = "0.3.0", default-features = false, features = ["registry", "std"] } @@ -39,7 +39,7 @@ thiserror = { version = "1.0.31", optional = true } [dev-dependencies] async-trait = "0.1.56" criterion = { version = "0.3.6", default-features = false } -opentelemetry-jaeger = "0.16.0" +opentelemetry-jaeger = "0.17.0" futures-util = { version = "0.3", default-features = false } tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" @@ -53,4 +53,4 @@ harness = false [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] \ No newline at end of file +rustdoc-args = ["--cfg", "docsrs"] diff --git a/tracing-opentelemetry/README.md b/tracing-opentelemetry/README.md index 4fcb658cfa..9260d80636 100644 --- a/tracing-opentelemetry/README.md +++ b/tracing-opentelemetry/README.md @@ -50,7 +50,7 @@ The crate provides the following types: [`tracing`]: https://crates.io/crates/tracing [OpenTelemetry]: https://opentelemetry.io/ -*Compiler support: [requires `rustc` 1.49+][msrv]* +*Compiler support: [requires `rustc` 1.56+][msrv]* [msrv]: #supported-rust-versions @@ -110,7 +110,7 @@ $ firefox http://localhost:16686/ ## Supported Rust Versions Tracing Opentelemetry is built against the latest stable release. The minimum -supported version is 1.46. The current Tracing version is not guaranteed to +supported version is 1.56. The current Tracing version is not guaranteed to build on Rust versions earlier than the minimum supported version. Tracing follows the same compiler support policies as the rest of the Tokio diff --git a/tracing-opentelemetry/src/layer.rs b/tracing-opentelemetry/src/layer.rs index b966e4372d..7624b613e3 100644 --- a/tracing-opentelemetry/src/layer.rs +++ b/tracing-opentelemetry/src/layer.rs @@ -1,11 +1,10 @@ use crate::{OtelData, PreSampledTracer}; use once_cell::unsync; use opentelemetry::{ - trace::{self as otel, noop, TraceContextExt}, - Context as OtelContext, Key, KeyValue, Value, + trace::{self as otel, noop, OrderMap, TraceContextExt}, + Context as OtelContext, Key, KeyValue, StringValue, Value, }; use std::any::TypeId; -use std::borrow::Cow; use std::fmt; use std::marker; use std::thread; @@ -105,12 +104,11 @@ fn str_to_span_kind(s: &str) -> Option { } } -fn str_to_status_code(s: &str) -> Option { +fn str_to_status(s: &str) -> otel::Status { match s { - s if s.eq_ignore_ascii_case("unset") => Some(otel::StatusCode::Unset), - s if s.eq_ignore_ascii_case("ok") => Some(otel::StatusCode::Ok), - s if s.eq_ignore_ascii_case("error") => Some(otel::StatusCode::Error), - _ => None, + s if s.eq_ignore_ascii_case("ok") => otel::Status::Ok, + s if s.eq_ignore_ascii_case("error") => otel::Status::error(""), + _ => otel::Status::Unset, } } @@ -220,7 +218,7 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { let mut next_err = value.source(); while let Some(err) = next_err { - chain.push(Cow::Owned(err.to_string())); + chain.push(StringValue::from(err.to_string())); next_err = err.source(); } @@ -245,7 +243,7 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { if self.exception_config.propagate { if let Some(span) = &mut self.span_builder { if let Some(attrs) = span.attributes.as_mut() { - attrs.push(Key::new(FIELD_EXCEPTION_MESSAGE).string(error_msg.clone())); + attrs.insert(Key::new(FIELD_EXCEPTION_MESSAGE), error_msg.clone().into()); // NOTE: This is actually not the stacktrace of the exception. This is // the "source chain". It represents the heirarchy of errors from the @@ -253,7 +251,10 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { // of the callsites in the code that led to the error happening. // `std::error::Error::backtrace` is a nightly-only API and cannot be // used here until the feature is stabilized. - attrs.push(Key::new(FIELD_EXCEPTION_STACKTRACE).array(chain.clone())); + attrs.insert( + Key::new(FIELD_EXCEPTION_STACKTRACE), + Value::Array(chain.clone().into()), + ); } } } @@ -288,7 +289,7 @@ impl<'a> SpanAttributeVisitor<'a> { fn record(&mut self, attribute: KeyValue) { debug_assert!(self.span_builder.attributes.is_some()); if let Some(v) = self.span_builder.attributes.as_mut() { - v.push(attribute); + v.insert(attribute.key, attribute.value); } } } @@ -322,9 +323,9 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { match field.name() { SPAN_NAME_FIELD => self.span_builder.name = value.to_string().into(), SPAN_KIND_FIELD => self.span_builder.span_kind = str_to_span_kind(value), - SPAN_STATUS_CODE_FIELD => self.span_builder.status_code = str_to_status_code(value), + SPAN_STATUS_CODE_FIELD => self.span_builder.status = str_to_status(value), SPAN_STATUS_MESSAGE_FIELD => { - self.span_builder.status_message = Some(value.to_owned().into()) + self.span_builder.status = otel::Status::error(value.to_string()) } _ => self.record(KeyValue::new(field.name(), value.to_string())), } @@ -341,10 +342,10 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { self.span_builder.span_kind = str_to_span_kind(&format!("{:?}", value)) } SPAN_STATUS_CODE_FIELD => { - self.span_builder.status_code = str_to_status_code(&format!("{:?}", value)) + self.span_builder.status = str_to_status(&format!("{:?}", value)) } SPAN_STATUS_MESSAGE_FIELD => { - self.span_builder.status_message = Some(format!("{:?}", value).into()) + self.span_builder.status = otel::Status::error(format!("{:?}", value)) } _ => self.record(Key::new(field.name()).string(format!("{:?}", value))), } @@ -363,7 +364,7 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { let mut next_err = value.source(); while let Some(err) = next_err { - chain.push(Cow::Owned(err.to_string())); + chain.push(StringValue::from(err.to_string())); next_err = err.source(); } @@ -405,7 +406,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -446,7 +447,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -684,7 +685,7 @@ where builder.trace_id = Some(self.tracer.new_trace_id()); } - let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity( + let builder_attrs = builder.attributes.get_or_insert(OrderMap::with_capacity( attrs.fields().len() + self.extra_span_attrs(), )); @@ -692,26 +693,26 @@ where let meta = attrs.metadata(); if let Some(filename) = meta.file() { - builder_attrs.push(KeyValue::new("code.filepath", filename)); + builder_attrs.insert("code.filepath".into(), filename.into()); } if let Some(module) = meta.module_path() { - builder_attrs.push(KeyValue::new("code.namespace", module)); + builder_attrs.insert("code.namespace".into(), module.into()); } if let Some(line) = meta.line() { - builder_attrs.push(KeyValue::new("code.lineno", line as i64)); + builder_attrs.insert("code.lineno".into(), (line as i64).into()); } } if self.with_threads { - THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64))); + THREAD_ID.with(|id| builder_attrs.insert("thread.id".into(), (**id as i64).into())); if let Some(name) = std::thread::current().name() { // TODO(eliza): it's a bummer that we have to allocate here, but // we can't easily get the string as a `static`. it would be // nice if `opentelemetry` could also take `Arc`s as // `String` values... - builder_attrs.push(KeyValue::new("thread.name", name.to_owned())); + builder_attrs.insert("thread.name".into(), name.to_owned().into()); } } @@ -845,8 +846,10 @@ where }); if let Some(OtelData { builder, .. }) = extensions.get_mut::() { - if builder.status_code.is_none() && *meta.level() == tracing_core::Level::ERROR { - builder.status_code = Some(otel::StatusCode::Error); + if builder.status == otel::Status::Unset + && *meta.level() == tracing_core::Level::ERROR + { + builder.status = otel::Status::error("") } if self.location { @@ -904,15 +907,14 @@ where if self.tracked_inactivity { // Append busy/idle timings when enabled. if let Some(timings) = extensions.get_mut::() { - let busy_ns = KeyValue::new("busy_ns", timings.busy); - let idle_ns = KeyValue::new("idle_ns", timings.idle); - - if let Some(ref mut attributes) = builder.attributes { - attributes.push(busy_ns); - attributes.push(idle_ns); - } else { - builder.attributes = Some(vec![busy_ns, idle_ns]); - } + let busy_ns = Key::new("busy_ns"); + let idle_ns = Key::new("idle_ns"); + + let attributes = builder + .attributes + .get_or_insert_with(|| OrderMap::with_capacity(2)); + attributes.insert(busy_ns, timings.busy.into()); + attributes.insert(idle_ns, timings.idle.into()); } } @@ -965,7 +967,10 @@ fn thread_id_integer(id: thread::ThreadId) -> u64 { mod tests { use super::*; use crate::OtelData; - use opentelemetry::trace::{noop, SpanKind, TraceFlags}; + use opentelemetry::{ + trace::{noop, TraceFlags}, + StringValue, + }; use std::{ borrow::Cow, collections::HashMap, @@ -1043,7 +1048,7 @@ mod tests { false } fn set_attribute(&mut self, _attribute: KeyValue) {} - fn set_status(&mut self, _code: otel::StatusCode, _message: String) {} + fn set_status(&mut self, _status: otel::Status) {} fn update_name>>(&mut self, _new_name: T) {} fn end_with_timestamp(&mut self, _timestamp: SystemTime) {} } @@ -1103,7 +1108,7 @@ mod tests { let subscriber = tracing_subscriber::registry().with(layer().with_tracer(tracer.clone())); tracing::subscriber::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_kind = tracer.with_data(|data| data.builder.span_kind.clone()); @@ -1116,11 +1121,19 @@ mod tests { let subscriber = tracing_subscriber::registry().with(layer().with_tracer(tracer.clone())); tracing::subscriber::with_default(subscriber, || { - tracing::debug_span!("request", otel.status_code = ?otel::StatusCode::Ok); + tracing::debug_span!("request", otel.status_code = ?otel::Status::Ok); }); + let recorded_status = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); - let recorded_status_code = tracer.with_data(|data| data.builder.status_code); - assert_eq!(recorded_status_code, Some(otel::StatusCode::Ok)) + assert_eq!(recorded_status, otel::Status::Ok) } #[test] @@ -1134,8 +1147,17 @@ mod tests { tracing::debug_span!("request", otel.status_message = message); }); - let recorded_status_message = tracer.with_data(|data| data.builder.status_message.clone()); - assert_eq!(recorded_status_message, Some(message.into())) + let recorded_status_message = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); + + assert_eq!(recorded_status_message, otel::Status::error(message)) } #[test] @@ -1153,7 +1175,7 @@ mod tests { let _g = existing_cx.attach(); tracing::subscriber::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_trace_id = @@ -1177,7 +1199,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"idle_ns")); assert!(keys.contains(&"busy_ns")); @@ -1217,7 +1239,7 @@ mod tests { let key_values = attributes .into_iter() - .map(|attr| (attr.key.as_str().to_owned(), attr.value)) + .map(|(key, value)| (key.as_str().to_owned(), value)) .collect::>(); assert_eq!(key_values["error"].as_str(), "user error"); @@ -1225,8 +1247,8 @@ mod tests { key_values["error.chain"], Value::Array( vec![ - Cow::Borrowed("intermediate error"), - Cow::Borrowed("base error") + StringValue::from("intermediate error"), + StringValue::from("base error") ] .into() ) @@ -1237,8 +1259,8 @@ mod tests { key_values[FIELD_EXCEPTION_STACKTRACE], Value::Array( vec![ - Cow::Borrowed("intermediate error"), - Cow::Borrowed("base error") + StringValue::from("intermediate error"), + StringValue::from("base error") ] .into() ) @@ -1258,7 +1280,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"code.filepath")); assert!(keys.contains(&"code.namespace")); @@ -1278,7 +1300,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"code.filepath")); assert!(!keys.contains(&"code.namespace")); @@ -1290,7 +1312,7 @@ mod tests { let thread = thread::current(); let expected_name = thread .name() - .map(|name| Value::String(Cow::Owned(name.to_owned()))); + .map(|name| Value::String(name.to_owned().into())); let expected_id = Value::I64(thread_id_integer(thread.id()) as i64); let tracer = TestTracer(Arc::new(Mutex::new(None))); @@ -1304,7 +1326,7 @@ mod tests { let attributes = tracer .with_data(|data| data.builder.attributes.as_ref().unwrap().clone()) .drain(..) - .map(|keyval| (keyval.key.as_str().to_string(), keyval.value)) + .map(|(key, value)| (key.as_str().to_string(), value)) .collect::>(); assert_eq!(attributes.get("thread.name"), expected_name.as_ref()); assert_eq!(attributes.get("thread.id"), Some(&expected_id)); @@ -1323,7 +1345,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"thread.name")); assert!(!keys.contains(&"thread.id")); @@ -1365,7 +1387,7 @@ mod tests { let key_values = attributes .into_iter() - .map(|attr| (attr.key.as_str().to_owned(), attr.value)) + .map(|(key, value)| (key.as_str().to_owned(), value)) .collect::>(); assert_eq!(key_values[FIELD_EXCEPTION_MESSAGE].as_str(), "user error"); @@ -1373,8 +1395,8 @@ mod tests { key_values[FIELD_EXCEPTION_STACKTRACE], Value::Array( vec![ - Cow::Borrowed("intermediate error"), - Cow::Borrowed("base error") + StringValue::from("intermediate error"), + StringValue::from("base error") ] .into() ) diff --git a/tracing-opentelemetry/src/lib.rs b/tracing-opentelemetry/src/lib.rs index 5cd725d5a3..d3bc5bbce3 100644 --- a/tracing-opentelemetry/src/lib.rs +++ b/tracing-opentelemetry/src/lib.rs @@ -9,7 +9,7 @@ //! [OpenTelemetry]: https://opentelemetry.io //! [`tracing`]: https://github.com/tokio-rs/tracing //! -//! *Compiler support: [requires `rustc` 1.49+][msrv]* +//! *Compiler support: [requires `rustc` 1.56+][msrv]* //! //! [msrv]: #supported-rust-versions //! @@ -86,7 +86,7 @@ //! ## Supported Rust Versions //! //! Tracing is built against the latest stable release. The minimum supported -//! version is 1.49. The current Tracing version is not guaranteed to build on +//! version is 1.56. The current Tracing version is not guaranteed to build on //! Rust versions earlier than the minimum supported version. //! //! Tracing follows the same compiler support policies as the rest of the Tokio diff --git a/tracing-opentelemetry/src/metrics.rs b/tracing-opentelemetry/src/metrics.rs index 37df62c4b4..91b9e63a8f 100644 --- a/tracing-opentelemetry/src/metrics.rs +++ b/tracing-opentelemetry/src/metrics.rs @@ -3,8 +3,9 @@ use tracing::{field::Visit, Subscriber}; use tracing_core::Field; use opentelemetry::{ - metrics::{Counter, Meter, MeterProvider, UpDownCounter, ValueRecorder}, - sdk::metrics::PushController, + metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}, + sdk::metrics::controllers::BasicController, + Context as OtelContext, }; use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; @@ -13,7 +14,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter."; const METRIC_PREFIX_COUNTER: &str = "counter."; -const METRIC_PREFIX_VALUE: &str = "value."; +const METRIC_PREFIX_HISTOGRAM: &str = "histogram."; const I64_MAX: u64 = i64::MAX as u64; #[derive(Default)] @@ -22,9 +23,9 @@ pub(crate) struct Instruments { f64_counter: MetricsMap>, i64_up_down_counter: MetricsMap>, f64_up_down_counter: MetricsMap>, - u64_value_recorder: MetricsMap>, - i64_value_recorder: MetricsMap>, - f64_value_recorder: MetricsMap>, + u64_histogram: MetricsMap>, + i64_histogram: MetricsMap>, + f64_histogram: MetricsMap>, } type MetricsMap = RwLock>; @@ -35,14 +36,15 @@ pub(crate) enum InstrumentType { CounterF64(f64), UpDownCounterI64(i64), UpDownCounterF64(f64), - ValueRecorderU64(u64), - ValueRecorderI64(i64), - ValueRecorderF64(f64), + HistogramU64(u64), + HistogramI64(i64), + HistogramF64(f64), } impl Instruments { pub(crate) fn update_metric( &self, + cx: &OtelContext, meter: &Meter, instrument_type: InstrumentType, metric_name: &'static str, @@ -76,7 +78,7 @@ impl Instruments { &self.u64_counter, metric_name, || meter.u64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::CounterF64(value) => { @@ -84,7 +86,7 @@ impl Instruments { &self.f64_counter, metric_name, || meter.f64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterI64(value) => { @@ -92,7 +94,7 @@ impl Instruments { &self.i64_up_down_counter, metric_name, || meter.i64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterF64(value) => { @@ -100,31 +102,31 @@ impl Instruments { &self.f64_up_down_counter, metric_name, || meter.f64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } - InstrumentType::ValueRecorderU64(value) => { + InstrumentType::HistogramU64(value) => { update_or_insert( - &self.u64_value_recorder, + &self.u64_histogram, metric_name, - || meter.u64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.u64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderI64(value) => { + InstrumentType::HistogramI64(value) => { update_or_insert( - &self.i64_value_recorder, + &self.i64_histogram, metric_name, - || meter.i64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.i64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderF64(value) => { + InstrumentType::HistogramF64(value) => { update_or_insert( - &self.f64_value_recorder, + &self.f64_histogram, metric_name, - || meter.f64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.f64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } }; @@ -142,8 +144,10 @@ impl<'a> Visit for MetricVisitor<'a> { } fn record_u64(&mut self, field: &Field, value: u64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value), metric_name, @@ -151,6 +155,7 @@ impl<'a> Visit for MetricVisitor<'a> { } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { if value <= I64_MAX { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value as i64), metric_name, @@ -163,54 +168,63 @@ impl<'a> Visit for MetricVisitor<'a> { value ); } - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderU64(value), + InstrumentType::HistogramU64(value), metric_name, ); } } fn record_f64(&mut self, field: &Field, value: f64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterF64(value), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterF64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderF64(value), + InstrumentType::HistogramF64(value), metric_name, ); } } fn record_i64(&mut self, field: &Field, value: i64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value as u64), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderI64(value), + InstrumentType::HistogramI64(value), metric_name, ); } @@ -232,14 +246,14 @@ impl<'a> Visit for MetricVisitor<'a> { /// use tracing_opentelemetry::MetricsLayer; /// use tracing_subscriber::layer::SubscriberExt; /// use tracing_subscriber::Registry; -/// # use opentelemetry::sdk::metrics::PushController; +/// # use opentelemetry::sdk::metrics::controllers::BasicController; /// -/// // Constructing a PushController is out-of-scope for the docs here, but there +/// // Constructing a BasicController is out-of-scope for the docs here, but there /// // are examples in the opentelemetry repository. See: -/// // https://github.com/open-telemetry/opentelemetry-rust/blob/c13a11e62a68eacd8c41a0742a0d097808e28fbd/examples/basic-otlp/src/main.rs#L39-L53 -/// # let push_controller: PushController = unimplemented!(); +/// // https://github.com/open-telemetry/opentelemetry-rust/blob/d4b9befea04bcc7fc19319a6ebf5b5070131c486/examples/basic-otlp/src/main.rs#L35-L52 +/// # let controller: BasicController = unimplemented!(); /// -/// let opentelemetry_metrics = MetricsLayer::new(push_controller); +/// let opentelemetry_metrics = MetricsLayer::new(controller); /// let subscriber = Registry::default().with(opentelemetry_metrics); /// tracing::subscriber::set_global_default(subscriber).unwrap(); /// ``` @@ -329,10 +343,9 @@ pub struct MetricsLayer { impl MetricsLayer { /// Create a new instance of MetricsLayer. - pub fn new(push_controller: PushController) -> Self { - let meter = push_controller - .provider() - .meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION)); + pub fn new(controller: BasicController) -> Self { + let meter = + controller.versioned_meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION), None); MetricsLayer { meter, instruments: Default::default(), diff --git a/tracing-opentelemetry/src/tracer.rs b/tracing-opentelemetry/src/tracer.rs index 9cd6d378d5..66c52fea21 100644 --- a/tracing-opentelemetry/src/tracer.rs +++ b/tracing-opentelemetry/src/tracer.rs @@ -1,9 +1,10 @@ -use opentelemetry::sdk::trace::{SamplingDecision, SamplingResult, Tracer, TracerProvider}; +use opentelemetry::sdk::trace::{Tracer, TracerProvider}; +use opentelemetry::trace::OrderMap; use opentelemetry::{ trace as otel, trace::{ - noop, SpanBuilder, SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId, - TraceState, + noop, SamplingDecision, SamplingResult, SpanBuilder, SpanContext, SpanId, SpanKind, + TraceContextExt, TraceFlags, TraceId, TraceState, }, Context as OtelContext, }; @@ -74,19 +75,18 @@ impl PreSampledTracer for Tracer { let builder = &mut data.builder; // Gather trace state - let (no_parent, trace_id, remote_parent, parent_trace_flags) = - current_trace_state(builder, parent_cx, &provider); + let (trace_id, parent_trace_flags) = current_trace_state(builder, parent_cx, &provider); // Sample or defer to existing sampling decisions let (flags, trace_state) = if let Some(result) = &builder.sampling_result { process_sampling_result(result, parent_trace_flags) - } else if no_parent || remote_parent { + } else { builder.sampling_result = Some(provider.config().sampler.should_sample( Some(parent_cx), trace_id, &builder.name, builder.span_kind.as_ref().unwrap_or(&SpanKind::Internal), - builder.attributes.as_deref().unwrap_or(&[]), + builder.attributes.as_ref().unwrap_or(&OrderMap::default()), builder.links.as_deref().unwrap_or(&[]), self.instrumentation_library(), )); @@ -95,12 +95,6 @@ impl PreSampledTracer for Tracer { builder.sampling_result.as_ref().unwrap(), parent_trace_flags, ) - } else { - // has parent that is local - Some(( - parent_trace_flags, - parent_cx.span().span_context().trace_state().clone(), - )) } .unwrap_or_default(); @@ -126,18 +120,16 @@ fn current_trace_state( builder: &SpanBuilder, parent_cx: &OtelContext, provider: &TracerProvider, -) -> (bool, TraceId, bool, TraceFlags) { +) -> (TraceId, TraceFlags) { if parent_cx.has_active_span() { let span = parent_cx.span(); let sc = span.span_context(); - (false, sc.trace_id(), sc.is_remote(), sc.trace_flags()) + (sc.trace_id(), sc.trace_flags()) } else { ( - true, builder .trace_id .unwrap_or_else(|| provider.config().id_generator.new_trace_id()), - false, Default::default(), ) } diff --git a/tracing-opentelemetry/tests/metrics_publishing.rs b/tracing-opentelemetry/tests/metrics_publishing.rs index 9db53fcb3f..284005e538 100644 --- a/tracing-opentelemetry/tests/metrics_publishing.rs +++ b/tracing-opentelemetry/tests/metrics_publishing.rs @@ -1,26 +1,21 @@ -#![cfg(feature = "metrics")] -use async_trait::async_trait; -use futures_util::{Stream, StreamExt as _}; use opentelemetry::{ - metrics::{Descriptor, InstrumentKind}, - metrics::{Number, NumberKind}, + metrics::MetricsError, sdk::{ - export::{ - metrics::{ - CheckpointSet, ExportKind, ExportKindFor, ExportKindSelector, - Exporter as MetricsExporter, Points, Sum, - }, - trace::{SpanData, SpanExporter}, + export::metrics::{ + aggregation::{self, Histogram, Sum, TemporalitySelector}, + InstrumentationLibraryReader, }, metrics::{ - aggregators::{ArrayAggregator, SumAggregator}, - selectors::simple::Selector, + aggregators::{HistogramAggregator, SumAggregator}, + controllers::BasicController, + processors, + sdk_api::{Descriptor, InstrumentKind, Number, NumberKind}, + selectors, }, }, - Key, Value, + Context, }; use std::cmp::Ordering; -use std::time::Duration; use tracing::Subscriber; use tracing_opentelemetry::MetricsLayer; use tracing_subscriber::prelude::*; @@ -30,7 +25,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; #[tokio::test] async fn u64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -40,11 +35,13 @@ async fn u64_counter_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world = 1_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn u64_counter_is_exported_i64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world2".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -54,11 +51,13 @@ async fn u64_counter_is_exported_i64_at_instrumentation_point() { tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world2 = 1_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "float_hello_world".to_string(), InstrumentKind::Counter, NumberKind::F64, @@ -68,11 +67,13 @@ async fn f64_counter_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.float_hello_world = 1.000000123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -82,11 +83,13 @@ async fn i64_up_down_counter_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak = -5_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak2".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -96,11 +99,13 @@ async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak2 = 5_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak_blah".to_string(), InstrumentKind::UpDownCounter, NumberKind::F64, @@ -110,13 +115,15 @@ async fn f64_up_down_counter_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak_blah = 99.123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn u64_value_is_exported() { - let subscriber = init_subscriber( +async fn u64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::U64, Number::from(9_u64), ); @@ -124,13 +131,15 @@ async fn u64_value_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(value.abcdefg = 9_u64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn i64_value_is_exported() { - let subscriber = init_subscriber( +async fn i64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_auenatsou".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::I64, Number::from(-19_i64), ); @@ -138,13 +147,15 @@ async fn i64_value_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(value.abcdefg_auenatsou = -19_i64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn f64_value_is_exported() { - let subscriber = init_subscriber( +async fn f64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_racecar".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::F64, Number::from(777.0012_f64), ); @@ -152,6 +163,8 @@ async fn f64_value_is_exported() { tracing::subscriber::with_default(subscriber, || { tracing::info!(value.abcdefg_racecar = 777.0012_f64); }); + + exporter.export().unwrap(); } fn init_subscriber( @@ -159,24 +172,25 @@ fn init_subscriber( expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, -) -> impl Subscriber + 'static { +) -> (impl Subscriber + 'static, TestExporter) { + let controller = opentelemetry::sdk::metrics::controllers::basic(processors::factory( + selectors::simple::histogram(vec![-10.0, 100.0]), + aggregation::cumulative_temporality_selector(), + )) + .build(); + let exporter = TestExporter { expected_metric_name, expected_instrument_kind, expected_number_kind, expected_value, + controller: controller.clone(), }; - let push_controller = opentelemetry::sdk::metrics::controllers::push( - Selector::Exact, - ExportKindSelector::Stateless, + ( + tracing_subscriber::registry().with(MetricsLayer::new(controller)), exporter, - tokio::spawn, - delayed_interval, ) - .build(); - - tracing_subscriber::registry().with(MetricsLayer::new(push_controller)) } #[derive(Clone, Debug)] @@ -185,100 +199,84 @@ struct TestExporter { expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, + controller: BasicController, } -#[async_trait] -impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut _batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - Ok(()) - } -} +impl TestExporter { + fn export(&self) -> Result<(), MetricsError> { + self.controller.collect(&Context::current())?; + self.controller.try_for_each(&mut |library, reader| { + reader.try_for_each(self, &mut |record| { + assert_eq!(self.expected_metric_name, record.descriptor().name()); + assert_eq!( + self.expected_instrument_kind, + *record.descriptor().instrument_kind() + ); + assert_eq!( + self.expected_number_kind, + *record.descriptor().number_kind() + ); + match self.expected_instrument_kind { + InstrumentKind::Counter | InstrumentKind::UpDownCounter => { + let number = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .sum() + .unwrap(); + + assert_eq!( + Ordering::Equal, + number + .partial_cmp(&NumberKind::U64, &self.expected_value) + .unwrap() + ); + } + InstrumentKind::Histogram => { + let histogram = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .histogram() + .unwrap(); + + let counts = histogram.counts(); + if dbg!(self.expected_value.to_i64(&self.expected_number_kind)) > 100 { + assert_eq!(counts, &[0.0, 0.0, 1.0]); + } else if self.expected_value.to_i64(&self.expected_number_kind) > 0 { + assert_eq!(counts, &[0.0, 1.0, 0.0]); + } else { + assert_eq!(counts, &[1.0, 0.0, 0.0]); + } + } + _ => panic!( + "InstrumentKind {:?} not currently supported!", + self.expected_instrument_kind + ), + }; -impl MetricsExporter for TestExporter { - fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> opentelemetry::metrics::Result<()> { - checkpoint_set.try_for_each(self, &mut |record| { - assert_eq!(self.expected_metric_name, record.descriptor().name()); - assert_eq!( - self.expected_instrument_kind, - *record.descriptor().instrument_kind() - ); - assert_eq!( - self.expected_number_kind, - *record.descriptor().number_kind() - ); - let number = match self.expected_instrument_kind { - InstrumentKind::Counter | InstrumentKind::UpDownCounter => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .sum() - .unwrap(), - InstrumentKind::ValueRecorder => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .points() - .unwrap()[0] - .clone(), - _ => panic!( - "InstrumentKind {:?} not currently supported!", - self.expected_instrument_kind - ), - }; - assert_eq!( - Ordering::Equal, - number - .partial_cmp(&NumberKind::U64, &self.expected_value) - .unwrap() - ); - - // The following are the same regardless of the individual metric. - assert_eq!( - INSTRUMENTATION_LIBRARY_NAME, - record.descriptor().instrumentation_library().name - ); - assert_eq!( - CARGO_PKG_VERSION, - record.descriptor().instrumentation_version().unwrap() - ); - assert_eq!( - Value::String("unknown_service".into()), - record - .resource() - .get(Key::new("service.name".to_string())) - .unwrap() - ); - - opentelemetry::metrics::Result::Ok(()) + // The following are the same regardless of the individual metric. + assert_eq!(INSTRUMENTATION_LIBRARY_NAME, library.name); + assert_eq!(CARGO_PKG_VERSION, library.version.as_ref().unwrap()); + + Ok(()) + }) }) } } -impl ExportKindFor for TestExporter { - fn export_kind_for(&self, _descriptor: &Descriptor) -> ExportKind { +impl TemporalitySelector for TestExporter { + fn temporality_for( + &self, + _descriptor: &Descriptor, + _kind: &aggregation::AggregationKind, + ) -> aggregation::Temporality { // I don't think the value here makes a difference since // we are just testing a single metric. - ExportKind::Cumulative + aggregation::Temporality::Cumulative } } - -// From opentelemetry::sdk::util:: -// For some reason I can't pull it in from the other crate, it gives -// could not find `util` in `sdk` -/// Helper which wraps `tokio::time::interval` and makes it return a stream -fn tokio_interval_stream(period: std::time::Duration) -> tokio_stream::wrappers::IntervalStream { - tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(period)) -} - -// https://github.com/open-telemetry/opentelemetry-rust/blob/2585d109bf90d53d57c91e19c758dca8c36f5512/examples/basic-otlp/src/main.rs#L34-L37 -// Skip first immediate tick from tokio, not needed for async_std. -fn delayed_interval(duration: Duration) -> impl Stream { - tokio_interval_stream(duration).skip(0) -} diff --git a/tracing-opentelemetry/tests/trace_state_propagation.rs b/tracing-opentelemetry/tests/trace_state_propagation.rs index 0f92bc47aa..49200b4fc9 100644 --- a/tracing-opentelemetry/tests/trace_state_propagation.rs +++ b/tracing-opentelemetry/tests/trace_state_propagation.rs @@ -1,8 +1,8 @@ -use async_trait::async_trait; +use futures_util::future::BoxFuture; use opentelemetry::{ propagation::TextMapPropagator, sdk::{ - export::trace::{SpanData, SpanExporter}, + export::trace::{ExportResult, SpanData, SpanExporter}, propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, trace::{Tracer, TracerProvider}, }, @@ -158,15 +158,14 @@ fn build_sampled_context() -> (Context, impl Subscriber, TestExporter, TracerPro #[derive(Clone, Default, Debug)] struct TestExporter(Arc>>); -#[async_trait] impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - if let Ok(mut inner) = self.0.lock() { - inner.append(&mut batch); - } - Ok(()) + fn export(&mut self, mut batch: Vec) -> BoxFuture<'static, ExportResult> { + let spans = self.0.clone(); + Box::pin(async move { + if let Ok(mut inner) = spans.lock() { + inner.append(&mut batch); + } + Ok(()) + }) } }