From 61dfb7eee13cc5728db792de47d8d7952f335fd4 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Mon, 20 Apr 2020 09:07:08 -0700 Subject: [PATCH 1/5] Add Context API This provides an implementation of the [Context API Spec](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/context/context.md). A context is a propagation mechanism which carries execution-scoped values across API boundaries and between logically associated execution units. Cross-cutting concerns access their data in-process using the same shared `Context` object. It is implemented as an immutable thread-local collection of heterogeneous values that can be inserted and retrieved by their type. A thread's current context can be assigned via the `attach` method. The previous context is restored as the current context when the resulting context guard is dropped, allowing precise control and nesting of active context scopes. --- src/api/context/mod.rs | 375 +++++++++++++++++++++++++++++++++++++++++ src/api/mod.rs | 2 + 2 files changed, 377 insertions(+) create mode 100644 src/api/context/mod.rs diff --git a/src/api/context/mod.rs b/src/api/context/mod.rs new file mode 100644 index 0000000000..eb50a537a1 --- /dev/null +++ b/src/api/context/mod.rs @@ -0,0 +1,375 @@ +//! # OpenTelemetry Context API +//! +//! A [`Context`] is a propagation mechanism which carries execution-scoped +//! values across API boundaries and between logically associated execution +//! units. Cross-cutting concerns access their data in-process using the same +//! shared context object. +//! +//! [`Context`]s are immutable, and their write operations result in the creation +//! of a new context containing the original values and the new specified values. +//! +//! ## Context state +//! +//! Concerns can create and retrieve their local state in the current execution +//! state represented by a context through the [`get`] and [`with_value`] +//! methods. It is recommended to use application-specific types when storing new +//! context values to avoid unintentionally overwriting existing state. +//! +//! ## Managing the current context +//! +//! Contexts can be associated with the caller's current execution unit on a +//! given thread via the [`attach`] method, and previous contexts can be restored +//! by dropping the returned [`ContextGuard`]. Context can be nested, and will +//! restore their parent outer context when detached on drop. To access the +//! values of the context, a snapshot can be created via the [`Context::current`] +//! method. +//! +//! [`Context`]: struct.Context.html +//! [`Context::current`]: struct.Context.html#method.current +//! [`ContextGuard`]: struct.ContextGuard.html +//! [`get`]: struct.Context.html#method.get +//! [`with_value`]: struct.Context.html#method.with_value +//! [`attach`]: struct.Context.html#method.attach +//! +//! # Examples +//! +//! ``` +//! use opentelemetry::api::Context; +//! +//! // Application-specific `a` and `b` values +//! #[derive(Debug, PartialEq)] +//! struct ValueA(&'static str); +//! #[derive(Debug, PartialEq)] +//! struct ValueB(u64); +//! +//! let _outer_guard = Context::new().with_value(ValueA("a")).attach(); +//! +//! // Only value a has been set +//! let current = Context::current(); +//! assert_eq!(current.get::(), Some(&ValueA("a"))); +//! assert_eq!(current.get::(), None); +//! +//! { +//! let _inner_guard = Context::current_with_value(ValueB(42)).attach(); +//! // Both values are set in inner context +//! let current = Context::current(); +//! assert_eq!(current.get::(), Some(&ValueA("a"))); +//! assert_eq!(current.get::(), Some(&ValueB(42))); +//! } +//! +//! // Resets to only the `a` value when inner guard is dropped +//! let current = Context::current(); +//! assert_eq!(current.get::(), Some(&ValueA("a"))); +//! assert_eq!(current.get::(), None); +//! ``` + +use std::any::{Any, TypeId}; +use std::cell::RefCell; +use std::collections::HashMap; +use std::fmt; +use std::hash::{BuildHasherDefault, Hasher}; +use std::rc::Rc; + +thread_local! { + static CURRENT_CONTEXT: RefCell = RefCell::new(Context::default()); + static DEFAULT_CONTEXT: Context = Context::default(); +} + +/// An execution-scoped collection of values. +#[derive(Clone, Default)] +pub struct Context { + entries: HashMap, BuildHasherDefault>, +} + +impl Context { + /// Creates an empty `Context`. + /// + /// The context is initially created with a capacity of 0, so it will not + /// allocate. Use [`with_value`] to create a new context that has entries. + /// + /// [`with_value`]: struct.Context.html#method.with_value + pub fn new() -> Self { + Context::default() + } + + /// Returns an immutable snapshot of the current thread's context. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::Context; + /// + /// #[derive(Debug, PartialEq)] + /// struct ValueA(&'static str); + /// + /// fn do_work() { + /// assert_eq!(Context::current().get(), Some(&ValueA("a"))); + /// } + /// + /// let _guard = Context::new().with_value(ValueA("a")).attach(); + /// do_work() + /// ``` + pub fn current() -> Self { + get_current(|cx| cx.clone()) + } + + /// Returns a clone of the current thread's context with the given value. + /// + /// This is a more efficient form of `Context::current().with_value(value)` + /// as it avoids the intermediate context clone. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::Context; + /// + /// // Given some value types defined in your application + /// #[derive(Debug, PartialEq)] + /// struct ValueA(&'static str); + /// #[derive(Debug, PartialEq)] + /// struct ValueB(u64); + /// + /// // You can create and attach context with the first value set to "a" + /// let _guard = Context::new().with_value(ValueA("a")).attach(); + /// + /// // And create another context based on the fist with a new value + /// let all_current_and_b = Context::current_with_value(ValueB(42)); + /// + /// // The second context now contains all the current values and the addition + /// assert_eq!(all_current_and_b.get::(), Some(&ValueA("a"))); + /// assert_eq!(all_current_and_b.get::(), Some(&ValueB(42))); + /// ``` + pub fn current_with_value(value: T) -> Self { + let mut value = Some(value); + get_current(|cx| cx.with_value(value.take().unwrap())) + } + + /// Returns a reference to the entry for the corresponding value type. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::Context; + /// + /// // Given some value types defined in your application + /// #[derive(Debug, PartialEq)] + /// struct ValueA(&'static str); + /// #[derive(Debug, PartialEq)] + /// struct MyUser(); + /// + /// let cx = Context::new().with_value(ValueA("a")); + /// + /// // Values can be queried by type + /// assert_eq!(cx.get::(), Some(&ValueA("a"))); + /// + /// // And return none if not yet set + /// assert_eq!(cx.get::(), None); + /// ``` + pub fn get(&self) -> Option<&T> { + self.entries + .get(&TypeId::of::()) + .and_then(|rc| (&*rc).downcast_ref()) + } + + /// Returns a copy of the context with the new value included. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::Context; + /// + /// // Given some value types defined in your application + /// #[derive(Debug, PartialEq)] + /// struct ValueA(&'static str); + /// #[derive(Debug, PartialEq)] + /// struct ValueB(u64); + /// + /// // You can create a context with the first value set to "a" + /// let cx_with_a = Context::new().with_value(ValueA("a")); + /// + /// // And create another context based on the fist with a new value + /// let cx_with_a_and_b = cx_with_a.with_value(ValueB(42)); + /// + /// // The first context is still available and unmodified + /// assert_eq!(cx_with_a.get::(), Some(&ValueA("a"))); + /// assert_eq!(cx_with_a.get::(), None); + /// + /// // The second context now contains both values + /// assert_eq!(cx_with_a_and_b.get::(), Some(&ValueA("a"))); + /// assert_eq!(cx_with_a_and_b.get::(), Some(&ValueB(42))); + /// ``` + pub fn with_value(&self, value: T) -> Self { + let mut new_context = self.clone(); + new_context + .entries + .insert(TypeId::of::(), Rc::new(value)); + + new_context + } + + /// Replaces the current context on this thread with this context. + /// + /// Dropping the returned [`ContextGuard`] will reset the current context to the + /// previous value. + /// + /// [`ContextGuard`]: struct.ContextGuard.html + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::Context; + /// + /// #[derive(Debug, PartialEq)] + /// struct ValueA(&'static str); + /// + /// let my_cx = Context::new().with_value(ValueA("a")); + /// + /// // Set the current thread context + /// let cx_guard = my_cx.attach(); + /// assert_eq!(Context::current().get::(), Some(&ValueA("a"))); + /// + /// // Drop the guard to restore the previous context + /// drop(cx_guard); + /// assert_eq!(Context::current().get::(), None); + /// ``` + /// + /// Guards do not need to be explicitly dropped: + /// + /// ``` + /// use opentelemetry::api::Context; + /// + /// #[derive(Debug, PartialEq)] + /// struct ValueA(&'static str); + /// + /// fn my_function() -> String { + /// // attach a context the duration of this function. + /// let my_cx = Context::new().with_value(ValueA("a")); + /// // NOTE: a variable name after the underscore is **required** or rust + /// // will drop the guard, restoring the previous context _immediately_. + /// let _guard = my_cx.attach(); + /// + /// // anything happening in functions we call can still access my_cx... + /// my_other_function(); + /// + /// // returning from the function drops the guard, exiting the span. + /// return "Hello world".to_owned(); + /// } + /// + /// fn my_other_function() { + /// // ... + /// } + /// ``` + /// Sub-scopes may be created to limit the duration for which the span is + /// entered: + /// + /// ``` + /// use opentelemetry::api::Context; + /// + /// #[derive(Debug, PartialEq)] + /// struct ValueA(&'static str); + /// + /// let my_cx = Context::new().with_value(ValueA("a")); + /// + /// { + /// let _guard = my_cx.attach(); + /// + /// // the current context can access variables in + /// assert_eq!(Context::current().get::(), Some(&ValueA("a"))); + /// + /// // exiting the scope drops the guard, detaching the context. + /// } + /// + /// // this is back in the default empty context + /// assert_eq!(Context::current().get::(), None); + /// ``` + pub fn attach(self) -> ContextGuard { + let prior = CURRENT_CONTEXT + .try_with(|current| current.replace(self)) + .ok(); + ContextGuard(prior) + } +} + +impl fmt::Debug for Context { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Context") + .field("entries", &self.entries.len()) + .finish() + } +} + +/// A guard that resets the current context to the prior context when dropped. +#[allow(missing_debug_implementations)] +pub struct ContextGuard(Option); + +impl Drop for ContextGuard { + fn drop(&mut self) { + if let Some(previous_cx) = self.0.take() { + let _ = CURRENT_CONTEXT.try_with(|current| current.replace(previous_cx)); + } + } +} + +/// Executes a closure with a reference to this thread's current context. +/// +/// Note: This function will panic if you attempt to attach another context +/// while the context is still borrowed. +fn get_current T, T>(mut f: F) -> T { + CURRENT_CONTEXT + .try_with(|cx| f(&*cx.borrow())) + .unwrap_or_else(|_| DEFAULT_CONTEXT.with(|cx| f(&*cx))) +} + +/// With TypeIds as keys, there's no need to hash them. They are already hashes +/// themselves, coming from the compiler. The IdHasher holds the u64 of +/// the TypeId, and then returns it, instead of doing any bit fiddling. +#[derive(Clone, Default, Debug)] +struct IdHasher(u64); + +impl Hasher for IdHasher { + fn write(&mut self, _: &[u8]) { + unreachable!("TypeId calls write_u64"); + } + + #[inline] + fn write_u64(&mut self, id: u64) { + self.0 = id; + } + + #[inline] + fn finish(&self) -> u64 { + self.0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn nested_contexts() { + #[derive(Debug, PartialEq)] + struct ValueA(&'static str); + #[derive(Debug, PartialEq)] + struct ValueB(u64); + let _outer_guard = Context::new().with_value(ValueA("a")).attach(); + + // Only value `a` is set + let current = Context::current(); + assert_eq!(current.get(), Some(&ValueA("a"))); + assert_eq!(current.get::(), None); + + { + let _inner_guard = Context::current_with_value(ValueB(42)).attach(); + // Both values are set in inner context + let current = Context::current(); + assert_eq!(current.get(), Some(&ValueA("a"))); + assert_eq!(current.get(), Some(&ValueB(42))); + } + + // Resets to only value `a` when inner guard is dropped + let current = Context::current(); + assert_eq!(current.get(), Some(&ValueA("a"))); + assert_eq!(current.get::(), None); + } +} diff --git a/src/api/mod.rs b/src/api/mod.rs index 3eafc3d6c2..0e4a82bb58 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -13,12 +13,14 @@ //! In order to enable telemetry the application must take a dependency on the OpenTelemetry SDK, //! which implements the delivery of the telemetry. The application must also configure exporters //! so that the SDK knows where and how to deliver the telemetry. +pub mod context; pub mod core; pub mod metrics; pub mod propagation; pub mod trace; pub use self::core::{Key, KeyValue, Unit, Value}; +pub use context::Context; pub use metrics::{ counter::{Counter, CounterHandle}, gauge::{Gauge, GaugeHandle}, From aa247ce81a78eaf351144b78e2700fb2c6e5cd34 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Tue, 21 Apr 2020 11:21:50 -0700 Subject: [PATCH 2/5] Tracing via Context API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change moves the storage and management of active spans from the current `SpanStack` implementation to the new `Context` api, as well as updating the propagation APIs to inject and extract contexts to remain compatible with the tracers. This has some breaking API changes to the way tracing is currently done, most notable is that the `Tracer`'s `start` method no longer requires an optional parent span context as that can now be provided by the current context. The previous tracing api was: ```rust // extract let remote_span_context = propagator.extract(&carrier); // start let parent = tracer.start("parent", Some(remote_context)); tracer.mark_span_as_active(&parent); // nest (note: `Some(parent.get_context())` and `None` had the same effect here) let child = tracer.start("child", None); tracer.mark_span_as_active(&child) // inject propagator.inject(child.get_context(), &mut carrier); // Marking as inactive was required and error-prone trace.mark_span_as_inactive(&child); trace.mark_span_as_inactive(&parent); ``` And the new API is: ```rust // extract let _attach = propagator.extract(&carrier).attach(); // start let parent = tracer.start("parent"); let _parent_active = tracer.mark_span_as_active(parent); // start let child = tracer.start("child"); let _child_active = tracer.mark_span_as_active(parent); // inject propagator.inject(&mut carrier) ``` Additional changes to facilitate the switch: * `tracer.with_span` now accepts a span for naming consistency and managing the active state of a more complex span (likely produced by a builder), and the previous functionality that accepts a `&str` has been renamed to `in_span`, both of which now yield a context to the provided closure. * The `Instrument` trait has been renamed to `FutureExt` to avoid clashing with metric instruments, and accepts contexts. * `TracerGenerics` methods have been folded in to the `Tracer` trait so the trait is no longer needed 🎉 . * A `TraceContextExt` trait provides methods for working with trace data in a context. Most notably `context.span()` returns a `&dyn api::Span` reference, and `Context::current_with_span(span)` creates a clone of the current context with the span added. * Span's managing their own active state is no longer needed 🎉. * Span's `get_context` method has been renamed to `span_context` to avoid the ambiguity. --- Cargo.toml | 2 +- benches/trace.rs | 12 +- examples/actix/src/main.rs | 14 +- examples/async/src/main.rs | 24 +- examples/basic/src/main.rs | 8 +- examples/grpc/src/client.rs | 14 +- examples/grpc/src/server.rs | 4 +- examples/http/src/client.rs | 12 +- examples/http/src/server.rs | 7 +- examples/stdout.rs | 4 +- examples/zipkin/src/main.rs | 6 +- opentelemetry-jaeger/src/lib.rs | 13 +- opentelemetry-zipkin/src/lib.rs | 15 +- src/api/context/mod.rs | 2 + .../propagation/base64_format.rs | 4 +- .../propagation/binary_propagator.rs | 0 src/api/{ => context}/propagation/mod.rs | 1 - .../context/propagation/text_propagator.rs | 34 ++ src/api/mod.rs | 16 +- src/api/propagation/noop.rs | 36 -- src/api/propagation/text_propagator.rs | 19 - src/api/trace/b3_propagator.rs | 87 ++-- src/api/trace/context.rs | 78 ++++ src/api/trace/futures.rs | 125 ++++-- src/api/trace/mod.rs | 1 + src/api/trace/noop.rs | 47 +-- src/api/trace/span.rs | 21 +- src/api/trace/span_processor.rs | 4 +- src/api/trace/trace_context_propagator.rs | 50 ++- src/api/trace/tracer.rs | 370 ++++++++++++++---- src/exporter/trace/mod.rs | 9 +- src/exporter/trace/stdout.rs | 6 - src/global.rs | 121 +----- src/lib.rs | 4 +- src/sdk/resource.rs | 2 +- src/sdk/trace/span.rs | 36 +- src/sdk/trace/span_processor.rs | 2 +- src/sdk/trace/tracer.rs | 162 ++------ 38 files changed, 771 insertions(+), 601 deletions(-) rename src/api/{ => context}/propagation/base64_format.rs (95%) rename src/api/{ => context}/propagation/binary_propagator.rs (100%) rename src/api/{ => context}/propagation/mod.rs (99%) create mode 100644 src/api/context/propagation/text_propagator.rs delete mode 100644 src/api/propagation/noop.rs delete mode 100644 src/api/propagation/text_propagator.rs create mode 100644 src/api/trace/context.rs diff --git a/Cargo.toml b/Cargo.toml index def1c1c144..639ba87b57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ bincode = { version = "1.2.1", optional = true } [dev-dependencies] criterion = "0.3.1" -tokio = "0.2" +tokio = { version = "0.2", features = ["full"] } [features] default = ["metrics", "trace"] diff --git a/benches/trace.rs b/benches/trace.rs index 161cfe6c40..c9ba99ff35 100644 --- a/benches/trace.rs +++ b/benches/trace.rs @@ -5,12 +5,10 @@ use opentelemetry::{ }; fn criterion_benchmark(c: &mut Criterion) { - trace_benchmark_group(c, "start-end-span", |tracer| { - tracer.start("foo", None).end() - }); + trace_benchmark_group(c, "start-end-span", |tracer| tracer.start("foo").end()); trace_benchmark_group(c, "start-end-span-4-attrs", |tracer| { - let span = tracer.start("foo", None); + let span = tracer.start("foo"); span.set_attribute(Key::new("key1").bool(false)); span.set_attribute(Key::new("key2").string("hello")); span.set_attribute(Key::new("key3").u64(123)); @@ -19,7 +17,7 @@ fn criterion_benchmark(c: &mut Criterion) { }); trace_benchmark_group(c, "start-end-span-8-attrs", |tracer| { - let span = tracer.start("foo", None); + let span = tracer.start("foo"); span.set_attribute(Key::new("key1").bool(false)); span.set_attribute(Key::new("key2").string("hello")); span.set_attribute(Key::new("key3").u64(123)); @@ -32,7 +30,7 @@ fn criterion_benchmark(c: &mut Criterion) { }); trace_benchmark_group(c, "start-end-span-all-attr-types", |tracer| { - let span = tracer.start("foo", None); + let span = tracer.start("foo"); span.set_attribute(Key::new("key1").bool(false)); span.set_attribute(Key::new("key2").string("hello")); span.set_attribute(Key::new("key3").i64(123)); @@ -45,7 +43,7 @@ fn criterion_benchmark(c: &mut Criterion) { }); trace_benchmark_group(c, "start-end-span-all-attr-types-2x", |tracer| { - let span = tracer.start("foo", None); + let span = tracer.start("foo"); span.set_attribute(Key::new("key1").bool(false)); span.set_attribute(Key::new("key2").string("hello")); span.set_attribute(Key::new("key3").i64(123)); diff --git a/examples/actix/src/main.rs b/examples/actix/src/main.rs index 5b6ff2f5ea..b7e50e51c0 100644 --- a/examples/actix/src/main.rs +++ b/examples/actix/src/main.rs @@ -1,9 +1,8 @@ -use opentelemetry::api::{Key, Span, TracerGenerics}; -use opentelemetry::{global, sdk}; - use actix_service::Service; use actix_web::{web, App, HttpServer}; use futures::future::Future; +use opentelemetry::api::{Key, TraceContextExt, Tracer}; +use opentelemetry::{global, sdk}; fn init_tracer() -> thrift::Result<()> { let exporter = opentelemetry_jaeger::Exporter::builder() @@ -30,9 +29,8 @@ fn init_tracer() -> thrift::Result<()> { fn index() -> &'static str { let tracer = global::tracer("request"); - - tracer.with_span("index", move |span| { - span.set_attribute(Key::new("parameter").i64(10)); + tracer.in_span("index", |ctx| { + ctx.span().set_attribute(Key::new("parameter").i64(10)); "Index" }) } @@ -44,8 +42,8 @@ fn main() -> thrift::Result<()> { App::new() .wrap_fn(|req, srv| { let tracer = global::tracer("request"); - tracer.with_span("middleware", move |span| { - span.set_attribute(Key::new("path").string(req.path())); + tracer.in_span("middleware", move |cx| { + cx.span().set_attribute(Key::new("path").string(req.path())); srv.call(req).map(|res| res) }) }) diff --git a/examples/async/src/main.rs b/examples/async/src/main.rs index 5aae6a644c..2a75420f47 100644 --- a/examples/async/src/main.rs +++ b/examples/async/src/main.rs @@ -18,7 +18,7 @@ //! //! [`hello_world`]: https://github.com/tokio-rs/tokio/blob/132e9f1da5965530b63554d7a1c59824c3de4e30/tokio/examples/hello_world.rs use opentelemetry::{ - api::{trace::futures::Instrument, Tracer}, + api::{trace::futures::FutureExt, Context, TraceContextExt, Tracer}, global, sdk, }; use std::time::Duration; @@ -28,24 +28,27 @@ use tokio::net::TcpStream; async fn connect(addr: &SocketAddr) -> io::Result { let tracer = global::tracer("connector"); - let span = tracer.start("Connecting", None); + let span = tracer.start("Connecting"); + let cx = Context::current_with_value(span); - TcpStream::connect(&addr).instrument(span).await + TcpStream::connect(&addr).with_context(cx).await } async fn write(stream: &mut TcpStream) -> io::Result { let tracer = global::tracer("writer"); - let span = tracer.start("Writing", None); + let span = tracer.start("Writing"); + let cx = Context::current_with_span(span); - stream.write(b"hello world\n").instrument(span).await + stream.write(b"hello world\n").with_context(cx).await } async fn run(addr: &SocketAddr) -> io::Result { let tracer = global::tracer("runner"); - let span = tracer.start(&format!("running: {}", addr), None); + let span = tracer.start(&format!("running: {}", addr)); + let cx = Context::current_with_span(span); - let mut stream = connect(addr).instrument(tracer.clone_span(&span)).await?; - write(&mut stream).instrument(span).await + let mut stream = connect(addr).with_context(cx.clone()).await?; + write(&mut stream).with_context(cx).await } fn init_tracer() -> thrift::Result<()> { @@ -83,10 +86,11 @@ pub async fn main() -> Result<(), Box> { let addr = "127.0.0.1:6142".parse()?; let addr2 = "127.0.0.1:6143".parse()?; let tracer = global::tracer("async_example"); - let span = tracer.start("root", None); + let span = tracer.start("root"); + let cx = Context::current_with_span(span); let (run1, run2) = futures::future::join(run(&addr), run(&addr2)) - .instrument(span) + .with_context(cx) .await; run1?; run2?; diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index 25f7267c39..e4977432a8 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -1,5 +1,5 @@ use opentelemetry::api::{ - Gauge, GaugeHandle, Key, Measure, MeasureHandle, Meter, MetricOptions, Span, TracerGenerics, + Gauge, GaugeHandle, Key, Measure, MeasureHandle, Meter, MetricOptions, TraceContextExt, Tracer, }; use opentelemetry::{global, sdk}; @@ -54,7 +54,8 @@ fn main() -> thrift::Result<()> { let measure = measure_two.acquire_handle(&common_labels); - global::tracer("component-main").with_span("operation", move |span| { + global::tracer("component-main").in_span("operation", move |cx| { + let span = cx.span(); span.add_event( "Nice operation!".to_string(), vec![Key::new("bogons").i64(100)], @@ -68,7 +69,8 @@ fn main() -> thrift::Result<()> { vec![one_metric.measurement(1.0), measure_two.measurement(2.0)], ); - global::tracer("component-bar").with_span("Sub operation...", move |span| { + global::tracer("component-bar").in_span("Sub operation...", move |cx| { + let span = cx.span(); span.set_attribute(lemons_key.string("five")); span.add_event("Sub span event".to_string(), vec![]); diff --git a/examples/grpc/src/client.rs b/examples/grpc/src/client.rs index 08c03e99f4..fa2265e135 100644 --- a/examples/grpc/src/client.rs +++ b/examples/grpc/src/client.rs @@ -1,6 +1,8 @@ use hello_world::greeter_client::GreeterClient; use hello_world::HelloRequest; -use opentelemetry::api::{HttpTextFormat, KeyValue, Span, TraceContextPropagator, Tracer}; +use opentelemetry::api::{ + Context, HttpTextFormat, KeyValue, TraceContextExt, TraceContextPropagator, Tracer, +}; use opentelemetry::sdk::Sampler; use opentelemetry::{api, global, sdk}; @@ -55,19 +57,17 @@ async fn main() -> Result<(), Box> { tracing_init()?; let mut client = GreeterClient::connect("http://[::1]:50051").await?; let propagator = TraceContextPropagator::new(); - let request_span = global::tracer("client").start("client-request", None); + let span = global::tracer("client").start("client-request"); + let cx = Context::current_with_span(span); let mut request = tonic::Request::new(HelloRequest { name: "Tonic".into(), }); - propagator.inject( - request_span.get_context(), - &mut TonicMetadataMapCarrier(request.metadata_mut()), - ); + propagator.inject_context(&cx, &mut TonicMetadataMapCarrier(request.metadata_mut())); let response = client.say_hello(request).await?; - request_span.add_event( + cx.span().add_event( "response-received".to_string(), vec![KeyValue::new("response", format!("{:?}", response))], ); diff --git a/examples/grpc/src/server.rs b/examples/grpc/src/server.rs index 307a9cde41..162f827235 100644 --- a/examples/grpc/src/server.rs +++ b/examples/grpc/src/server.rs @@ -20,8 +20,8 @@ impl Greeter for MyGreeter { request: Request, // Accept request of type HelloRequest ) -> Result, Status> { let propagator = api::TraceContextPropagator::new(); - let parent = propagator.extract(&HttpHeaderMapCarrier(request.metadata())); - let span = global::tracer("greeter").start("Processing reply", Some(parent)); + let parent_cx = propagator.extract(&HttpHeaderMapCarrier(request.metadata())); + let span = global::tracer("greeter").start_from_context("Processing reply", &parent_cx); span.set_attribute(KeyValue::new("request", format!("{:?}", request))); // Return an instance of type HelloReply diff --git a/examples/http/src/client.rs b/examples/http/src/client.rs index 33a69ece52..d5a94cd4c1 100644 --- a/examples/http/src/client.rs +++ b/examples/http/src/client.rs @@ -1,5 +1,5 @@ use hyper::{body::Body, Client}; -use opentelemetry::api::{HttpTextFormat, Span, Tracer}; +use opentelemetry::api::{Context, HttpTextFormat, TraceContextExt, Tracer}; use opentelemetry::{api, exporter::trace::stdout, global, sdk}; struct ClientHeaderMapCarrier<'a>(&'a mut hyper::header::HeaderMap); @@ -36,16 +36,14 @@ async fn main() -> std::result::Result<(), Box api::Carrier for HttpHeaderMapCarrier<'a> { async fn handle(req: Request) -> Result, Infallible> { let propagator = api::TraceContextPropagator::new(); - let parent_context = propagator.extract(&HttpHeaderMapCarrier(req.headers())); - let span = global::tracer("example/server").start("hello", Some(parent_context)); + let parent_cx = propagator.extract(&HttpHeaderMapCarrier(req.headers())); + let span = global::tracer("example/server").start_from_context("hello", &parent_cx); span.add_event("handling this...".to_string(), Vec::new()); Ok(Response::new("Hello, World!".into())) diff --git a/examples/stdout.rs b/examples/stdout.rs index 084df9977f..571355e744 100644 --- a/examples/stdout.rs +++ b/examples/stdout.rs @@ -1,6 +1,6 @@ use opentelemetry::exporter::trace::stdout; use opentelemetry::{ - api::{Provider, TracerGenerics}, + api::{Provider, Tracer}, global, sdk, }; @@ -21,5 +21,5 @@ fn main() { global::trace_provider() .get_tracer("component-main") - .with_span("operation", move |_span| {}); + .in_span("operation", |_cx| {}); } diff --git a/examples/zipkin/src/main.rs b/examples/zipkin/src/main.rs index 4ed7fb0e09..36f7b10d94 100644 --- a/examples/zipkin/src/main.rs +++ b/examples/zipkin/src/main.rs @@ -1,4 +1,4 @@ -use opentelemetry::api::{Span, Tracer, TracerGenerics}; +use opentelemetry::api::{Span, Tracer}; use opentelemetry::{global, sdk}; use std::thread; use std::time::Duration; @@ -25,7 +25,7 @@ fn init_tracer() { fn bar() { let tracer = global::tracer("component-bar"); - let span = tracer.start("bar", None); + let span = tracer.start("bar"); thread::sleep(Duration::from_millis(6)); span.end() } @@ -34,7 +34,7 @@ fn main() { init_tracer(); let tracer = global::tracer("component-main"); - tracer.with_span("foo", |_span| { + tracer.in_span("foo", |_cx| { thread::sleep(Duration::from_millis(6)); bar(); thread::sleep(Duration::from_millis(6)); diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index 2f4a24d828..6f069159ea 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -105,7 +105,7 @@ use self::thrift::jaeger; use opentelemetry::{api, exporter::trace, sdk}; use std::sync::{Arc, Mutex}; use std::{ - any, net, + net, time::{Duration, SystemTime}, }; @@ -167,11 +167,6 @@ impl trace::SpanExporter for Exporter { /// Ignored for now. fn shutdown(&self) {} - - /// Allows `Exporter` to be downcast from trait object. - fn as_any(&self) -> &dyn any::Any { - self - } } /// Jaeger exporter builder @@ -328,17 +323,17 @@ impl Into for api::Event { impl Into for Arc { /// Convert spans to jaeger thrift span for exporting. fn into(self) -> jaeger::Span { - let trace_id = self.context.trace_id().to_u128(); + let trace_id = self.span_context.trace_id().to_u128(); let trace_id_high = (trace_id >> 64) as i64; let trace_id_low = trace_id as i64; jaeger::Span { trace_id_low, trace_id_high, - span_id: self.context.span_id().to_u64() as i64, + span_id: self.span_context.span_id().to_u64() as i64, parent_span_id: self.parent_span_id.to_u64() as i64, operation_name: self.name.clone(), references: links_to_references(&self.links), - flags: self.context.trace_flags() as i32, + flags: self.span_context.trace_flags() as i32, start_time: self .start_time .duration_since(SystemTime::UNIX_EPOCH) diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index 6fd1e36999..f096b2f26f 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -40,7 +40,6 @@ extern crate typed_builder; mod model; mod uploader; -use core::any; use model::{annotation, endpoint, span}; use opentelemetry::api; use opentelemetry::exporter::trace; @@ -170,10 +169,6 @@ impl trace::SpanExporter for Exporter { } fn shutdown(&self) {} - - fn as_any(&self) -> &dyn any::Any { - self - } } /// Converts `api::Event` into an `annotation::Annotation` @@ -207,9 +202,15 @@ fn into_zipkin_span_kind(kind: api::SpanKind) -> Option { /// be ingested into a Zipkin collector. fn into_zipkin_span(config: &ExporterConfig, span_data: Arc) -> span::Span { span::Span::builder() - .trace_id(format!("{:032x}", span_data.context.trace_id().to_u128())) + .trace_id(format!( + "{:032x}", + span_data.span_context.trace_id().to_u128() + )) .parent_id(format!("{:016x}", span_data.parent_span_id.to_u64())) - .id(format!("{:016x}", span_data.context.span_id().to_u64())) + .id(format!( + "{:016x}", + span_data.span_context.span_id().to_u64() + )) .name(span_data.name.clone()) .kind(into_zipkin_span_kind(span_data.span_kind.clone())) .timestamp( diff --git a/src/api/context/mod.rs b/src/api/context/mod.rs index eb50a537a1..15c217b4fe 100644 --- a/src/api/context/mod.rs +++ b/src/api/context/mod.rs @@ -70,6 +70,8 @@ use std::fmt; use std::hash::{BuildHasherDefault, Hasher}; use std::rc::Rc; +pub mod propagation; + thread_local! { static CURRENT_CONTEXT: RefCell = RefCell::new(Context::default()); static DEFAULT_CONTEXT: Context = Context::default(); diff --git a/src/api/propagation/base64_format.rs b/src/api/context/propagation/base64_format.rs similarity index 95% rename from src/api/propagation/base64_format.rs rename to src/api/context/propagation/base64_format.rs index e9772f1ffc..e6f333a310 100644 --- a/src/api/propagation/base64_format.rs +++ b/src/api/context/propagation/base64_format.rs @@ -7,7 +7,7 @@ //! and deserializes values from base64 strings. There is a blanket implementation //! for any implementors of `BinaryFormat` use crate::api; -use crate::api::propagation::binary_propagator::BinaryFormat; +use crate::api::BinaryFormat; use base64::{decode, encode}; /// Used to serialize and deserialize `SpanContext`s to and from a base64 @@ -39,8 +39,8 @@ where #[cfg(test)] mod tests { + use super::super::binary_propagator::BinaryPropagator; use super::*; - use crate::api::propagation::binary_propagator::BinaryPropagator; #[rustfmt::skip] fn to_base64_data() -> Vec<(api::SpanContext, String)> { diff --git a/src/api/propagation/binary_propagator.rs b/src/api/context/propagation/binary_propagator.rs similarity index 100% rename from src/api/propagation/binary_propagator.rs rename to src/api/context/propagation/binary_propagator.rs diff --git a/src/api/propagation/mod.rs b/src/api/context/propagation/mod.rs similarity index 99% rename from src/api/propagation/mod.rs rename to src/api/context/propagation/mod.rs index b0d4861b4e..680f1a19be 100644 --- a/src/api/propagation/mod.rs +++ b/src/api/context/propagation/mod.rs @@ -163,7 +163,6 @@ use std::collections::HashMap; #[cfg(feature = "base64")] pub mod base64_format; pub mod binary_propagator; -pub mod noop; pub mod text_propagator; /// Carriers provide an interface for adding and removing fields from an diff --git a/src/api/context/propagation/text_propagator.rs b/src/api/context/propagation/text_propagator.rs new file mode 100644 index 0000000000..04298da00b --- /dev/null +++ b/src/api/context/propagation/text_propagator.rs @@ -0,0 +1,34 @@ +//! # Text Propagator +//! +//! `HttpTextFormat` is a formatter to serialize and deserialize a value into a +//! text format. +use crate::{api, api::Context}; +use std::fmt::Debug; + +/// Methods to inject and extract a value as text into carriers that travel +/// in-band across process boundaries. +pub trait HttpTextFormat: Debug { + /// Properly encodes the values of the current [`Context`] and injects them into + /// the [`Carrier`]. + /// + /// [`Context`]: ../../struct.Context.html + /// [`Carrier`]: ../trait.Carrier.html + fn inject(&self, carrier: &mut dyn api::Carrier) { + self.inject_context(&Context::current(), carrier) + } + + /// Properly encodes the values of the [`Context`] and injects them into the + /// [`Carrier`]. + /// + /// [`Context`]: ../../struct.Context.html + /// [`Carrier`]: ../trait.Carrier.html + fn inject_context(&self, cx: &Context, carrier: &mut dyn api::Carrier); + + /// Retrieves encoded data using the provided [`Carrier`]. If no data for this + /// format was retrieved OR if the retrieved data is invalid, then the current + /// [`Context`] is returned. + /// + /// [`Context`]: ../../struct.Context.html + /// [`Carrier`]: ../trait.Carrier.html + fn extract(&self, carrier: &dyn api::Carrier) -> Context; +} diff --git a/src/api/mod.rs b/src/api/mod.rs index 0e4a82bb58..bd940178a7 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -16,11 +16,16 @@ pub mod context; pub mod core; pub mod metrics; -pub mod propagation; pub mod trace; pub use self::core::{Key, KeyValue, Unit, Value}; -pub use context::Context; +#[cfg(feature = "base64_format")] +pub use context::propagation::base64_format::Base64Format; +pub use context::{ + propagation::{binary_propagator::BinaryFormat, text_propagator::HttpTextFormat, Carrier}, + Context, +}; + pub use metrics::{ counter::{Counter, CounterHandle}, gauge::{Gauge, GaugeHandle}, @@ -29,12 +34,11 @@ pub use metrics::{ value::MeasurementValue, Instrument, InstrumentHandle, LabelSet, Measurement, Meter, MetricOptions, }; -#[cfg(feature = "base64_format")] -pub use propagation::base64_format::Base64Format; -pub use propagation::{binary_propagator::BinaryFormat, text_propagator::HttpTextFormat, Carrier}; pub use trace::{ b3_propagator::B3Propagator, + context::TraceContextExt, event::Event, + futures::FutureExt, id_generator::IdGenerator, link::Link, noop::{NoopProvider, NoopSpan, NoopSpanExporter, NoopTracer}, @@ -44,5 +48,5 @@ pub use trace::{ span_context::{SpanContext, SpanId, TraceId, TRACE_FLAGS_UNUSED, TRACE_FLAG_SAMPLED}, span_processor::SpanProcessor, trace_context_propagator::TraceContextPropagator, - tracer::{SpanBuilder, Tracer, TracerGenerics}, + tracer::{SpanBuilder, Tracer}, }; diff --git a/src/api/propagation/noop.rs b/src/api/propagation/noop.rs deleted file mode 100644 index 007c000df8..0000000000 --- a/src/api/propagation/noop.rs +++ /dev/null @@ -1,36 +0,0 @@ -//! # No-op OpenTelemetry Propagation Implementation -//! -//! This implementation is useful for testing purposes as it is intended -//! to have minimal resource utilization and runtime impact. -use crate::api; - -/// A no-op instance of a `HttpTextFormat`. -#[derive(Debug)] -pub struct NoopTextFormat {} - -impl api::HttpTextFormat for NoopTextFormat { - /// Ignores calls to `inject` - fn inject(&self, _context: api::SpanContext, _carrier: &mut dyn api::Carrier) { - // Ignored - } - - /// Always returns invalid span contexts - fn extract(&self, _carrier: &dyn api::Carrier) -> api::SpanContext { - api::SpanContext::new(api::TraceId::invalid(), api::SpanId::invalid(), 0, false) - } -} - -/// A no-op instance of `BinaryFormat` -#[derive(Debug)] -pub struct NoopBinaryFormat {} - -impl api::BinaryFormat for NoopBinaryFormat { - fn to_bytes(&self, _context: &api::SpanContext) -> [u8; 29] { - [0; 29] - } - - /// Always returns invalid span contexts - fn from_bytes(&self, _bytes: Vec) -> api::SpanContext { - api::SpanContext::new(api::TraceId::invalid(), api::SpanId::invalid(), 0, false) - } -} diff --git a/src/api/propagation/text_propagator.rs b/src/api/propagation/text_propagator.rs deleted file mode 100644 index 8d8227719b..0000000000 --- a/src/api/propagation/text_propagator.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! # Text Propagator -//! -//! `HttpTextFormat` is a formatter to serialize and deserialize a -//! value into a text format. -use crate::api; - -///is used to inject and extract a value as text into carriers that travel -/// in-band across process boundaries. -pub trait HttpTextFormat { - /// Properly encodes the values of the `SpanContext` and injects them - /// into the `Carrier`. - fn inject(&self, context: api::SpanContext, carrier: &mut dyn api::Carrier); - - /// Retrieves encoded `SpanContext`s using the `Carrier`. It decodes - /// the `SpanContext` and returns it. If no `SpanContext` was retrieved - /// OR if the retrieved SpanContext is invalid then an empty `SpanContext` - /// is returned. - fn extract(&self, carrier: &dyn api::Carrier) -> api::SpanContext; -} diff --git a/src/api/trace/b3_propagator.rs b/src/api/trace/b3_propagator.rs index d713fb3899..febb2e57d1 100644 --- a/src/api/trace/b3_propagator.rs +++ b/src/api/trace/b3_propagator.rs @@ -13,7 +13,7 @@ //! //! If `single_header` is set to `true` then `X-B3` header is used to inject //! and extract. Otherwise, separate headers are used to inject and extract. -use crate::api; +use crate::{api, api::TraceContextExt}; static B3_SINGLE_HEADER: &str = "X-B3"; static B3_DEBUG_FLAG_HEADER: &str = "X-B3-Flags"; @@ -126,49 +126,54 @@ impl B3Propagator { } impl api::HttpTextFormat for B3Propagator { - /// Properly encodes the values of the `SpanContext` and injects them - /// into the `Carrier`. - fn inject(&self, context: api::SpanContext, carrier: &mut dyn api::Carrier) { - if context.is_valid() { + /// Properly encodes the values of the `Context`'s `SpanContext` and injects + /// them into the `Carrier`. + fn inject_context(&self, context: &api::Context, carrier: &mut dyn api::Carrier) { + let span_context = context.span().span_context(); + if span_context.is_valid() { if self.single_header { - let sampled = context.trace_flags() & api::TRACE_FLAG_SAMPLED; + let sampled = span_context.trace_flags() & api::TRACE_FLAG_SAMPLED; carrier.set( B3_SINGLE_HEADER, format!( "{:032x}-{:016x}-{:01}", - context.trace_id().to_u128(), - context.span_id().to_u64(), + span_context.trace_id().to_u128(), + span_context.span_id().to_u64(), sampled ), ); } else { carrier.set( B3_TRACE_ID_HEADER, - format!("{:032x}", context.trace_id().to_u128()), + format!("{:032x}", span_context.trace_id().to_u128()), ); carrier.set( B3_SPAN_ID_HEADER, - format!("{:016x}", context.span_id().to_u64()), + format!("{:016x}", span_context.span_id().to_u64()), ); - let sampled = if context.is_sampled() { "1" } else { "0" }; + let sampled = if span_context.is_sampled() { "1" } else { "0" }; carrier.set(B3_SAMPLED_HEADER, sampled.to_string()) } } } - /// Retrieves encoded `SpanContext`s using the `Carrier`. It decodes - /// the `SpanContext` and returns it. If no `SpanContext` was retrieved - /// OR if the retrieved SpanContext is invalid then an empty `SpanContext` - /// is returned. - fn extract(&self, carrier: &dyn api::Carrier) -> api::SpanContext { - if self.single_header { + /// Retrieves encoded data using the provided [`Carrier`]. If no data for this + /// format was retrieved OR if the retrieved data is invalid, then the current + /// [`Context`] is returned. + /// + /// [`Context`]: ../../struct.Context.html + /// [`Carrier`]: ../trait.Carrier.html + fn extract(&self, carrier: &dyn api::Carrier) -> api::Context { + let span_context = if self.single_header { self.extract_single_header(carrier) .unwrap_or_else(|_| api::SpanContext::empty_context()) } else { self.extract_multi_header(carrier) .unwrap_or_else(|_| api::SpanContext::empty_context()) - } + }; + + api::Context::current_with_remote_span_context(span_context) } } @@ -231,7 +236,12 @@ mod tests { for (header, expected_context) in single_header_extract_data() { let mut carrier: HashMap<&'static str, String> = HashMap::new(); carrier.insert(B3_SINGLE_HEADER, header.to_owned()); - assert_eq!(single_header_propagator.extract(&carrier), expected_context) + assert_eq!( + single_header_propagator + .extract(&carrier) + .remote_span_context(), + Some(&expected_context) + ) } for ((trace, span, sampled, debug, parent), expected_context) in multi_header_extract_data() @@ -252,8 +262,35 @@ mod tests { if let Some(parent) = parent { carrier.insert(B3_PARENT_SPAN_ID_HEADER, parent.to_owned()); } - assert_eq!(multi_header_propagator.extract(&carrier), expected_context) + assert_eq!( + multi_header_propagator + .extract(&carrier) + .remote_span_context(), + Some(&expected_context) + ) + } + } + + #[derive(Debug)] + struct TestSpan(api::SpanContext); + impl api::Span for TestSpan { + fn add_event_with_timestamp( + &self, + _name: String, + _timestamp: std::time::SystemTime, + _attributes: Vec, + ) { + } + fn span_context(&self) -> api::SpanContext { + self.0.clone() + } + fn is_recording(&self) -> bool { + false } + fn set_attribute(&self, _attribute: api::KeyValue) {} + fn set_status(&self, _code: api::StatusCode, _message: String) {} + fn update_name(&self, _new_name: String) {} + fn end(&self) {} } #[test] @@ -263,7 +300,10 @@ mod tests { for (expected_header, context) in single_header_inject_data() { let mut carrier = HashMap::new(); - single_header_propagator.inject(context, &mut carrier); + single_header_propagator.inject_context( + &api::Context::current_with_span(TestSpan(context)), + &mut carrier, + ); assert_eq!( carrier.get(B3_SINGLE_HEADER), @@ -273,7 +313,10 @@ mod tests { for (trace_id, span_id, sampled, context) in multi_header_inject_data() { let mut carrier = HashMap::new(); - multi_header_propagator.inject(context, &mut carrier); + multi_header_propagator.inject_context( + &api::Context::current_with_span(TestSpan(context)), + &mut carrier, + ); assert_eq!(carrier.get(B3_TRACE_ID_HEADER), Some(&trace_id.to_owned())); assert_eq!(carrier.get(B3_SPAN_ID_HEADER), Some(&span_id.to_owned())); diff --git a/src/api/trace/context.rs b/src/api/trace/context.rs new file mode 100644 index 0000000000..ead2c55277 --- /dev/null +++ b/src/api/trace/context.rs @@ -0,0 +1,78 @@ +//! Context extensions for tracing +use crate::api; + +lazy_static::lazy_static! { + static ref NOOP_SPAN: api::NoopSpan = api::NoopSpan::new(); +} + +struct Span(Box); +struct RemoteSpanContext(api::SpanContext); + +/// Methods for soring and retrieving trace data in a context. +pub trait TraceContextExt { + /// Returns a clone of the current context with the included span. + /// + /// This is useful for building tracers. + fn current_with_span(span: T) -> Self; + + /// Returns a clone of this context with the included span. + /// + /// This is useful for building tracers. + fn with_span(&self, span: T) -> Self; + + /// Returns a reference to this context's span, or the default no-op span if + /// none has been set. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::{api, api::{Context, Provider, TraceContextExt, Tracer}, sdk}; + /// + /// // returns a reference to an empty span by default + /// assert_eq!(Context::current().span().span_context(), api::SpanContext::empty_context()); + /// + /// sdk::Provider::default().get_tracer("my-component").in_span("my-span", |cx| { + /// // Returns a reference to the current span if set + /// assert_ne!(cx.span().span_context(), api::SpanContext::empty_context()); + /// }); + /// ``` + fn span(&self) -> &dyn api::Span; + + /// Returns a copy of the current context with the span context included. + /// + /// This is useful for building propagators. + fn current_with_remote_span_context(span_context: api::SpanContext) -> Self; + + /// Returns a reference to the remote span context data stored in this context, + /// or none if no remote span context has been set. + /// + /// This is useful for building tracers. + fn remote_span_context(&self) -> Option<&api::SpanContext>; +} + +impl TraceContextExt for api::Context { + fn current_with_span(span: T) -> Self { + api::Context::current_with_value(Span(Box::new(span))) + } + + fn with_span(&self, span: T) -> Self { + self.with_value(Span(Box::new(span))) + } + + fn span(&self) -> &dyn api::Span { + if let Some(span) = self.get::() { + span.0.as_ref() + } else { + &*NOOP_SPAN + } + } + + fn current_with_remote_span_context(span_context: api::SpanContext) -> Self { + api::Context::current_with_value(RemoteSpanContext(span_context)) + } + + fn remote_span_context(&self) -> Option<&api::SpanContext> { + self.get::() + .map(|span_context| &span_context.0) + } +} diff --git a/src/api/trace/futures.rs b/src/api/trace/futures.rs index e59fed3507..5feef4a380 100644 --- a/src/api/trace/futures.rs +++ b/src/api/trace/futures.rs @@ -3,51 +3,116 @@ //! This module provides utilities for instrumenting asynchronous code written //! using [`futures`] and async/await. //! -//! This main trait is [`Instrument`], which allows a [`Tracer`], and a [`Span`] -//! to be attached to a future, sink, stream, or executor. +//! This main trait is [`FutureExt`], which allows a [`Context`], +//! to be attached to a future, sink, or stream. //! //! [`futures`]: https://doc.rust-lang.org/std/future/trait.Future.html -//! [`Instrument`]: trait.Instrument.html -//! [`Tracer`]: ../tracer/trait.Tracer.html -//! [`Span`]: ../span/trait.Span.html - -use crate::api; +//! [`FutureExt`]: trait.FutureExt.html +//! [`Context`]: ../../context/struct.Context.html +use crate::api::context::Context as OpenTelemetryContext; use pin_project::pin_project; -use std::{pin::Pin, task::Context}; +use std::{ + pin::Pin, + task::{Context as TaskContext, Poll}, +}; -/// A future, stream, sink, or executor that has been instrumented with a tracer and span. +/// A future, stream, or sink that has an associated context. #[pin_project] -#[derive(Debug, Clone)] -pub struct Instrumented { +#[derive(Clone, Debug)] +pub struct WithContext { #[pin] - inner: F, - span: S, + inner: T, + otel_cx: OpenTelemetryContext, } -impl Instrument for F {} +impl FutureExt for T {} + +impl std::future::Future for WithContext { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); -impl std::future::Future for Instrumented { - type Output = F::Output; + this.inner.poll(task_cx) + } +} + +impl futures::Stream for WithContext { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_next(this.inner, task_cx) + } +} + +impl> futures::Sink for WithContext +where + T: futures::Sink, +{ + type Error = T::Error; + + fn poll_ready( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_ready(this.inner, task_cx) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::start_send(this.inner, item) + } + + fn poll_flush( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_flush(this.inner, task_cx) + } - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { + fn poll_close( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> futures::task::Poll> { let this = self.project(); - this.span.mark_as_active(); - let res = this.inner.poll(cx); - this.span.mark_as_inactive(); - res + let _enter = this.otel_cx.clone().attach(); + T::poll_close(this.inner, task_cx) } } -/// Extension trait allowing futures, streams, sinks, and executors to be traced with a span. -pub trait Instrument: Sized { - /// Traces this type with the provided `Span`, returning a `Instrumented` wrapper. - fn instrument(self, span: S) -> Instrumented { - Instrumented { inner: self, span } +/// Extension trait allowing futures, streams, and sinks to be traced with a span. +pub trait FutureExt: Sized { + /// Attaches the provided [`Context`] to this type, returning a `WithContext` + /// wrapper. + /// + /// When the wrapped type is a future, stream, or sink, the attached context + /// will be set as current while it is being polled. + /// + /// [`Context`]: ../../context/struct.Context.html + fn with_context(self, otel_cx: OpenTelemetryContext) -> WithContext { + WithContext { + inner: self, + otel_cx, + } } - /// Traces this type with the provided `Tracer`'s active span, returning a `Instrumented` wrapper. - fn in_active_span(self, tracer: T) -> Instrumented { - let span = tracer.get_active_span(); - self.instrument(span) + /// Attaches the current [`Context`] to this type, returning a `WithContext` + /// wrapper. + /// + /// When the wrapped type is a future, stream, or sink, the attached context + /// will be set as the default while it is being polled. + /// + /// [`Context`]: ../../context/struct.Context.html + fn with_current_context(self) -> WithContext { + let otel_cx = OpenTelemetryContext::current(); + self.with_context(otel_cx) } } diff --git a/src/api/trace/mod.rs b/src/api/trace/mod.rs index 1f1d3d112a..449051f839 100644 --- a/src/api/trace/mod.rs +++ b/src/api/trace/mod.rs @@ -111,6 +111,7 @@ //! field](https://www.w3.org/TR/trace-context/#tracestate-field). //! pub mod b3_propagator; +pub mod context; pub mod event; pub mod futures; pub mod id_generator; diff --git a/src/api/trace/noop.rs b/src/api/trace/noop.rs index 2932d3f5d8..88f71164b1 100644 --- a/src/api/trace/noop.rs +++ b/src/api/trace/noop.rs @@ -4,7 +4,6 @@ //! has been set. It is also useful for testing purposes as it is intended //! to have minimal resource utilization and runtime impact. use crate::{api, exporter}; -use std::any::Any; use std::sync::Arc; use std::time::SystemTime; @@ -64,7 +63,7 @@ impl api::Span for NoopSpan { } /// Returns an invalid `SpanContext`. - fn get_context(&self) -> api::SpanContext { + fn span_context(&self) -> api::SpanContext { self.span_context.clone() } @@ -92,21 +91,6 @@ impl api::Span for NoopSpan { fn end(&self) { // Ignored } - - /// Returns self as dyn Any - fn as_any(&self) -> &dyn Any { - self - } - - /// Ignores being marked as active - fn mark_as_active(&self) { - // Ignored - } - - /// Ignores being marked as inactive - fn mark_as_inactive(&self) { - // Ignored - } } /// A no-op instance of a `Tracer`. @@ -122,8 +106,8 @@ impl api::Tracer for NoopTracer { } /// Starts a new `NoopSpan`. - fn start(&self, _name: &str, _context: Option) -> Self::Span { - api::NoopSpan::new() + fn start_from_context(&self, _name: &str, _context: &api::Context) -> Self::Span { + self.invalid() } /// Starts a SpanBuilder @@ -132,26 +116,7 @@ impl api::Tracer for NoopTracer { } /// Builds a `NoopSpan` from a `SpanBuilder` - fn build(&self, _builder: api::SpanBuilder) -> Self::Span { - self.invalid() - } - - /// Returns a new `NoopSpan` as this tracer does not maintain a registry. - fn get_active_span(&self) -> Self::Span { - api::NoopSpan::new() - } - - /// Ignores active span state. - fn mark_span_as_active(&self, _span: &Self::Span) { - // Noop - } - - /// Ignores active span state. - fn mark_span_as_inactive(&self, _span_id: api::SpanId) { - // Noop - } - - fn clone_span(&self, _span: &Self::Span) -> Self::Span { + fn build_with_context(&self, _builder: api::SpanBuilder, _cx: &api::Context) -> Self::Span { self.invalid() } } @@ -170,8 +135,4 @@ impl exporter::trace::SpanExporter for NoopSpanExporter { fn shutdown(&self) { // Noop } - - fn as_any(&self) -> &dyn Any { - self - } } diff --git a/src/api/trace/span.rs b/src/api/trace/span.rs index b4480f18f4..7e97b52d5d 100644 --- a/src/api/trace/span.rs +++ b/src/api/trace/span.rs @@ -55,7 +55,7 @@ pub trait Span: fmt::Debug + 'static { /// Returns the `SpanContext` for the given `Span`. The returned value may be used even after /// the `Span is finished. The returned value MUST be the same for the entire `Span` lifetime. - fn get_context(&self) -> api::SpanContext; + fn span_context(&self) -> api::SpanContext; /// Returns true if this `Span` is recording information like events with the `add_event` /// operation, attributes using `set_attributes`, status with `set_status`, etc. @@ -122,25 +122,6 @@ pub trait Span: fmt::Debug + 'static { /// ///This API MUST be non-blocking. fn end(&self); - - /// Used by global tracer to downcast to specific span type. - fn as_any(&self) -> &dyn std::any::Any; - - /// Mark as currently active span. - /// - /// This is the _synchronous_ api. If you are using futures, you - /// need to use the async api via [`instrument`]. - /// - /// [`instrument`]: ../futures/trait.Instrument.html#method.instrument - fn mark_as_active(&self); - - /// Mark as no longer active. - /// - /// This is the _synchronous_ api. If you are using futures, you - /// need to use the async api via [`instrument`]. - /// - /// [`instrument`]: ../futures/trait.Instrument.html#method.instrument - fn mark_as_inactive(&self); } /// `SpanKind` describes the relationship between the Span, its parents, diff --git a/src/api/trace/span_processor.rs b/src/api/trace/span_processor.rs index f4a1f9c53c..dc15128de3 100644 --- a/src/api/trace/span_processor.rs +++ b/src/api/trace/span_processor.rs @@ -31,8 +31,8 @@ //! +-----+--------------+ +---------------------+ //! ``` //! -//! [`is_recording`]: trait.Span.html#is_recording -//! [`Provider`]: trait.Provider.html +//! [`is_recording`]: ../span/trait.Span.html#method.is_recording +//! [`Provider`]: ../provider/trait.Provider.html use crate::exporter; use std::sync::Arc; diff --git a/src/api/trace/trace_context_propagator.rs b/src/api/trace/trace_context_propagator.rs index 6bb8dba1f9..138ce77af7 100644 --- a/src/api/trace/trace_context_propagator.rs +++ b/src/api/trace/trace_context_propagator.rs @@ -18,7 +18,7 @@ //! //! [w3c trace-context docs]: https://w3c.github.io/trace-context/ -use crate::api; +use crate::{api, api::TraceContextExt}; static SUPPORTED_VERSION: u8 = 0; static MAX_VERSION: u8 = 254; @@ -85,14 +85,15 @@ impl TraceContextPropagator { impl api::HttpTextFormat for TraceContextPropagator { /// Properly encodes the values of the `SpanContext` and injects them /// into the `Carrier`. - fn inject(&self, context: api::SpanContext, carrier: &mut dyn api::Carrier) { - if context.is_valid() { + fn inject_context(&self, context: &api::Context, carrier: &mut dyn api::Carrier) { + let span_context = context.span().span_context(); + if span_context.is_valid() { let header_value = format!( "{:02x}-{:032x}-{:016x}-{:02x}", SUPPORTED_VERSION, - context.trace_id().to_u128(), - context.span_id().to_u64(), - context.trace_flags() & api::TRACE_FLAG_SAMPLED + span_context.trace_id().to_u128(), + span_context.span_id().to_u64(), + span_context.trace_flags() & api::TRACE_FLAG_SAMPLED ); carrier.set(TRACEPARENT_HEADER, header_value) } @@ -102,9 +103,10 @@ impl api::HttpTextFormat for TraceContextPropagator { /// the `SpanContext` and returns it. If no `SpanContext` was retrieved /// OR if the retrieved SpanContext is invalid then an empty `SpanContext` /// is returned. - fn extract(&self, carrier: &dyn api::Carrier) -> api::SpanContext { + fn extract(&self, carrier: &dyn api::Carrier) -> api::Context { self.extract_span_context(carrier) - .unwrap_or_else(|_| api::SpanContext::empty_context()) + .map(api::Context::current_with_remote_span_context) + .unwrap_or_else(|_| api::Context::current()) } } @@ -144,8 +146,33 @@ mod tests { for (header, expected_context) in extract_data() { let mut carrier: HashMap<&'static str, String> = HashMap::new(); carrier.insert(TRACEPARENT_HEADER, header.to_owned()); - assert_eq!(propagator.extract(&carrier), expected_context) + assert_eq!( + propagator.extract(&carrier).remote_span_context(), + Some(&expected_context) + ) + } + } + + #[derive(Debug)] + struct TestSpan(api::SpanContext); + impl api::Span for TestSpan { + fn add_event_with_timestamp( + &self, + _name: String, + _timestamp: std::time::SystemTime, + _attributes: Vec, + ) { + } + fn span_context(&self) -> api::SpanContext { + self.0.clone() } + fn is_recording(&self) -> bool { + false + } + fn set_attribute(&self, _attribute: api::KeyValue) {} + fn set_status(&self, _code: api::StatusCode, _message: String) {} + fn update_name(&self, _new_name: String) {} + fn end(&self) {} } #[test] @@ -154,7 +181,10 @@ mod tests { for (expected_header, context) in inject_data() { let mut carrier = HashMap::new(); - propagator.inject(context, &mut carrier); + propagator.inject_context( + &api::Context::current_with_span(TestSpan(context)), + &mut carrier, + ); assert_eq!( Carrier::get(&carrier, TRACEPARENT_HEADER).unwrap_or(""), diff --git a/src/api/trace/tracer.rs b/src/api/trace/tracer.rs index 76a4a3e819..41840a4b79 100644 --- a/src/api/trace/tracer.rs +++ b/src/api/trace/tracer.rs @@ -1,27 +1,155 @@ //! # OpenTelemetry Tracer interface //! -//! The OpenTelemetry library achieves in-process context propagation of -//! `Span`s by way of the `Tracer`. +//! The OpenTelemetry library achieves in-process context propagation of `Span`s +//! by way of the `Tracer`. //! -//! The `Tracer` is responsible for tracking the currently active `Span`, -//! and exposes methods for creating and activating new `Spans`. The -//! `Tracer` is configured with `Propagators` which support transferring -//! span context across process boundaries. +//! The `Tracer` is responsible for tracking the currently active `Span`, and +//! exposes methods for creating and activating new `Spans`. The `Tracer` is +//! configured with `Propagators` which support transferring span context across +//! process boundaries. //! -//! `Tracer`s are generally expected to be used as singletons. -//! Implementations SHOULD provide a single global default Tracer. +//! `Tracer`s are generally expected to be used as singletons. Implementations +//! SHOULD provide a single global default Tracer. //! -//! Some applications may require multiple `Tracer` instances, e.g. to -//! create `Span`s on behalf of other applications. Implementations MAY -//! provide a global registry of Tracers for such applications. +//! Some applications may require multiple `Tracer` instances, e.g. to create +//! `Span`s on behalf of other applications. Implementations MAY provide a +//! global registry of Tracers for such applications. //! -//! The `Tracer` SHOULD allow end users to configure other tracing components that -//! control how `Span`s are passed across process boundaries, including the binary -//! and text format `Propagator`s used to serialize `Span`s created by the -//! `Tracer`. +//! The `Tracer` SHOULD allow end users to configure other tracing components +//! that control how `Span`s are passed across process boundaries, including the +//! binary and text format `Propagator`s used to serialize `Span`s created by +//! the `Tracer`. //! -//! Docs: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/api-tracing.md#tracer -use crate::api::{self, Span}; +//! ## In Synchronous Code +//! +//! Spans can be created and nested manually: +//! +//! ``` +//! use opentelemetry::{global, api::{Span, Tracer}}; +//! let tracer = global::tracer("my-component"); +//! +//! let parent = tracer.start("foo"); +//! let child = tracer.span_builder("bar") +//! .with_parent(parent.span_context()) +//! .start(&tracer); +//! +//! // ... +//! +//! child.end(); +//! parent.end(); +//! ``` +//! +//! Spans can also use the current thread's [`Context`] to track which span is active: +//! +//! ``` +//! use opentelemetry::{global, api::{Tracer, SpanKind}}; +//! let tracer = global::tracer("my-component"); +//! +//! // Create simple spans with `in_span` +//! tracer.in_span("foo", |_foo_cx| { +//! // parent span is active +//! tracer.in_span("bar", |_bar_cx| { +//! // child span is now the active span and associated with the parent span +//! }); +//! // child has ended, parent now the active span again +//! }); +//! // parent has ended, no active spans +//! +//! // -- OR -- +//! +//! // create complex spans with span builder and `with_span` +//! let parent_span = tracer.span_builder("foo").with_kind(SpanKind::Server).start(&tracer); +//! tracer.with_span(parent_span, |_foo_cx| { +//! // parent span is active +//! let child_span = tracer.span_builder("bar").with_kind(SpanKind::Client).start(&tracer); +//! tracer.with_span(child_span, |_bar_cx| { +//! // child span is now the active span and associated with the parent span +//! }); +//! // child has ended, parent now the active span again +//! }); +//! // parent has ended, no active spans +//! ``` +//! +//! Spans can also be marked as active, and the resulting guard allows for +//! greater control over when the span is no longer considered active. +//! +//! ``` +//! use opentelemetry::{global, api::{Span, Tracer}}; +//! let tracer = global::tracer("my-component"); +//! +//! let parent_span = tracer.start("foo"); +//! let parent_active = tracer.mark_span_as_active(parent_span); +//! +//! { +//! let child = tracer.start("bar"); +//! let _child_active = tracer.mark_span_as_active(child); +//! +//! // do work in the context of the child span... +//! +//! // exiting the scope drops the guard, child is no longer active +//! } +//! // Parent is active span again +//! +//! // Parent can be dropped manually, or allowed to go out of scope as well. +//! drop(parent_active); +//! +//! // no active span +//! ``` +//! +//! ## In Asynchronous Code +//! +//! If you are instrumenting code that make use of [`std::future::Future`] or +//! async/await, be sure to use the [`FutureExt`] trait. This is needed because +//! the following example _will not_ work: +//! +//! ```no_run +//! # use opentelemetry::{global, api::Tracer}; +//! # let tracer = global::tracer("foo"); +//! # let span = tracer.start("foo-span"); +//! async { +//! // Does not work +//! let _g = tracer.mark_span_as_active(span); +//! // ... +//! }; +//! ``` +//! +//! The context guard `_g` will not exit until the future generated by the +//! `async` block is complete. Since futures can be entered and exited +//! _multiple_ times without them completing, the span remains active for as +//! long as the future exists, rather than only when it is polled, leading to +//! very confusing and incorrect output. +//! +//! In order to trace asynchronous code, the [`Future::with_context`] combinator +//! can be used: +//! +//! ``` +//! # async fn run() -> Result<(), ()> { +//! use opentelemetry::api::{Context, FutureExt}; +//! let cx = Context::current(); +//! +//! let my_future = async { +//! // ... +//! }; +//! +//! my_future +//! .with_context(cx) +//! .await; +//! # Ok(()) +//! # } +//! ``` +//! +//! [`Future::with_context`] attaches a context to the future, ensuring that the +//! context's lifetime is as long as the future's. +//! +//! [`std::future::Future`]: https://doc.rust-lang.org/stable/std/future/trait.Future.html +//! [`FutureExt`]: ../futures/trait.FutureExt.html +//! [`Future::with_context`]: ../futures/trait.FutureExt.html#method.with_context +//! [`Context`]: ../../context/struct.Context.html +use crate::api::{ + self, + context::{Context, ContextGuard}, + TraceContextExt, +}; use std::fmt; use std::time::SystemTime; @@ -57,7 +185,34 @@ pub trait Tracer: fmt::Debug + 'static { /// created in another process. Each propagators' deserialization must set /// `is_remote` to true on a parent `SpanContext` so `Span` creation knows if the /// parent is remote. - fn start(&self, name: &str, parent_span: Option) -> Self::Span; + fn start(&self, name: &str) -> Self::Span { + self.start_from_context(name, &Context::current()) + } + + /// Starts a new `Span` in a given context + /// + /// By default the currently active `Span` is set as the new `Span`'s + /// parent. The `Tracer` MAY provide other default options for newly + /// created `Span`s. + /// + /// `Span` creation MUST NOT set the newly created `Span` as the currently + /// active `Span` by default, but this functionality MAY be offered additionally + /// as a separate operation. + /// + /// Each span has zero or one parent spans and zero or more child spans, which + /// represent causally related operations. A tree of related spans comprises a + /// trace. A span is said to be a _root span_ if it does not have a parent. Each + /// trace includes a single root span, which is the shared ancestor of all other + /// spans in the trace. Implementations MUST provide an option to create a `Span` as + /// a root span, and MUST generate a new `TraceId` for each root span created. + /// For a Span with a parent, the `TraceId` MUST be the same as the parent. + /// Also, the child span MUST inherit all `TraceState` values of its parent by default. + /// + /// A `Span` is said to have a _remote parent_ if it is the child of a `Span` + /// created in another process. Each propagators' deserialization must set + /// `is_remote` to true on a parent `SpanContext` so `Span` creation knows if the + /// parent is remote. + fn start_from_context(&self, name: &str, context: &Context) -> Self::Span; /// Creates a span builder /// @@ -65,13 +220,12 @@ pub trait Tracer: fmt::Debug + 'static { fn span_builder(&self, name: &str) -> SpanBuilder; /// Create a span from a `SpanBuilder` - fn build(&self, builder: SpanBuilder) -> Self::Span; + fn build(&self, builder: SpanBuilder) -> Self::Span { + self.build_with_context(builder, &Context::current()) + } - /// Returns the current active span. - /// - /// When getting the current `Span`, the `Tracer` MUST return a placeholder - /// `Span` with an invalid `SpanContext` if there is no currently active `Span`. - fn get_active_span(&self) -> Self::Span; + /// Create a span from a `SpanBuilder` + fn build_with_context(&self, builder: SpanBuilder, cx: &Context) -> Self::Span; /// Mark a given `Span` as active. /// @@ -81,57 +235,139 @@ pub trait Tracer: fmt::Debug + 'static { /// maybe finished (i.e. have a non-null end time) but still be active. A `Span` may be active /// on one thread after it has been made inactive on another. /// - /// NOTE: The `mark_span_as_active`/`mark_span_as_inactive` functions MUST be used - /// together or you can end up retaining references to the currently active `Span`. - /// If you do not want to manage active state of `Span`s manually, use the `with_span` - /// API defined for all `Tracer`s via `TracerGenerics` - fn mark_span_as_active(&self, span: &Self::Span); - - /// Remove span from active span + /// # Examples /// - /// When an active `Span` is made inactive, the previously-active `Span` SHOULD be - /// made active. A `Span` maybe finished (i.e. have a non-null end time) but still - /// be active. A `Span` may be active on one thread after it has been made inactive - /// on another. + /// ``` + /// use opentelemetry::{global, api::{Span, Tracer, KeyValue}}; /// - /// NOTE: The `mark_span_as_active`/`mark_span_as_inactive` functions MUST be used - /// together or you can end up retaining references to the currently active `Span`. - /// If you do not want to manage active state of `Span`s manually, use the `with_span` - /// API defined for all `Tracer`s via `TracerGenerics` - fn mark_span_as_inactive(&self, span_id: api::SpanId); - - /// Clone a span created by this tracer. - fn clone_span(&self, span: &Self::Span) -> Self::Span; -} + /// fn my_function() { + /// let tracer = global::tracer("my-component-a"); + /// // start an active span in one function + /// let span = tracer.start("span-name"); + /// let _guard = tracer.mark_span_as_active(span); + /// // anything happening in functions we call can still access the active span... + /// my_other_function(); + /// } + /// + /// fn my_other_function() { + /// // call methods on the current span from + /// global::tracer("my-component-b").get_active_span(|span| { + /// span.add_event("An event!".to_string(), vec![KeyValue::new("happened", true)]); + /// }); + /// } + /// ``` + #[must_use = "Dropping the guard detaches the context."] + fn mark_span_as_active(&self, span: Self::Span) -> ContextGuard { + let cx = Context::current_with_span(span); + cx.attach() + } -/// TracerGenerics are functions that have generic type parameters. They are a separate -/// trait so that `Tracer` can be used as a trait object in `GlobalTracer`. -pub trait TracerGenerics: Tracer { - /// Wraps the execution of the function body with a span. - /// It starts a new span and sets it as the active span for the given function. - /// It then executes the body. It closes the span before returning the execution result. - fn with_span(&self, name: &'static str, f: F) -> T + /// Executes a closure with a reference to this thread's current span. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::{global, api::{Span, Tracer, KeyValue}}; + /// + /// fn my_function() { + /// // start an active span in one function + /// global::tracer("my-component").in_span("span-name", |_cx| { + /// // anything happening in functions we call can still access the active span... + /// my_other_function(); + /// }) + /// } + /// + /// fn my_other_function() { + /// // call methods on the current span from + /// global::tracer("my-component").get_active_span(|span| { + /// span.add_event("An event!".to_string(), vec![KeyValue::new("happened", true)]); + /// }) + /// } + /// ``` + fn get_active_span(&self, f: F) -> T where - F: FnOnce(&Self::Span) -> T; -} + F: FnOnce(&dyn api::Span) -> T, + Self: Sized, + { + f(Context::current().span()) + } -// These functions can be implemented for all tracers to allow for convenient `with_span` syntax. -impl TracerGenerics for S { - /// Wraps the execution of the function body with a span. - /// It starts a new span and sets it as the active span for the given function. - /// It then executes the body. It closes the span before returning the execution result. - fn with_span(&self, name: &'static str, f: F) -> T + /// Start a new span and execute the given closure with reference to the span's + /// context. + /// + /// This method starts a new span and sets it as the active span for the given + /// function. It then executes the body. It closes the span before returning the + /// execution result. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::{global, api::{Span, Tracer, KeyValue}}; + /// + /// fn my_function() { + /// // start an active span in one function + /// global::tracer("my-component").in_span("span-name", |_cx| { + /// // anything happening in functions we call can still access the active span... + /// my_other_function(); + /// }) + /// } + /// + /// fn my_other_function() { + /// // call methods on the current span from + /// global::tracer("my-component").get_active_span(|span| { + /// span.add_event("An event!".to_string(), vec![KeyValue::new("happened", true)]); + /// }) + /// } + /// ``` + fn in_span(&self, name: &'static str, f: F) -> T where - F: FnOnce(&Self::Span) -> T, + F: FnOnce(Context) -> T, + Self::Span: Send + Sync, { - let span = self.start(name, None); - self.mark_span_as_active(&span); - - let result = f(&span); - span.end(); - self.mark_span_as_inactive(span.get_context().span_id()); + let span = self.start(name); + let cx = Context::current_with_span(span); + let _guard = cx.clone().attach(); + f(cx) + } - result + /// Start a new span and execute the given closure with reference to the span's + /// context. + /// + /// This method starts a new span and sets it as the active span for the given + /// function. It then executes the body. It closes the span before returning the + /// execution result. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::{global, api::{Span, SpanKind, Tracer, KeyValue}}; + /// + /// fn my_function() { + /// let tracer = global::tracer("my-component"); + /// // start a span with custom attributes via span bulder + /// let span = tracer.span_builder("span-name").with_kind(SpanKind::Server).start(&tracer); + /// // Mark the span as active for the duration of the closure + /// global::tracer("my-component").with_span(span, |_cx| { + /// // anything happening in functions we call can still access the active span... + /// my_other_function(); + /// }) + /// } + /// + /// fn my_other_function() { + /// // call methods on the current span from + /// global::tracer("my-component").get_active_span(|span| { + /// span.add_event("An event!".to_string(), vec![KeyValue::new("happened", true)]); + /// }) + /// } + /// ``` + fn with_span(&self, span: Self::Span, f: F) -> T + where + F: FnOnce(Context) -> T, + Self::Span: Send + Sync, + { + let cx = Context::current_with_span(span); + let _guard = cx.clone().attach(); + f(cx) } } diff --git a/src/exporter/trace/mod.rs b/src/exporter/trace/mod.rs index f3f51a697e..42aef06a30 100644 --- a/src/exporter/trace/mod.rs +++ b/src/exporter/trace/mod.rs @@ -54,9 +54,6 @@ pub trait SpanExporter: Send + Sync + std::fmt::Debug { /// data and the destination is unavailable). SDK authors can /// decide if they want to make the shutdown timeout to be configurable. fn shutdown(&self); - - /// Allows exporter to be downcast - fn as_any(&self) -> &dyn std::any::Any; } /// `SpanData` contains all the information collected by a `Span` and can be used @@ -65,7 +62,7 @@ pub trait SpanExporter: Send + Sync + std::fmt::Debug { #[derive(Clone, Debug, PartialEq)] pub struct SpanData { /// Exportable `SpanContext` - pub context: api::SpanContext, + pub span_context: api::SpanContext, /// Span parent id pub parent_span_id: api::SpanId, /// Span kind @@ -102,7 +99,7 @@ mod tests { let trace_flags = 0; let remote = false; - let context = api::SpanContext::new( + let span_context = api::SpanContext::new( api::TraceId::from_u128(trace_id), api::SpanId::from_u64(span_id), trace_flags, @@ -125,7 +122,7 @@ mod tests { let resource = Arc::new(sdk::Resource::default()); let span_data = SpanData { - context, + span_context, parent_span_id: api::SpanId::from_u64(parent_span_id), span_kind, name, diff --git a/src/exporter/trace/stdout.rs b/src/exporter/trace/stdout.rs index d0f2a4969f..67348f6285 100644 --- a/src/exporter/trace/stdout.rs +++ b/src/exporter/trace/stdout.rs @@ -22,7 +22,6 @@ //! global::set_provider(provider); //! ``` use crate::exporter::trace; -use std::any; use std::fmt::Debug; use std::io::{self, stdout, Stdout, Write}; use std::sync::{Arc, Mutex}; @@ -113,9 +112,4 @@ where /// Ignored for now. fn shutdown(&self) {} - - /// Allows `Exporter` to be downcast from trait object. - fn as_any(&self) -> &dyn any::Any { - self - } } diff --git a/src/global.rs b/src/global.rs index 8bebad4832..0170223fdb 100644 --- a/src/global.rs +++ b/src/global.rs @@ -24,7 +24,7 @@ //! //! fn do_something_tracked() { //! // Then you can use the global provider to create a tracer via `tracer`. -//! let _span = global::tracer("my-component").start("span-name", None); +//! let _span = global::tracer("my-component").start("span-name"); //! //! // Or access the configured provider via `trace_provider`. //! let provider = global::trace_provider(); @@ -67,7 +67,6 @@ //! [`trace_provider`]: fn.trace_provider.html //! [trait objects]: https://doc.rust-lang.org/reference/types/trait-object.html#trait-objects use crate::{api, api::Provider}; -use std::any::Any; use std::fmt; use std::sync::{Arc, RwLock}; use std::time::SystemTime; @@ -97,8 +96,8 @@ impl api::Span for BoxedSpan { } /// Returns the `SpanContext` for the given `Span`. - fn get_context(&self) -> api::SpanContext { - self.0.get_context() + fn span_context(&self) -> api::SpanContext { + self.0.span_context() } /// Returns true if this `Span` is recording information like events with the `add_event` @@ -131,31 +130,6 @@ impl api::Span for BoxedSpan { fn end(&self) { self.0.end() } - - /// Returns self as any - fn as_any(&self) -> &dyn Any { - self.0.as_any() - } - - /// Mark span as currently active - /// - /// This is the _synchronous_ api. If you are using futures, you - /// need to use the async api via [`instrument`]. - /// - /// [`instrument`]: ../api/trace/futures/trait.Instrument.html#method.instrument - fn mark_as_active(&self) { - self.0.mark_as_active() - } - - /// Mark span as no longer active - /// - /// This is the _synchronous_ api. If you are using futures, you - /// need to use the async api via [`instrument`]. - /// - /// [`instrument`]: ../api/trace/futures/trait.Instrument.html#method.instrument - fn mark_as_inactive(&self) { - self.0.mark_as_inactive() - } } /// Wraps the [`GlobalProvider`]'s [`Tracer`] so it can be used generically by @@ -184,8 +158,8 @@ impl api::Tracer for BoxedTracer { /// trace. A span is said to be a _root span_ if it does not have a parent. Each /// trace includes a single root span, which is the shared ancestor of all other /// spans in the trace. - fn start(&self, name: &str, parent_span: Option) -> Self::Span { - BoxedSpan(self.0.start_boxed(name, parent_span)) + fn start_from_context(&self, name: &str, cx: &api::Context) -> Self::Span { + BoxedSpan(self.0.start_with_context_boxed(name, cx)) } /// Creates a span builder @@ -196,31 +170,8 @@ impl api::Tracer for BoxedTracer { } /// Create a span from a `SpanBuilder` - fn build(&self, builder: api::SpanBuilder) -> Self::Span { - BoxedSpan(self.0.build_boxed(builder)) - } - - /// Returns the current active span. - /// - /// When getting the current `Span`, the `Tracer` will return a placeholder - /// `Span` with an invalid `SpanContext` if there is no currently active `Span`. - fn get_active_span(&self) -> Self::Span { - BoxedSpan(self.0.get_active_span_boxed()) - } - - /// Mark a given `Span` as active. - fn mark_span_as_active(&self, span: &Self::Span) { - self.0.mark_span_as_active_boxed(span) - } - - /// Mark a given `Span` as inactive. - fn mark_span_as_inactive(&self, span_id: api::SpanId) { - self.0.mark_span_as_inactive_boxed(span_id) - } - - /// Clone span - fn clone_span(&self, span: &Self::Span) -> Self::Span { - BoxedSpan(self.0.clone_span_boxed(span)) + fn build_with_context(&self, builder: api::SpanBuilder, cx: &api::Context) -> Self::Span { + BoxedSpan(self.0.build_with_context_boxed(builder, cx)) } } @@ -235,23 +186,15 @@ pub trait GenericTracer: fmt::Debug + 'static { /// Returns a trait object so the underlying implementation can be swapped /// out at runtime. - fn start_boxed(&self, name: &str, parent: Option) -> Box; + fn start_with_context_boxed(&self, name: &str, cx: &api::Context) -> Box; /// Returns a trait object so the underlying implementation can be swapped /// out at runtime. - fn build_boxed(&self, builder: api::SpanBuilder) -> Box; - - /// Returns the currently active span as a BoxedSpan - fn get_active_span_boxed(&self) -> Box; - - /// Returns the currently active span as a BoxedSpan - fn mark_span_as_active_boxed(&self, span: &DynSpan); - - /// Marks the current span as inactive - fn mark_span_as_inactive_boxed(&self, span_id: api::SpanId); - - /// Clone span - fn clone_span_boxed(&self, span: &DynSpan) -> Box; + fn build_with_context_boxed( + &self, + builder: api::SpanBuilder, + cx: &api::Context, + ) -> Box; } impl GenericTracer for T @@ -266,40 +209,18 @@ where /// Returns a trait object so the underlying implementation can be swapped /// out at runtime. - fn start_boxed(&self, name: &str, parent: Option) -> Box { - Box::new(self.start(name, parent)) + fn start_with_context_boxed(&self, name: &str, cx: &api::Context) -> Box { + Box::new(self.start_from_context(name, cx)) } /// Returns a trait object so the underlying implementation can be swapped /// out at runtime. - fn build_boxed(&self, builder: api::SpanBuilder) -> Box { - Box::new(self.build(builder)) - } - - /// Returns the current active span. - fn get_active_span_boxed(&self) -> Box { - Box::new(self.get_active_span()) - } - - /// Mark span as active. - fn mark_span_as_active_boxed(&self, some_span: &DynSpan) { - if let Some(span) = some_span.as_any().downcast_ref::() { - self.mark_span_as_active(span) - }; - } - - /// Mark span as inactive. - fn mark_span_as_inactive_boxed(&self, span_id: api::SpanId) { - self.mark_span_as_inactive(span_id) - } - - /// Clone span - fn clone_span_boxed(&self, some_span: &DynSpan) -> Box { - if let Some(span) = some_span.as_any().downcast_ref::() { - Box::new(self.clone_span(span)) - } else { - self.invalid_boxed() - } + fn build_with_context_boxed( + &self, + builder: api::SpanBuilder, + cx: &api::Context, + ) -> Box { + Box::new(self.build_with_context(builder, cx)) } } diff --git a/src/lib.rs b/src/lib.rs index 6c02a05bab..3f6412acd6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -190,8 +190,7 @@ //! from the backend. //! //! OpenTelemetry defines the naming convention for metric names as well as a -//! well-known metric names in [Semantic Conventions](data-semantic-conventions.md) -//! document. +//! well-known metric names in [Semantic Conventions] document. //! //! //! ## Resources @@ -215,6 +214,7 @@ //! - `HTTPTextFormat` which is used to inject and extract a value as text into carriers that travel //! in-band across process boundaries. //! +//! [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/resource/semantic_conventions/README.md #![recursion_limit = "256"] #![allow(clippy::needless_doctest_main)] #![deny(missing_docs, unreachable_pub, missing_debug_implementations)] diff --git a/src/sdk/resource.rs b/src/sdk/resource.rs index 0728e2cd50..f373a3e922 100644 --- a/src/sdk/resource.rs +++ b/src/sdk/resource.rs @@ -12,7 +12,7 @@ //! That association cannot be changed later. When associated with a `Provider`, all `Span`s //! produced by any `Tracer` from the provider are associated with this `Resource`. //! -//! [`Provider`]: ../../../api/trace/provider/trait.Provider.html +//! [`Provider`]: ../../api/trace/provider/trait.Provider.html use crate::api; #[cfg(feature = "serialize")] use serde::{Deserialize, Serialize}; diff --git a/src/sdk/trace/span.rs b/src/sdk/trace/span.rs index 778002756a..6db23c295c 100644 --- a/src/sdk/trace/span.rs +++ b/src/sdk/trace/span.rs @@ -8,9 +8,7 @@ //! start time is set to the current time on span creation. After the `Span` is created, it //! is possible to change its name, set its `Attributes`, and add `Links` and `Events`. //! These cannot be changed after the `Span`'s end time has been set. -use crate::api::Tracer; use crate::{api, exporter, sdk}; -use std::any::Any; use std::sync::{Arc, Mutex}; use std::time::SystemTime; @@ -43,11 +41,6 @@ impl Span { } } - /// Return span id - pub(crate) fn id(&self) -> api::SpanId { - self.id - } - /// Operate on reference to span inner fn with_data(&self, f: F) -> Option where @@ -90,8 +83,8 @@ impl api::Span for Span { } /// Returns the `SpanContext` for the given `Span`. - fn get_context(&self) -> api::SpanContext { - self.with_data(|data| data.context.clone()) + fn span_context(&self) -> api::SpanContext { + self.with_data(|data| data.span_context.clone()) .unwrap_or_else(|| { api::SpanContext::new(api::TraceId::invalid(), api::SpanId::invalid(), 0, false) }) @@ -136,31 +129,6 @@ impl api::Span for Span { data.end_time = SystemTime::now(); }); } - - /// Returns self as any - fn as_any(&self) -> &dyn Any { - self - } - - /// Mark as currently active span. - /// - /// This is the _synchronous_ api. If you are using futures, you - /// need to use the async api via [`instrument`]. - /// - /// [`instrument`]: ../../api/trace/futures/trait.Instrument.html#method.instrument - fn mark_as_active(&self) { - self.inner.tracer.mark_span_as_active(&self); - } - - /// Mark span as inactive - /// - /// This is the _synchronous_ api. If you are using futures, you - /// need to use the async api via [`instrument`]. - /// - /// [`instrument`]: ../futures/trait.Instrument.html#method.instrument - fn mark_as_inactive(&self) { - self.inner.tracer.mark_span_as_inactive(self.id); - } } impl Drop for SpanInner { diff --git a/src/sdk/trace/span_processor.rs b/src/sdk/trace/span_processor.rs index 938dd61d42..a5f7f72225 100644 --- a/src/sdk/trace/span_processor.rs +++ b/src/sdk/trace/span_processor.rs @@ -120,7 +120,7 @@ impl api::SpanProcessor for SimpleSpanProcessor { } fn on_end(&self, span: Arc) { - if span.context.is_sampled() { + if span.span_context.is_sampled() { self.exporter.export(vec![span]); } } diff --git a/src/sdk/trace/tracer.rs b/src/sdk/trace/tracer.rs index c1ce3001ed..57c251ca88 100644 --- a/src/sdk/trace/tracer.rs +++ b/src/sdk/trace/tracer.rs @@ -7,11 +7,9 @@ //! and exposes methods for creating and activating new `Spans`. //! //! Docs: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/api-tracing.md#tracer -use crate::api::trace::span::Span; +use crate::api::TraceContextExt; use crate::sdk; -use crate::{api, exporter}; -use std::cell::RefCell; -use std::collections::HashSet; +use crate::{api, api::context::Context, exporter}; use std::fmt; use std::sync::Arc; use std::time::SystemTime; @@ -86,11 +84,6 @@ impl Tracer { } } -thread_local! { - /// Track currently active `Span` per thread via a `SpanStack`. - static CURRENT_SPANS: RefCell = RefCell::new(SpanStack::new()); -} - impl api::Tracer for Tracer { /// This implementation of `api::Tracer` produces `sdk::Span` instances. type Span = sdk::Span; @@ -101,18 +94,17 @@ impl api::Tracer for Tracer { sdk::Span::new(api::SpanId::invalid(), None, self.clone()) } - /// Starts a new `Span`. + /// Starts a new `Span` in a given context. /// /// Each span has zero or one parent spans and zero or more child spans, which /// represent causally related operations. A tree of related spans comprises a /// trace. A span is said to be a _root span_ if it does not have a parent. Each /// trace includes a single root span, which is the shared ancestor of all other /// spans in the trace. - fn start(&self, name: &str, parent_span: Option) -> Self::Span { - let mut builder = self.span_builder(name); - builder.parent_context = parent_span; + fn start_from_context(&self, name: &str, cx: &Context) -> Self::Span { + let builder = self.span_builder(name); - self.build(builder) + self.build_with_context(builder, cx) } /// Creates a span builder @@ -129,7 +121,7 @@ impl api::Tracer for Tracer { /// trace. A span is said to be a _root span_ if it does not have a parent. Each /// trace includes a single root span, which is the shared ancestor of all other /// spans in the trace. - fn build(&self, mut builder: api::SpanBuilder) -> Self::Span { + fn build_with_context(&self, mut builder: api::SpanBuilder, cx: &Context) -> Self::Span { let config = self.provider.config(); let span_id = builder .span_id @@ -140,35 +132,39 @@ impl api::Tracer for Tracer { let mut attribute_options = builder.attributes.take().unwrap_or_else(Vec::new); let mut link_options = builder.links.take().unwrap_or_else(Vec::new); - // Build context for sampling decision - let (no_parent, trace_id, parent_span_id, remote_parent, parent_trace_flags) = builder + let parent_span_context = builder .parent_context - .clone() - .or_else(|| Some(self.get_active_span().get_context())) - .filter(|ctx| ctx.is_valid()) - .map(|ctx| { - ( + .take() + .or_else(|| Some(cx.span().span_context()).filter(|cx| cx.is_valid())) + .or_else(|| cx.remote_span_context().cloned()) + .filter(|cx| cx.is_valid()); + // Build context for sampling decision + let (no_parent, trace_id, parent_span_id, remote_parent, parent_trace_flags) = + parent_span_context + .as_ref() + .map(|ctx| { + ( + false, + ctx.trace_id(), + ctx.span_id(), + ctx.is_remote(), + ctx.trace_flags(), + ) + }) + .unwrap_or(( + true, + builder + .trace_id + .unwrap_or_else(|| self.provider().config().id_generator.new_trace_id()), + api::SpanId::invalid(), false, - ctx.trace_id(), - ctx.span_id(), - ctx.is_remote(), - ctx.trace_flags(), - ) - }) - .unwrap_or(( - true, - builder - .trace_id - .unwrap_or_else(|| self.provider().config().id_generator.new_trace_id()), - api::SpanId::invalid(), - false, - 0, - )); + 0, + )); // Make new sampling decision or use parent sampling decision let sampling_decision = if no_parent || remote_parent { self.make_sampling_decision( - builder.parent_context.as_ref(), + parent_span_context.as_ref(), trace_id, span_id, &builder.name, @@ -177,7 +173,10 @@ impl api::Tracer for Tracer { &link_options, ) } else { - Some((parent_trace_flags, Vec::new())) + // has parent that is local: use parent if sampled, else `None`. + parent_span_context + .filter(|span_context| span_context.is_sampled()) + .map(|_| (parent_trace_flags, Vec::new())) }; // Build optional inner context, `None` if not recording. @@ -200,7 +199,7 @@ impl api::Tracer for Tracer { let resource = config.resource.clone(); exporter::trace::SpanData { - context: api::SpanContext::new(trace_id, span_id, trace_flags, false), + span_context: api::SpanContext::new(trace_id, span_id, trace_flags, false), parent_span_id, span_kind, name: builder.name, @@ -225,87 +224,4 @@ impl api::Tracer for Tracer { sdk::Span::new(span_id, inner, self.clone()) } - - /// Returns the current active span. - /// - /// When getting the current `Span`, the `Tracer` will return a placeholder - /// `Span` with an invalid `SpanContext` if there is no currently active `Span`. - fn get_active_span(&self) -> Self::Span { - CURRENT_SPANS - .with(|spans| spans.borrow().current()) - .unwrap_or_else(|| self.invalid()) - } - - /// Mark a given `Span` as active. - fn mark_span_as_active(&self, span: &Self::Span) { - CURRENT_SPANS.with(|spans| { - spans.borrow_mut().push(span.clone()); - }) - } - - /// Mark a given `Span` as inactive. - fn mark_span_as_inactive(&self, span_id: api::SpanId) { - CURRENT_SPANS.with(|spans| { - spans.borrow_mut().pop(span_id); - }) - } - - /// Clone span - fn clone_span(&self, span: &Self::Span) -> Self::Span { - span.clone() - } -} - -/// Used to track `Span` and its status in the stack -struct ContextId { - span: sdk::Span, - duplicate: bool, -} - -/// A stack of `Span`s that can be used to track active `Span`s per thread. -pub(crate) struct SpanStack { - stack: Vec, - ids: HashSet, -} - -impl SpanStack { - /// Create a new `SpanStack` - fn new() -> Self { - SpanStack { - stack: vec![], - ids: HashSet::new(), - } - } - - /// Push a `Span` to the stack - fn push(&mut self, span: sdk::Span) { - let duplicate = self.ids.contains(&span.id()); - if !duplicate { - self.ids.insert(span.id()); - } - self.stack.push(ContextId { span, duplicate }) - } - - /// Pop a `Span` from the stack - fn pop(&mut self, expected_id: api::SpanId) -> Option { - if self.stack.last()?.span.id() == expected_id { - let ContextId { span, duplicate } = self.stack.pop()?; - if !duplicate { - self.ids.remove(&span.id()); - } - Some(span) - } else { - None - } - } - - /// Find the latest `Span` that is not doubly marked as active (pushed twice) - #[inline] - fn current(&self) -> Option { - self.stack - .iter() - .rev() - .find(|context_id| !context_id.duplicate) - .map(|context_id| context_id.span.clone()) - } } From ad2dfd687519e6dfcf1f8689afb3f0fff040acce Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Sat, 25 Apr 2020 12:20:08 -0700 Subject: [PATCH 3/5] Fix documentation links and spelling --- src/api/trace/b3_propagator.rs | 7 ++----- src/api/trace/context.rs | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/api/trace/b3_propagator.rs b/src/api/trace/b3_propagator.rs index febb2e57d1..d293e25bf2 100644 --- a/src/api/trace/b3_propagator.rs +++ b/src/api/trace/b3_propagator.rs @@ -158,12 +158,9 @@ impl api::HttpTextFormat for B3Propagator { } } - /// Retrieves encoded data using the provided [`Carrier`]. If no data for this + /// Retrieves encoded data using the provided `Carrier`. If no data for this /// format was retrieved OR if the retrieved data is invalid, then the current - /// [`Context`] is returned. - /// - /// [`Context`]: ../../struct.Context.html - /// [`Carrier`]: ../trait.Carrier.html + /// `Context` is returned. fn extract(&self, carrier: &dyn api::Carrier) -> api::Context { let span_context = if self.single_header { self.extract_single_header(carrier) diff --git a/src/api/trace/context.rs b/src/api/trace/context.rs index ead2c55277..e51121c18e 100644 --- a/src/api/trace/context.rs +++ b/src/api/trace/context.rs @@ -8,7 +8,7 @@ lazy_static::lazy_static! { struct Span(Box); struct RemoteSpanContext(api::SpanContext); -/// Methods for soring and retrieving trace data in a context. +/// Methods for storing and retrieving trace data in a context. pub trait TraceContextExt { /// Returns a clone of the current context with the included span. /// From 129bc7ab51387668d56ff6a762d607a16d452b58 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Sun, 26 Apr 2020 17:51:00 -0700 Subject: [PATCH 4/5] Allow extract to accept a context --- src/api/context/propagation/text_propagator.rs | 12 +++++++++++- src/api/trace/b3_propagator.rs | 4 ++-- src/api/trace/context.rs | 8 ++++---- src/api/trace/trace_context_propagator.rs | 6 +++--- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/api/context/propagation/text_propagator.rs b/src/api/context/propagation/text_propagator.rs index 04298da00b..0251d63b7b 100644 --- a/src/api/context/propagation/text_propagator.rs +++ b/src/api/context/propagation/text_propagator.rs @@ -30,5 +30,15 @@ pub trait HttpTextFormat: Debug { /// /// [`Context`]: ../../struct.Context.html /// [`Carrier`]: ../trait.Carrier.html - fn extract(&self, carrier: &dyn api::Carrier) -> Context; + fn extract(&self, carrier: &dyn api::Carrier) -> Context { + self.extract_with_context(&Context::current(), carrier) + } + + /// Retrieves encoded data using the provided [`Carrier`]. If no data for this + /// format was retrieved OR if the retrieved data is invalid, then the given + /// [`Context`] is returned. + /// + /// [`Context`]: ../../struct.Context.html + /// [`Carrier`]: ../trait.Carrier.html + fn extract_with_context(&self, cx: &Context, carrier: &dyn api::Carrier) -> Context; } diff --git a/src/api/trace/b3_propagator.rs b/src/api/trace/b3_propagator.rs index d293e25bf2..ca853ddb04 100644 --- a/src/api/trace/b3_propagator.rs +++ b/src/api/trace/b3_propagator.rs @@ -161,7 +161,7 @@ impl api::HttpTextFormat for B3Propagator { /// Retrieves encoded data using the provided `Carrier`. If no data for this /// format was retrieved OR if the retrieved data is invalid, then the current /// `Context` is returned. - fn extract(&self, carrier: &dyn api::Carrier) -> api::Context { + fn extract_with_context(&self, cx: &api::Context, carrier: &dyn api::Carrier) -> api::Context { let span_context = if self.single_header { self.extract_single_header(carrier) .unwrap_or_else(|_| api::SpanContext::empty_context()) @@ -170,7 +170,7 @@ impl api::HttpTextFormat for B3Propagator { .unwrap_or_else(|_| api::SpanContext::empty_context()) }; - api::Context::current_with_remote_span_context(span_context) + cx.with_remote_span_context(span_context) } } diff --git a/src/api/trace/context.rs b/src/api/trace/context.rs index e51121c18e..7b3f38dc5a 100644 --- a/src/api/trace/context.rs +++ b/src/api/trace/context.rs @@ -38,10 +38,10 @@ pub trait TraceContextExt { /// ``` fn span(&self) -> &dyn api::Span; - /// Returns a copy of the current context with the span context included. + /// Returns a copy of this context with the span context included. /// /// This is useful for building propagators. - fn current_with_remote_span_context(span_context: api::SpanContext) -> Self; + fn with_remote_span_context(&self, span_context: api::SpanContext) -> Self; /// Returns a reference to the remote span context data stored in this context, /// or none if no remote span context has been set. @@ -67,8 +67,8 @@ impl TraceContextExt for api::Context { } } - fn current_with_remote_span_context(span_context: api::SpanContext) -> Self { - api::Context::current_with_value(RemoteSpanContext(span_context)) + fn with_remote_span_context(&self, span_context: api::SpanContext) -> Self { + self.with_value(RemoteSpanContext(span_context)) } fn remote_span_context(&self) -> Option<&api::SpanContext> { diff --git a/src/api/trace/trace_context_propagator.rs b/src/api/trace/trace_context_propagator.rs index 138ce77af7..8029f29fe1 100644 --- a/src/api/trace/trace_context_propagator.rs +++ b/src/api/trace/trace_context_propagator.rs @@ -103,10 +103,10 @@ impl api::HttpTextFormat for TraceContextPropagator { /// the `SpanContext` and returns it. If no `SpanContext` was retrieved /// OR if the retrieved SpanContext is invalid then an empty `SpanContext` /// is returned. - fn extract(&self, carrier: &dyn api::Carrier) -> api::Context { + fn extract_with_context(&self, cx: &api::Context, carrier: &dyn api::Carrier) -> api::Context { self.extract_span_context(carrier) - .map(api::Context::current_with_remote_span_context) - .unwrap_or_else(|_| api::Context::current()) + .map(|sc| cx.with_remote_span_context(sc)) + .unwrap_or_else(|_| cx.clone()) } } From 9db4033a4efe249f537e79787498779dcb03f902 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Sat, 25 Apr 2020 12:17:33 -0700 Subject: [PATCH 5/5] Add Correlations API This adds an implementation of the [Correlations API](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/correlationcontext/api.md) which is used to annotate telemetry, adding context and information to metrics, traces, and logs. It is an abstract data type represented by a set of name/value pairs describing user-defined properties. Example: ```rust let propagator = CorrelationContextPropagator::new(); // can extract from any type that impls `Carrier`, usually an HTTP header map let cx = propagator.extract(&headers); // Iterate over extracted name / value pairs for (name, value) in cx.correlation_context() { // ... } // Add new correlations let cx_with_additions = cx.with_correlations(vec![Key::new("server_id").u64(42)]); // Inject correlations into http request propagator.inject_context(&cx_with_additions, &mut headers); ``` Resolves #62 --- Cargo.toml | 15 +- examples/basic/src/main.rs | 9 +- src/api/core.rs | 36 +++- src/api/correlation/mod.rs | 161 ++++++++++++++++++ src/api/correlation/propagation.rs | 254 +++++++++++++++++++++++++++++ src/api/mod.rs | 2 + 6 files changed, 463 insertions(+), 14 deletions(-) create mode 100644 src/api/correlation/mod.rs create mode 100644 src/api/correlation/propagation.rs diff --git a/Cargo.toml b/Cargo.toml index 639ba87b57..ce801af5f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,13 +17,14 @@ edition = "2018" [dependencies] base64 = { version = "0.12", optional = true } -futures = { version = "0.3.4", optional = true } -lazy_static = "1.4.0" -pin-project = { version = "0.4.6", optional = true } -prometheus = { version = "0.7.0", optional = true } -rand = { version = "0.7.2", optional = true } -serde = { version = "1.0.104", features = ["derive", "rc"], optional = true } -bincode = { version = "1.2.1", optional = true } +futures = { version = "0.3", optional = true } +lazy_static = "1.4" +percent-encoding = "2.0" +pin-project = { version = "0.4", optional = true } +prometheus = { version = "0.7", optional = true } +rand = { version = "0.7", optional = true } +serde = { version = "1.0", features = ["derive", "rc"], optional = true } +bincode = { version = "1.2", optional = true } [dev-dependencies] criterion = "0.3.1" diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index e4977432a8..cfa1ed69db 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -1,5 +1,6 @@ use opentelemetry::api::{ - Gauge, GaugeHandle, Key, Measure, MeasureHandle, Meter, MetricOptions, TraceContextExt, Tracer, + Context, CorrelationContextExt, Gauge, GaugeHandle, Key, Measure, MeasureHandle, Meter, + MetricOptions, TraceContextExt, Tracer, }; use opentelemetry::{global, sdk}; @@ -33,6 +34,8 @@ fn main() -> thrift::Result<()> { init_tracer()?; let meter = sdk::Meter::new("ex_com_basic"); + let foo_key = Key::new("ex.com/foo"); + let bar_key = Key::new("ex.com/bar"); let lemons_key = Key::new("ex_com_lemons"); let another_key = Key::new("ex_com_another"); @@ -54,6 +57,10 @@ fn main() -> thrift::Result<()> { let measure = measure_two.acquire_handle(&common_labels); + let _correlations = + Context::current_with_correlations(vec![foo_key.string("foo1"), bar_key.string("bar1")]) + .attach(); + global::tracer("component-main").in_span("operation", move |cx| { let span = cx.span(); span.add_event( diff --git a/src/api/core.rs b/src/api/core.rs index 49189bcf38..25e43df326 100644 --- a/src/api/core.rs +++ b/src/api/core.rs @@ -75,10 +75,17 @@ impl From<&'static str> for Key { } } -impl Into for Key { +impl From for Key { + /// Convert a `String` to a `Key`. + fn from(string: String) -> Self { + Key(Cow::from(string)) + } +} + +impl From for String { /// Converts `Key` instances into `String`. - fn into(self) -> String { - self.0.to_string() + fn from(key: Key) -> Self { + key.0.into_owned() } } @@ -132,11 +139,11 @@ impl From<&str> for Value { } } -impl Into for Value { +impl From for String { /// Convert `Value` types to `String` for use by exporters that only use /// `String` values. - fn into(self) -> String { - match self { + fn from(value: Value) -> Self { + match value { Value::Bool(value) => value.to_string(), Value::I64(value) => value.to_string(), Value::U64(value) => value.to_string(), @@ -147,6 +154,23 @@ impl Into for Value { } } +impl From<&Value> for String { + /// Convert `&Value` types to `String` for use by exporters that only use + /// `String` values. + fn from(value: &Value) -> Self { + match value { + Value::Bool(value) => value.to_string(), + Value::I64(value) => value.to_string(), + Value::U64(value) => value.to_string(), + Value::F64(value) => value.to_string(), + Value::String(value) => value.clone(), + Value::Bytes(value) => { + String::from_utf8(value.clone()).unwrap_or_else(|_| String::new()) + } + } + } +} + /// `KeyValue` pairs are used by `LabelSet`s and `Span` attributes. #[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))] #[derive(Clone, Debug, PartialEq)] diff --git a/src/api/correlation/mod.rs b/src/api/correlation/mod.rs new file mode 100644 index 0000000000..4800c7ae3a --- /dev/null +++ b/src/api/correlation/mod.rs @@ -0,0 +1,161 @@ +//! # OpenTelemetry Correlation Context API +//! +//! A Correlation Context is used to annotate telemetry, adding context and +//! information to metrics, traces, and logs. It is an abstract data type +//! represented by a set of name/value pairs describing user-defined properties. +//! Each name in a [`CorrelationContext`] is associated with exactly one value. +//! `CorrelationContext`s are serialized according to the editor's draft of +//! the [W3C Correlation Context] specification. +//! +//! [`CorrelationContext`]: struct.CorrelationContext.html +//! [W3C Correlation Context]: https://w3c.github.io/correlation-context/ +//! +//! # Examples +//! +//! ``` +//! use opentelemetry::api::{ +//! CorrelationContextExt, CorrelationContextPropagator, HttpTextFormat, Key +//! }; +//! use std::collections::HashMap; +//! +//! // Example correlation value passed in externally via http headers +//! let mut headers = HashMap::new(); +//! headers.insert("Correlation-Context", "user_id=1".to_string()); +//! +//! let propagator = CorrelationContextPropagator::new(); +//! // can extract from any type that impls `Carrier`, usually an HTTP header map +//! let cx = propagator.extract(&headers); +//! +//! // Iterate over extracted name / value pairs +//! for (name, value) in cx.correlation_context() { +//! // ... +//! } +//! +//! // Add new correlations +//! let cx_with_additions = cx.with_correlations(vec![Key::new("server_id").u64(42)]); +//! +//! // Inject correlations into http request +//! propagator.inject_context(&cx_with_additions, &mut headers); +//! +//! let header_value = headers.get("Correlation-Context").expect("header is injected"); +//! assert!(header_value.contains("user_id=1"), "still contains previous name / value"); +//! assert!(header_value.contains("server_id=42"), "contains new name / value pair"); +//! ``` +use crate::api; +use std::collections::{hash_map, HashMap}; +use std::iter::FromIterator; + +mod propagation; + +pub use propagation::{CorrelationContextExt, CorrelationContextPropagator}; + +/// A set of name/value pairs describing user-defined properties across systems. +#[derive(Debug, Default)] +pub struct CorrelationContext { + inner: HashMap, +} + +impl CorrelationContext { + /// Creates an empty `CorrelationContext`. + pub fn new() -> Self { + CorrelationContext { + inner: HashMap::default(), + } + } + + /// Returns a reference to the value associated with a given name + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{CorrelationContext, Value}; + /// + /// let mut cc = CorrelationContext::new(); + /// let _ = cc.insert("my-name", "my-value"); + /// + /// assert_eq!(cc.get("my-name"), Some(&Value::String("my-value".to_string()))) + /// ``` + pub fn get>(&self, key: T) -> Option<&api::Value> { + self.inner.get(&key.into()) + } + + /// Inserts a name-value pair into the correlation context. + /// + /// If the name was not present, [`None`] is returned. If the name was present, + /// the value is updated, and the old value is returned. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{CorrelationContext, Value}; + /// + /// let mut cc = CorrelationContext::new(); + /// let _ = cc.insert("my-name", "my-value"); + /// + /// assert_eq!(cc.get("my-name"), Some(&Value::String("my-value".to_string()))) + /// ``` + pub fn insert(&mut self, key: K, value: V) -> Option + where + K: Into, + V: Into, + { + self.inner.insert(key.into(), value.into()) + } + + /// Removes a name from the correlation context, returning the value + /// corresponding to the name if the pair was previously in the map. + pub fn remove>(&mut self, key: K) -> Option { + self.inner.remove(&key.into()) + } + + /// Returns the number of attributes for this correlation context + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns `true` if the correlation context contains no items. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Gets an iterator over the correlation context items, sorted by name. + pub fn iter(&self) -> Iter { + self.into_iter() + } +} + +/// An iterator over the entries of a `CorrelationContext`. +#[derive(Debug)] +pub struct Iter<'a>(hash_map::Iter<'a, api::Key, api::Value>); +impl<'a> Iterator for Iter<'a> { + type Item = (&'a api::Key, &'a api::Value); + + fn next(&mut self) -> Option { + self.0.next() + } +} + +impl<'a> IntoIterator for &'a CorrelationContext { + type Item = (&'a api::Key, &'a api::Value); + type IntoIter = Iter<'a>; + + fn into_iter(self) -> Self::IntoIter { + Iter(self.inner.iter()) + } +} + +impl FromIterator<(api::Key, api::Value)> for CorrelationContext { + fn from_iter>(iter: I) -> Self { + CorrelationContext { + inner: iter.into_iter().collect(), + } + } +} + +impl FromIterator for CorrelationContext { + fn from_iter>(iter: I) -> Self { + CorrelationContext { + inner: iter.into_iter().map(|kv| (kv.key, kv.value)).collect(), + } + } +} diff --git a/src/api/correlation/propagation.rs b/src/api/correlation/propagation.rs new file mode 100644 index 0000000000..2680e03d67 --- /dev/null +++ b/src/api/correlation/propagation.rs @@ -0,0 +1,254 @@ +use super::CorrelationContext; +use crate::api::{self, Context, KeyValue}; +use percent_encoding::{percent_decode_str, utf8_percent_encode, AsciiSet, CONTROLS}; +use std::iter; + +static CORRELATION_CONTEXT_HEADER: &str = "Correlation-Context"; +const FRAGMENT: &AsciiSet = &CONTROLS.add(b' ').add(b'"').add(b';').add(b',').add(b'='); + +lazy_static::lazy_static! { + static ref DEFAULT_CORRELATION_CONTEXT: CorrelationContext = CorrelationContext::default(); +} + +/// Propagates name/value pairs in [W3C Correlation Context] format. +/// +/// [W3C Correlation Context]: https://w3c.github.io/correlation-context/ +#[derive(Debug, Default)] +pub struct CorrelationContextPropagator { + _private: (), +} + +impl CorrelationContextPropagator { + /// Construct a new correlation context provider. + pub fn new() -> Self { + CorrelationContextPropagator { _private: () } + } +} + +impl api::HttpTextFormat for CorrelationContextPropagator { + /// Encodes the values of the `Context` and injects them into the provided `Carrier`. + fn inject_context(&self, cx: &Context, carrier: &mut dyn api::Carrier) { + let correlation_cx = cx.correlation_context(); + if !correlation_cx.is_empty() { + let header_value = correlation_cx + .iter() + .map(|(name, value)| { + utf8_percent_encode(name.as_str().trim(), FRAGMENT) + .chain(iter::once("=")) + .chain(utf8_percent_encode(String::from(value).trim(), FRAGMENT)) + .collect() + }) + .collect::>() + .join(","); + carrier.set(CORRELATION_CONTEXT_HEADER, header_value); + } + } + + /// Extracts a `Context` with correlation context values from a `Carrier`. + fn extract_with_context(&self, cx: &Context, carrier: &dyn api::Carrier) -> Context { + if let Some(header_value) = carrier.get(CORRELATION_CONTEXT_HEADER) { + let correlations = header_value.split(',').flat_map(|context_value| { + if let Some((name_and_value, props)) = context_value + .split(';') + .collect::>() + .split_first() + { + let mut iter = name_and_value.split('='); + if let (Some(name), Some(value)) = (iter.next(), iter.next()) { + let name = percent_decode_str(name).decode_utf8().map_err(|_| ())?; + let value = percent_decode_str(value).decode_utf8().map_err(|_| ())?; + + // TODO: handle props from https://w3c.github.io/correlation-context/ + // for now just append to value + let decoded_props = props + .iter() + .flat_map(|prop| percent_decode_str(prop).decode_utf8()) + .map(|prop| format!(";{}", prop.as_ref().trim())) + .collect::(); + + Ok(KeyValue::new( + name.trim().to_owned(), + value.trim().to_string() + decoded_props.as_str(), + )) + } else { + // Invalid name / value format + Err(()) + } + } else { + // Invalid correlation context value format + Err(()) + } + }); + cx.with_correlations(correlations) + } else { + cx.clone() + } + } +} + +struct Correlations(CorrelationContext); + +/// Methods for soring and retrieving correlation data in a context. +pub trait CorrelationContextExt { + /// Returns a clone of the current context with the included name / value pairs. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{Context, CorrelationContextExt, KeyValue, Value}; + /// + /// let cx = Context::current_with_correlations(vec![KeyValue::new("my-name", "my-value")]); + /// + /// assert_eq!( + /// cx.correlation_context().get("my-name"), + /// Some(&Value::String("my-value".to_string())), + /// ) + /// ``` + fn current_with_correlations>(correlations: T) -> Self; + + /// Returns a clone of the given context with the included name / value pairs. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{Context, CorrelationContextExt, KeyValue, Value}; + /// + /// let some_context = Context::current(); + /// let cx = some_context.with_correlations(vec![KeyValue::new("my-name", "my-value")]); + /// + /// assert_eq!( + /// cx.correlation_context().get("my-name"), + /// Some(&Value::String("my-value".to_string())), + /// ) + /// ``` + fn with_correlations>(&self, correlations: T) -> Self; + + /// Returns a clone of the given context with the included name / value pairs. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{Context, CorrelationContextExt, KeyValue, Value}; + /// + /// let cx = Context::current().with_cleared_correlations(); + /// + /// assert_eq!(cx.correlation_context().len(), 0); + /// ``` + fn with_cleared_correlations(&self) -> Self; + + /// Returns a reference to this context's correlation context, or the default + /// empty correlation context if none has been set. + fn correlation_context(&self) -> &CorrelationContext; +} + +impl CorrelationContextExt for Context { + fn current_with_correlations>(kvs: T) -> Self { + Context::current().with_correlations(kvs) + } + + fn with_correlations>(&self, kvs: T) -> Self { + let merged = self + .correlation_context() + .iter() + .map(|(key, value)| KeyValue::new(key.clone(), value.clone())) + .chain(kvs.into_iter()) + .collect(); + + self.with_value(Correlations(merged)) + } + + fn with_cleared_correlations(&self) -> Self { + self.with_value(Correlations(CorrelationContext::new())) + } + + fn correlation_context(&self) -> &CorrelationContext { + self.get::() + .map(|correlations| &correlations.0) + .unwrap_or_else(|| &DEFAULT_CORRELATION_CONTEXT) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::api::HttpTextFormat; + use crate::api::{Key, Value}; + use std::collections::HashMap; + + #[rustfmt::skip] + fn valid_extract_data() -> Vec<(&'static str, HashMap)> { + vec![ + // "valid w3cHeader" + ("key1=val1,key2=val2", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2"))].into_iter().collect()), + // "valid w3cHeader with spaces" + ("key1 = val1, key2 =val2 ", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2"))].into_iter().collect()), + // "valid w3cHeader with properties" + ("key1=val1,key2=val2;prop=1", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2;prop=1"))].into_iter().collect()), + // "valid header with url-escaped comma" + ("key1=val1,key2=val2%2Cval3", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2,val3"))].into_iter().collect()), + // "valid header with an invalid header" + ("key1=val1,key2=val2,a,val3", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2"))].into_iter().collect()), + // "valid header with no value" + ("key1=,key2=val2", vec![(Key::new("key1"), Value::from("")), (Key::new("key2"), Value::from("val2"))].into_iter().collect()), + ] + } + + #[rustfmt::skip] + fn valid_inject_data() -> Vec<(Vec, Vec<&'static str>)> { + vec![ + // "two simple values" + (vec![KeyValue::new("key1", "val1"), KeyValue::new("key2", "val2")], vec!["key1=val1", "key2=val2"]), + // "two values with escaped chars" + (vec![KeyValue::new("key1", "val1,val2"), KeyValue::new("key2", "val3=4")], vec!["key1=val1%2Cval2", "key2=val3%3D4"]), + // "values of non-string types" + ( + vec![ + KeyValue::new("key1", true), + KeyValue::new("key2", Value::I64(123)), + KeyValue::new("key3", Value::U64(123)), + KeyValue::new("key4", Value::F64(123.567)), + ], + vec![ + "key1=true", + "key2=123", + "key3=123", + "key4=123.567", + ], + ), + ] + } + + #[test] + fn extract_correlations() { + let propagator = CorrelationContextPropagator::new(); + + for (header_value, kvs) in valid_extract_data() { + let mut carrier: HashMap<&'static str, String> = HashMap::new(); + carrier.insert(CORRELATION_CONTEXT_HEADER, header_value.to_string()); + let context = propagator.extract(&carrier); + let correlations = context.correlation_context(); + + assert_eq!(kvs.len(), correlations.len()); + for (key, value) in correlations { + assert_eq!(Some(value), kvs.get(key)) + } + } + } + + #[test] + fn inject_correlations() { + let propagator = CorrelationContextPropagator::new(); + + for (kvs, header_parts) in valid_inject_data() { + let mut carrier: HashMap<&'static str, String> = HashMap::new(); + let cx = Context::current_with_correlations(kvs); + propagator.inject_context(&cx, &mut carrier); + let header_value = carrier.get(CORRELATION_CONTEXT_HEADER).unwrap(); + + assert_eq!(header_parts.join(",").len(), header_value.len(),); + for header_part in &header_parts { + assert!(header_value.contains(header_part),) + } + } + } +} diff --git a/src/api/mod.rs b/src/api/mod.rs index bd940178a7..7528fc68eb 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -15,6 +15,7 @@ //! so that the SDK knows where and how to deliver the telemetry. pub mod context; pub mod core; +pub mod correlation; pub mod metrics; pub mod trace; @@ -25,6 +26,7 @@ pub use context::{ propagation::{binary_propagator::BinaryFormat, text_propagator::HttpTextFormat, Carrier}, Context, }; +pub use correlation::{CorrelationContext, CorrelationContextExt, CorrelationContextPropagator}; pub use metrics::{ counter::{Counter, CounterHandle},