diff --git a/opentelemetry-jaeger/examples/actix-udp/src/main.rs b/opentelemetry-jaeger/examples/actix-udp/src/main.rs index ede18ca264..d69073ae85 100644 --- a/opentelemetry-jaeger/examples/actix-udp/src/main.rs +++ b/opentelemetry-jaeger/examples/actix-udp/src/main.rs @@ -1,9 +1,10 @@ use actix_service::Service; use actix_web::middleware::Logger; use actix_web::{web, App, HttpServer}; +use opentelemetry::FutureExt; use opentelemetry::{ global, - trace::{FutureExt, TraceContextExt, TraceError, Tracer}, + trace::{TraceContextExt, TraceError, Tracer}, Key, KeyValue, }; use opentelemetry_sdk::{trace::config, Resource}; diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml index 50f84549b3..785656ace2 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml @@ -8,9 +8,9 @@ publish = false [dependencies] once_cell = "1.17" opentelemetry = { path = "../../../opentelemetry" } -opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"] } +opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs", "logs_level_enabled"] } opentelemetry-otlp = { path = "../..", features = ["http-proto", "reqwest-client", "logs"] } -opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} +opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing"} opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" } tokio = { version = "1.0", features = ["full"] } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 8f358c977d..b373baa757 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -2,38 +2,48 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; -use opentelemetry::logs::{LogError, LogResult}; +use opentelemetry::{ + logs::{LogError, LogResult}, + FutureExt, +}; use opentelemetry_sdk::export::logs::{LogData, LogExporter}; use super::OtlpHttpClient; +use opentelemetry::Context; +use std::thread; #[async_trait] impl LogExporter for OtlpHttpClient { async fn export(&mut self, batch: Vec) -> LogResult<()> { - let client = self - .client - .lock() - .map_err(|e| LogError::Other(e.to_string().into())) - .and_then(|g| match &*g { - Some(client) => Ok(Arc::clone(client)), - _ => Err(LogError::Other("exporter is already shut down".into())), - })?; - - let (body, content_type) = build_body(batch)?; - let mut request = http::Request::builder() - .method(Method::POST) - .uri(&self.collector_endpoint) - .header(CONTENT_TYPE, content_type) - .body(body) - .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; - - for (k, v) in &self.headers { - request.headers_mut().insert(k.clone(), v.clone()); - } + async { + let client = self + .client + .lock() + .map_err(|e| LogError::Other(e.to_string().into())) + .and_then(|g| match &*g { + Some(client) => Ok(Arc::clone(client)), + _ => Err(LogError::Other("exporter is already shut down".into())), + })?; + + let (body, content_type) = build_body(batch)?; + let mut request = http::Request::builder() + .method(Method::POST) + .uri(&self.collector_endpoint) + .header(CONTENT_TYPE, content_type) + .body(body) + .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; - client.send(request).await?; + for (k, v) in &self.headers { + request.headers_mut().insert(k.clone(), v.clone()); + } - Ok(()) + client.send(request).await?; + + Ok(()) + } + //.with_current_context_supp() + .with_context(Context::current().with_suppression()) + .await } fn shutdown(&mut self) { diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6bac467a5b..1cccbd4ade 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -19,6 +19,8 @@ use std::{ time::Duration, }; +use opentelemetry::Context; + /// The interface for plugging into a [`Logger`]. /// /// [`Logger`]: crate::logs::Logger @@ -103,7 +105,7 @@ impl LogProcessor for SimpleLogProcessor { #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - true + !Context::current().suppression } } @@ -132,7 +134,7 @@ impl> LogProcessor for BatchLogProcessor { #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - true + !Context::current().suppression } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry/src/context.rs b/opentelemetry/src/context.rs index 67eae958f6..84312efc1d 100644 --- a/opentelemetry/src/context.rs +++ b/opentelemetry/src/context.rs @@ -1,12 +1,17 @@ #[cfg(feature = "trace")] use crate::trace::context::SynchronizedSpan; +use futures_core::stream::Stream; +use futures_sink::Sink; +use pin_project_lite::pin_project; use std::any::{Any, TypeId}; use std::cell::RefCell; use std::collections::HashMap; use std::fmt; use std::hash::{BuildHasherDefault, Hasher}; use std::marker::PhantomData; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context as TaskContext, Poll}; thread_local! { static CURRENT_CONTEXT: RefCell = RefCell::new(Context::default()); @@ -78,6 +83,7 @@ thread_local! { pub struct Context { #[cfg(feature = "trace")] pub(super) span: Option>, + pub suppression: bool, entries: HashMap, BuildHasherDefault>, } @@ -313,6 +319,7 @@ impl Context { Context { span: Some(Arc::new(value)), entries: Context::map_current(|cx| cx.entries.clone()), + suppression: Context::map_current(|cx| cx.suppression), } } @@ -321,6 +328,15 @@ impl Context { Context { span: Some(Arc::new(value)), entries: self.entries.clone(), + suppression: self.suppression, + } + } + + pub fn with_suppression(&self) -> Self { + Context { + suppression: true, + entries: self.entries.clone(), + span: self.span.clone(), } } } @@ -333,6 +349,115 @@ impl fmt::Debug for Context { } } +pin_project! { + /// A future, stream, or sink that has an associated context. + #[derive(Clone, Debug)] + pub struct WithContext { + #[pin] + inner: T, + otel_cx: Context, + } +} + +impl FutureExt for T {} + +impl std::future::Future for WithContext { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + + this.inner.poll(task_cx) + } +} + +impl Stream for WithContext { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_next(this.inner, task_cx) + } +} + +impl> Sink for WithContext +where + T: Sink, +{ + type Error = T::Error; + + fn poll_ready( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_ready(this.inner, task_cx) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::start_send(this.inner, item) + } + + fn poll_flush( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_flush(this.inner, task_cx) + } + + fn poll_close( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _enter = this.otel_cx.clone().attach(); + T::poll_close(this.inner, task_cx) + } +} + +/// Extension trait allowing futures, streams, and sinks to be traced with a span. +pub trait FutureExt: Sized { + /// Attaches the provided [`Context`] to this type, returning a `WithContext` + /// wrapper. + /// + /// When the wrapped type is a future, stream, or sink, the attached context + /// will be set as current while it is being polled. + /// + /// [`Context`]: crate::Context + fn with_context(self, otel_cx: Context) -> WithContext { + WithContext { + inner: self, + otel_cx, + } + } + + /// Attaches the current [`Context`] to this type, returning a `WithContext` + /// wrapper. + /// + /// When the wrapped type is a future, stream, or sink, the attached context + /// will be set as the default while it is being polled. + /// + /// [`Context`]: crate::Context + fn with_current_context(self) -> WithContext { + let otel_cx = Context::current(); + self.with_context(otel_cx) + } + + /// Attaches the current context [`Context`] to this type, with + /// suppression enabled, returning a `WithContext` wrapper. + fn with_current_context_supp(self) -> WithContext { + let otel_cx = Context::current().with_suppression(); + self.with_context(otel_cx) + } +} + /// A guard that resets the current context to the prior context when dropped. #[allow(missing_debug_implementations)] pub struct ContextGuard { diff --git a/opentelemetry/src/lib.rs b/opentelemetry/src/lib.rs index cdc44372d4..7412933c7f 100644 --- a/opentelemetry/src/lib.rs +++ b/opentelemetry/src/lib.rs @@ -208,7 +208,7 @@ pub mod baggage; mod context; -pub use context::{Context, ContextGuard}; +pub use context::{Context, ContextGuard, FutureExt, WithContext}; mod common; diff --git a/opentelemetry/src/trace/context.rs b/opentelemetry/src/trace/context.rs index 681c7b2e0c..648c196105 100644 --- a/opentelemetry/src/trace/context.rs +++ b/opentelemetry/src/trace/context.rs @@ -4,16 +4,7 @@ use crate::{ trace::{Span, SpanContext, Status}, Context, ContextGuard, KeyValue, }; -use futures_core::stream::Stream; -use futures_sink::Sink; -use pin_project_lite::pin_project; -use std::{ - borrow::Cow, - error::Error, - pin::Pin, - sync::Mutex, - task::{Context as TaskContext, Poll}, -}; +use std::{borrow::Cow, error::Error, sync::Mutex}; const NOOP_SPAN: SynchronizedSpan = SynchronizedSpan { span_context: SpanContext::NONE, @@ -359,105 +350,3 @@ where { Context::map_current(|cx| f(cx.span())) } - -pin_project! { - /// A future, stream, or sink that has an associated context. - #[derive(Clone, Debug)] - pub struct WithContext { - #[pin] - inner: T, - otel_cx: Context, - } -} - -impl FutureExt for T {} - -impl std::future::Future for WithContext { - type Output = T::Output; - - fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - - this.inner.poll(task_cx) - } -} - -impl Stream for WithContext { - type Item = T::Item; - - fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_next(this.inner, task_cx) - } -} - -impl> Sink for WithContext -where - T: Sink, -{ - type Error = T::Error; - - fn poll_ready( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_ready(this.inner, task_cx) - } - - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::start_send(this.inner, item) - } - - fn poll_flush( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_flush(this.inner, task_cx) - } - - fn poll_close( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _enter = this.otel_cx.clone().attach(); - T::poll_close(this.inner, task_cx) - } -} - -/// Extension trait allowing futures, streams, and sinks to be traced with a span. -pub trait FutureExt: Sized { - /// Attaches the provided [`Context`] to this type, returning a `WithContext` - /// wrapper. - /// - /// When the wrapped type is a future, stream, or sink, the attached context - /// will be set as current while it is being polled. - /// - /// [`Context`]: crate::Context - fn with_context(self, otel_cx: Context) -> WithContext { - WithContext { - inner: self, - otel_cx, - } - } - - /// Attaches the current [`Context`] to this type, returning a `WithContext` - /// wrapper. - /// - /// When the wrapped type is a future, stream, or sink, the attached context - /// will be set as the default while it is being polled. - /// - /// [`Context`]: crate::Context - fn with_current_context(self) -> WithContext { - let otel_cx = Context::current(); - self.with_context(otel_cx) - } -} diff --git a/opentelemetry/src/trace/mod.rs b/opentelemetry/src/trace/mod.rs index 2cd44f3059..d9aabd48bf 100644 --- a/opentelemetry/src/trace/mod.rs +++ b/opentelemetry/src/trace/mod.rs @@ -174,9 +174,7 @@ mod tracer; mod tracer_provider; pub use self::{ - context::{ - get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt, WithContext, - }, + context::{get_active_span, mark_span_as_active, SpanRef, TraceContextExt}, span::{Span, SpanKind, Status}, span_context::{SpanContext, SpanId, TraceFlags, TraceId, TraceState}, tracer::{SamplingDecision, SamplingResult, SpanBuilder, Tracer},