From e5f58606c55e419fbb66601b6548da75961f1edf Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Wed, 6 May 2020 18:44:01 -0700 Subject: [PATCH] Add Correlations API (#101) This adds an implementation of the [Correlations API](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/correlationcontext/api.md) which is used to annotate telemetry, adding context and information to metrics, traces, and logs. It is an abstract data type represented by a set of name/value pairs describing user-defined properties. Example: ```rust let propagator = CorrelationContextPropagator::new(); // can extract from any type that impls `Carrier`, usually an HTTP header map let cx = propagator.extract(&headers); // Iterate over extracted name / value pairs for (name, value) in cx.correlation_context() { // ... } // Add new correlations let cx_with_additions = cx.with_correlations(vec![Key::new("server_id").u64(42)]); // Inject correlations into http request propagator.inject_context(&cx_with_additions, &mut headers); ``` Resolves #62 --- Cargo.toml | 15 +- examples/basic/src/main.rs | 9 +- src/api/core.rs | 36 +++- src/api/correlation/mod.rs | 161 ++++++++++++++++++ src/api/correlation/propagation.rs | 254 +++++++++++++++++++++++++++++ src/api/mod.rs | 2 + 6 files changed, 463 insertions(+), 14 deletions(-) create mode 100644 src/api/correlation/mod.rs create mode 100644 src/api/correlation/propagation.rs diff --git a/Cargo.toml b/Cargo.toml index 639ba87b57..ce801af5f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,13 +17,14 @@ edition = "2018" [dependencies] base64 = { version = "0.12", optional = true } -futures = { version = "0.3.4", optional = true } -lazy_static = "1.4.0" -pin-project = { version = "0.4.6", optional = true } -prometheus = { version = "0.7.0", optional = true } -rand = { version = "0.7.2", optional = true } -serde = { version = "1.0.104", features = ["derive", "rc"], optional = true } -bincode = { version = "1.2.1", optional = true } +futures = { version = "0.3", optional = true } +lazy_static = "1.4" +percent-encoding = "2.0" +pin-project = { version = "0.4", optional = true } +prometheus = { version = "0.7", optional = true } +rand = { version = "0.7", optional = true } +serde = { version = "1.0", features = ["derive", "rc"], optional = true } +bincode = { version = "1.2", optional = true } [dev-dependencies] criterion = "0.3.1" diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index e4977432a8..cfa1ed69db 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -1,5 +1,6 @@ use opentelemetry::api::{ - Gauge, GaugeHandle, Key, Measure, MeasureHandle, Meter, MetricOptions, TraceContextExt, Tracer, + Context, CorrelationContextExt, Gauge, GaugeHandle, Key, Measure, MeasureHandle, Meter, + MetricOptions, TraceContextExt, Tracer, }; use opentelemetry::{global, sdk}; @@ -33,6 +34,8 @@ fn main() -> thrift::Result<()> { init_tracer()?; let meter = sdk::Meter::new("ex_com_basic"); + let foo_key = Key::new("ex.com/foo"); + let bar_key = Key::new("ex.com/bar"); let lemons_key = Key::new("ex_com_lemons"); let another_key = Key::new("ex_com_another"); @@ -54,6 +57,10 @@ fn main() -> thrift::Result<()> { let measure = measure_two.acquire_handle(&common_labels); + let _correlations = + Context::current_with_correlations(vec![foo_key.string("foo1"), bar_key.string("bar1")]) + .attach(); + global::tracer("component-main").in_span("operation", move |cx| { let span = cx.span(); span.add_event( diff --git a/src/api/core.rs b/src/api/core.rs index 49189bcf38..25e43df326 100644 --- a/src/api/core.rs +++ b/src/api/core.rs @@ -75,10 +75,17 @@ impl From<&'static str> for Key { } } -impl Into for Key { +impl From for Key { + /// Convert a `String` to a `Key`. + fn from(string: String) -> Self { + Key(Cow::from(string)) + } +} + +impl From for String { /// Converts `Key` instances into `String`. - fn into(self) -> String { - self.0.to_string() + fn from(key: Key) -> Self { + key.0.into_owned() } } @@ -132,11 +139,11 @@ impl From<&str> for Value { } } -impl Into for Value { +impl From for String { /// Convert `Value` types to `String` for use by exporters that only use /// `String` values. - fn into(self) -> String { - match self { + fn from(value: Value) -> Self { + match value { Value::Bool(value) => value.to_string(), Value::I64(value) => value.to_string(), Value::U64(value) => value.to_string(), @@ -147,6 +154,23 @@ impl Into for Value { } } +impl From<&Value> for String { + /// Convert `&Value` types to `String` for use by exporters that only use + /// `String` values. + fn from(value: &Value) -> Self { + match value { + Value::Bool(value) => value.to_string(), + Value::I64(value) => value.to_string(), + Value::U64(value) => value.to_string(), + Value::F64(value) => value.to_string(), + Value::String(value) => value.clone(), + Value::Bytes(value) => { + String::from_utf8(value.clone()).unwrap_or_else(|_| String::new()) + } + } + } +} + /// `KeyValue` pairs are used by `LabelSet`s and `Span` attributes. #[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))] #[derive(Clone, Debug, PartialEq)] diff --git a/src/api/correlation/mod.rs b/src/api/correlation/mod.rs new file mode 100644 index 0000000000..4800c7ae3a --- /dev/null +++ b/src/api/correlation/mod.rs @@ -0,0 +1,161 @@ +//! # OpenTelemetry Correlation Context API +//! +//! A Correlation Context is used to annotate telemetry, adding context and +//! information to metrics, traces, and logs. It is an abstract data type +//! represented by a set of name/value pairs describing user-defined properties. +//! Each name in a [`CorrelationContext`] is associated with exactly one value. +//! `CorrelationContext`s are serialized according to the editor's draft of +//! the [W3C Correlation Context] specification. +//! +//! [`CorrelationContext`]: struct.CorrelationContext.html +//! [W3C Correlation Context]: https://w3c.github.io/correlation-context/ +//! +//! # Examples +//! +//! ``` +//! use opentelemetry::api::{ +//! CorrelationContextExt, CorrelationContextPropagator, HttpTextFormat, Key +//! }; +//! use std::collections::HashMap; +//! +//! // Example correlation value passed in externally via http headers +//! let mut headers = HashMap::new(); +//! headers.insert("Correlation-Context", "user_id=1".to_string()); +//! +//! let propagator = CorrelationContextPropagator::new(); +//! // can extract from any type that impls `Carrier`, usually an HTTP header map +//! let cx = propagator.extract(&headers); +//! +//! // Iterate over extracted name / value pairs +//! for (name, value) in cx.correlation_context() { +//! // ... +//! } +//! +//! // Add new correlations +//! let cx_with_additions = cx.with_correlations(vec![Key::new("server_id").u64(42)]); +//! +//! // Inject correlations into http request +//! propagator.inject_context(&cx_with_additions, &mut headers); +//! +//! let header_value = headers.get("Correlation-Context").expect("header is injected"); +//! assert!(header_value.contains("user_id=1"), "still contains previous name / value"); +//! assert!(header_value.contains("server_id=42"), "contains new name / value pair"); +//! ``` +use crate::api; +use std::collections::{hash_map, HashMap}; +use std::iter::FromIterator; + +mod propagation; + +pub use propagation::{CorrelationContextExt, CorrelationContextPropagator}; + +/// A set of name/value pairs describing user-defined properties across systems. +#[derive(Debug, Default)] +pub struct CorrelationContext { + inner: HashMap, +} + +impl CorrelationContext { + /// Creates an empty `CorrelationContext`. + pub fn new() -> Self { + CorrelationContext { + inner: HashMap::default(), + } + } + + /// Returns a reference to the value associated with a given name + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{CorrelationContext, Value}; + /// + /// let mut cc = CorrelationContext::new(); + /// let _ = cc.insert("my-name", "my-value"); + /// + /// assert_eq!(cc.get("my-name"), Some(&Value::String("my-value".to_string()))) + /// ``` + pub fn get>(&self, key: T) -> Option<&api::Value> { + self.inner.get(&key.into()) + } + + /// Inserts a name-value pair into the correlation context. + /// + /// If the name was not present, [`None`] is returned. If the name was present, + /// the value is updated, and the old value is returned. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{CorrelationContext, Value}; + /// + /// let mut cc = CorrelationContext::new(); + /// let _ = cc.insert("my-name", "my-value"); + /// + /// assert_eq!(cc.get("my-name"), Some(&Value::String("my-value".to_string()))) + /// ``` + pub fn insert(&mut self, key: K, value: V) -> Option + where + K: Into, + V: Into, + { + self.inner.insert(key.into(), value.into()) + } + + /// Removes a name from the correlation context, returning the value + /// corresponding to the name if the pair was previously in the map. + pub fn remove>(&mut self, key: K) -> Option { + self.inner.remove(&key.into()) + } + + /// Returns the number of attributes for this correlation context + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns `true` if the correlation context contains no items. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Gets an iterator over the correlation context items, sorted by name. + pub fn iter(&self) -> Iter { + self.into_iter() + } +} + +/// An iterator over the entries of a `CorrelationContext`. +#[derive(Debug)] +pub struct Iter<'a>(hash_map::Iter<'a, api::Key, api::Value>); +impl<'a> Iterator for Iter<'a> { + type Item = (&'a api::Key, &'a api::Value); + + fn next(&mut self) -> Option { + self.0.next() + } +} + +impl<'a> IntoIterator for &'a CorrelationContext { + type Item = (&'a api::Key, &'a api::Value); + type IntoIter = Iter<'a>; + + fn into_iter(self) -> Self::IntoIter { + Iter(self.inner.iter()) + } +} + +impl FromIterator<(api::Key, api::Value)> for CorrelationContext { + fn from_iter>(iter: I) -> Self { + CorrelationContext { + inner: iter.into_iter().collect(), + } + } +} + +impl FromIterator for CorrelationContext { + fn from_iter>(iter: I) -> Self { + CorrelationContext { + inner: iter.into_iter().map(|kv| (kv.key, kv.value)).collect(), + } + } +} diff --git a/src/api/correlation/propagation.rs b/src/api/correlation/propagation.rs new file mode 100644 index 0000000000..2680e03d67 --- /dev/null +++ b/src/api/correlation/propagation.rs @@ -0,0 +1,254 @@ +use super::CorrelationContext; +use crate::api::{self, Context, KeyValue}; +use percent_encoding::{percent_decode_str, utf8_percent_encode, AsciiSet, CONTROLS}; +use std::iter; + +static CORRELATION_CONTEXT_HEADER: &str = "Correlation-Context"; +const FRAGMENT: &AsciiSet = &CONTROLS.add(b' ').add(b'"').add(b';').add(b',').add(b'='); + +lazy_static::lazy_static! { + static ref DEFAULT_CORRELATION_CONTEXT: CorrelationContext = CorrelationContext::default(); +} + +/// Propagates name/value pairs in [W3C Correlation Context] format. +/// +/// [W3C Correlation Context]: https://w3c.github.io/correlation-context/ +#[derive(Debug, Default)] +pub struct CorrelationContextPropagator { + _private: (), +} + +impl CorrelationContextPropagator { + /// Construct a new correlation context provider. + pub fn new() -> Self { + CorrelationContextPropagator { _private: () } + } +} + +impl api::HttpTextFormat for CorrelationContextPropagator { + /// Encodes the values of the `Context` and injects them into the provided `Carrier`. + fn inject_context(&self, cx: &Context, carrier: &mut dyn api::Carrier) { + let correlation_cx = cx.correlation_context(); + if !correlation_cx.is_empty() { + let header_value = correlation_cx + .iter() + .map(|(name, value)| { + utf8_percent_encode(name.as_str().trim(), FRAGMENT) + .chain(iter::once("=")) + .chain(utf8_percent_encode(String::from(value).trim(), FRAGMENT)) + .collect() + }) + .collect::>() + .join(","); + carrier.set(CORRELATION_CONTEXT_HEADER, header_value); + } + } + + /// Extracts a `Context` with correlation context values from a `Carrier`. + fn extract_with_context(&self, cx: &Context, carrier: &dyn api::Carrier) -> Context { + if let Some(header_value) = carrier.get(CORRELATION_CONTEXT_HEADER) { + let correlations = header_value.split(',').flat_map(|context_value| { + if let Some((name_and_value, props)) = context_value + .split(';') + .collect::>() + .split_first() + { + let mut iter = name_and_value.split('='); + if let (Some(name), Some(value)) = (iter.next(), iter.next()) { + let name = percent_decode_str(name).decode_utf8().map_err(|_| ())?; + let value = percent_decode_str(value).decode_utf8().map_err(|_| ())?; + + // TODO: handle props from https://w3c.github.io/correlation-context/ + // for now just append to value + let decoded_props = props + .iter() + .flat_map(|prop| percent_decode_str(prop).decode_utf8()) + .map(|prop| format!(";{}", prop.as_ref().trim())) + .collect::(); + + Ok(KeyValue::new( + name.trim().to_owned(), + value.trim().to_string() + decoded_props.as_str(), + )) + } else { + // Invalid name / value format + Err(()) + } + } else { + // Invalid correlation context value format + Err(()) + } + }); + cx.with_correlations(correlations) + } else { + cx.clone() + } + } +} + +struct Correlations(CorrelationContext); + +/// Methods for soring and retrieving correlation data in a context. +pub trait CorrelationContextExt { + /// Returns a clone of the current context with the included name / value pairs. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{Context, CorrelationContextExt, KeyValue, Value}; + /// + /// let cx = Context::current_with_correlations(vec![KeyValue::new("my-name", "my-value")]); + /// + /// assert_eq!( + /// cx.correlation_context().get("my-name"), + /// Some(&Value::String("my-value".to_string())), + /// ) + /// ``` + fn current_with_correlations>(correlations: T) -> Self; + + /// Returns a clone of the given context with the included name / value pairs. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{Context, CorrelationContextExt, KeyValue, Value}; + /// + /// let some_context = Context::current(); + /// let cx = some_context.with_correlations(vec![KeyValue::new("my-name", "my-value")]); + /// + /// assert_eq!( + /// cx.correlation_context().get("my-name"), + /// Some(&Value::String("my-value".to_string())), + /// ) + /// ``` + fn with_correlations>(&self, correlations: T) -> Self; + + /// Returns a clone of the given context with the included name / value pairs. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::api::{Context, CorrelationContextExt, KeyValue, Value}; + /// + /// let cx = Context::current().with_cleared_correlations(); + /// + /// assert_eq!(cx.correlation_context().len(), 0); + /// ``` + fn with_cleared_correlations(&self) -> Self; + + /// Returns a reference to this context's correlation context, or the default + /// empty correlation context if none has been set. + fn correlation_context(&self) -> &CorrelationContext; +} + +impl CorrelationContextExt for Context { + fn current_with_correlations>(kvs: T) -> Self { + Context::current().with_correlations(kvs) + } + + fn with_correlations>(&self, kvs: T) -> Self { + let merged = self + .correlation_context() + .iter() + .map(|(key, value)| KeyValue::new(key.clone(), value.clone())) + .chain(kvs.into_iter()) + .collect(); + + self.with_value(Correlations(merged)) + } + + fn with_cleared_correlations(&self) -> Self { + self.with_value(Correlations(CorrelationContext::new())) + } + + fn correlation_context(&self) -> &CorrelationContext { + self.get::() + .map(|correlations| &correlations.0) + .unwrap_or_else(|| &DEFAULT_CORRELATION_CONTEXT) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::api::HttpTextFormat; + use crate::api::{Key, Value}; + use std::collections::HashMap; + + #[rustfmt::skip] + fn valid_extract_data() -> Vec<(&'static str, HashMap)> { + vec![ + // "valid w3cHeader" + ("key1=val1,key2=val2", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2"))].into_iter().collect()), + // "valid w3cHeader with spaces" + ("key1 = val1, key2 =val2 ", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2"))].into_iter().collect()), + // "valid w3cHeader with properties" + ("key1=val1,key2=val2;prop=1", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2;prop=1"))].into_iter().collect()), + // "valid header with url-escaped comma" + ("key1=val1,key2=val2%2Cval3", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2,val3"))].into_iter().collect()), + // "valid header with an invalid header" + ("key1=val1,key2=val2,a,val3", vec![(Key::new("key1"), Value::from("val1")), (Key::new("key2"), Value::from("val2"))].into_iter().collect()), + // "valid header with no value" + ("key1=,key2=val2", vec![(Key::new("key1"), Value::from("")), (Key::new("key2"), Value::from("val2"))].into_iter().collect()), + ] + } + + #[rustfmt::skip] + fn valid_inject_data() -> Vec<(Vec, Vec<&'static str>)> { + vec![ + // "two simple values" + (vec![KeyValue::new("key1", "val1"), KeyValue::new("key2", "val2")], vec!["key1=val1", "key2=val2"]), + // "two values with escaped chars" + (vec![KeyValue::new("key1", "val1,val2"), KeyValue::new("key2", "val3=4")], vec!["key1=val1%2Cval2", "key2=val3%3D4"]), + // "values of non-string types" + ( + vec![ + KeyValue::new("key1", true), + KeyValue::new("key2", Value::I64(123)), + KeyValue::new("key3", Value::U64(123)), + KeyValue::new("key4", Value::F64(123.567)), + ], + vec![ + "key1=true", + "key2=123", + "key3=123", + "key4=123.567", + ], + ), + ] + } + + #[test] + fn extract_correlations() { + let propagator = CorrelationContextPropagator::new(); + + for (header_value, kvs) in valid_extract_data() { + let mut carrier: HashMap<&'static str, String> = HashMap::new(); + carrier.insert(CORRELATION_CONTEXT_HEADER, header_value.to_string()); + let context = propagator.extract(&carrier); + let correlations = context.correlation_context(); + + assert_eq!(kvs.len(), correlations.len()); + for (key, value) in correlations { + assert_eq!(Some(value), kvs.get(key)) + } + } + } + + #[test] + fn inject_correlations() { + let propagator = CorrelationContextPropagator::new(); + + for (kvs, header_parts) in valid_inject_data() { + let mut carrier: HashMap<&'static str, String> = HashMap::new(); + let cx = Context::current_with_correlations(kvs); + propagator.inject_context(&cx, &mut carrier); + let header_value = carrier.get(CORRELATION_CONTEXT_HEADER).unwrap(); + + assert_eq!(header_parts.join(",").len(), header_value.len(),); + for header_part in &header_parts { + assert!(header_value.contains(header_part),) + } + } + } +} diff --git a/src/api/mod.rs b/src/api/mod.rs index bd940178a7..7528fc68eb 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -15,6 +15,7 @@ //! so that the SDK knows where and how to deliver the telemetry. pub mod context; pub mod core; +pub mod correlation; pub mod metrics; pub mod trace; @@ -25,6 +26,7 @@ pub use context::{ propagation::{binary_propagator::BinaryFormat, text_propagator::HttpTextFormat, Carrier}, Context, }; +pub use correlation::{CorrelationContext, CorrelationContextExt, CorrelationContextPropagator}; pub use metrics::{ counter::{Counter, CounterHandle},