Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Suppression of nested logs from dependencies #1330

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion opentelemetry-jaeger/examples/actix-udp/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::FutureExt;
use opentelemetry::{
global,
trace::{FutureExt, TraceContextExt, TraceError, Tracer},
trace::{TraceContextExt, TraceError, Tracer},
Key, KeyValue,
};
use opentelemetry_sdk::{trace::config, Resource};
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ publish = false
[dependencies]
once_cell = "1.17"
opentelemetry = { path = "../../../opentelemetry" }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"] }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs", "logs_level_enabled"] }
opentelemetry-otlp = { path = "../..", features = ["http-proto", "reqwest-client", "logs"] }
opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing"}
opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" }

tokio = { version = "1.0", features = ["full"] }
Expand Down
56 changes: 33 additions & 23 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,48 @@ use std::sync::Arc;

use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::logs::{LogError, LogResult};
use opentelemetry::{
logs::{LogError, LogResult},
FutureExt,
};
use opentelemetry_sdk::export::logs::{LogData, LogExporter};

use super::OtlpHttpClient;
use opentelemetry::Context;
use std::thread;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
let client = self
.client
.lock()
.map_err(|e| LogError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = build_body(batch)?;
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}
async {
let client = self
.client
.lock()
.map_err(|e| LogError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = build_body(batch)?;
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

client.send(request).await?;
for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}

Ok(())
client.send(request).await?;

Ok(())
}
//.with_current_context_supp()
.with_context(Context::current().with_suppression())
.await
}

fn shutdown(&mut self) {
Expand Down
6 changes: 4 additions & 2 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::{
time::Duration,
};

use opentelemetry::Context;

/// The interface for plugging into a [`Logger`].
///
/// [`Logger`]: crate::logs::Logger
Expand Down Expand Up @@ -103,7 +105,7 @@ impl LogProcessor for SimpleLogProcessor {

#[cfg(feature = "logs_level_enabled")]
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
!Context::current().suppression
}
}

Expand Down Expand Up @@ -132,7 +134,7 @@ impl<R: RuntimeChannel<BatchMessage>> LogProcessor for BatchLogProcessor<R> {

#[cfg(feature = "logs_level_enabled")]
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
!Context::current().suppression
}

fn force_flush(&self) -> LogResult<()> {
Expand Down
125 changes: 125 additions & 0 deletions opentelemetry/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
#[cfg(feature = "trace")]
use crate::trace::context::SynchronizedSpan;
use futures_core::stream::Stream;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::any::{Any, TypeId};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::hash::{BuildHasherDefault, Hasher};
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context as TaskContext, Poll};

thread_local! {
static CURRENT_CONTEXT: RefCell<Context> = RefCell::new(Context::default());
Expand Down Expand Up @@ -78,6 +83,7 @@
pub struct Context {
#[cfg(feature = "trace")]
pub(super) span: Option<Arc<SynchronizedSpan>>,
pub suppression: bool,

Check failure on line 86 in opentelemetry/src/context.rs

View workflow job for this annotation

GitHub Actions / lint

missing documentation for a struct field
entries: HashMap<TypeId, Arc<dyn Any + Sync + Send>, BuildHasherDefault<IdHasher>>,
}

Expand Down Expand Up @@ -313,6 +319,7 @@
Context {
span: Some(Arc::new(value)),
entries: Context::map_current(|cx| cx.entries.clone()),
suppression: Context::map_current(|cx| cx.suppression),
}
}

Expand All @@ -321,6 +328,15 @@
Context {
span: Some(Arc::new(value)),
entries: self.entries.clone(),
suppression: self.suppression,
}
}

pub fn with_suppression(&self) -> Self {

Check failure on line 335 in opentelemetry/src/context.rs

View workflow job for this annotation

GitHub Actions / lint

missing documentation for a method
Context {
suppression: true,
entries: self.entries.clone(),
span: self.span.clone(),
}
}
}
Expand All @@ -333,6 +349,115 @@
}
}

pin_project! {
/// A future, stream, or sink that has an associated context.
#[derive(Clone, Debug)]
pub struct WithContext<T> {
#[pin]
inner: T,
otel_cx: Context,
}
}

impl<T: Sized> FutureExt for T {}

impl<T: std::future::Future> std::future::Future for WithContext<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();

this.inner.poll(task_cx)
}
}

impl<T: Stream> Stream for WithContext<T> {
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_next(this.inner, task_cx)
}
}

impl<I, T: Sink<I>> Sink<I> for WithContext<T>
where
T: Sink<I>,
{
type Error = T::Error;

fn poll_ready(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_ready(this.inner, task_cx)
}

fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::start_send(this.inner, item)
}

fn poll_flush(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_flush(this.inner, task_cx)
}

fn poll_close(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _enter = this.otel_cx.clone().attach();
T::poll_close(this.inner, task_cx)
}
}

/// Extension trait allowing futures, streams, and sinks to be traced with a span.
pub trait FutureExt: Sized {
/// Attaches the provided [`Context`] to this type, returning a `WithContext`
/// wrapper.
///
/// When the wrapped type is a future, stream, or sink, the attached context
/// will be set as current while it is being polled.
///
/// [`Context`]: crate::Context
fn with_context(self, otel_cx: Context) -> WithContext<Self> {
WithContext {
inner: self,
otel_cx,
}
}

/// Attaches the current [`Context`] to this type, returning a `WithContext`
/// wrapper.
///
/// When the wrapped type is a future, stream, or sink, the attached context
/// will be set as the default while it is being polled.
///
/// [`Context`]: crate::Context
fn with_current_context(self) -> WithContext<Self> {
let otel_cx = Context::current();
self.with_context(otel_cx)
}

/// Attaches the current context [`Context`] to this type, with
/// suppression enabled, returning a `WithContext` wrapper.
fn with_current_context_supp(self) -> WithContext<Self> {
let otel_cx = Context::current().with_suppression();
self.with_context(otel_cx)
}
}

/// A guard that resets the current context to the prior context when dropped.
#[allow(missing_debug_implementations)]
pub struct ContextGuard {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub mod baggage;

mod context;

pub use context::{Context, ContextGuard};
pub use context::{Context, ContextGuard, FutureExt, WithContext};

mod common;

Expand Down
Loading
Loading