diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index 77da400bd2..04003e6a90 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -24,3 +24,4 @@ hyper-util = { workspace = true, features = ["client-legacy", "http1", "http2"], opentelemetry = { version = "0.27", path = "../opentelemetry", features = ["trace"] } reqwest = { workspace = true, features = ["blocking"], optional = true } tokio = { workspace = true, features = ["time"], optional = true } +futures-timer = "3.0.3" diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index 778f8aee98..558b80162e 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -1,10 +1,14 @@ use async_trait::async_trait; use std::fmt::Debug; - +use std::future::Future; +use std::pin::Pin; +use std::task::Poll; +use std::time::Duration; #[doc(no_inline)] pub use bytes::Bytes; #[doc(no_inline)] pub use http::{Request, Response}; +use opentelemetry::Context; use opentelemetry::propagation::{Extractor, Injector}; /// Helper for injecting headers into HTTP Requests. This is used for OpenTelemetry context @@ -102,7 +106,7 @@ mod reqwest { #[cfg(feature = "hyper")] pub mod hyper { - use crate::ResponseExt; + use crate::{timeout, ResponseExt}; use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; use http::HeaderValue; @@ -116,7 +120,7 @@ pub mod hyper { use std::pin::Pin; use std::task::{self, Poll}; use std::time::Duration; - use tokio::time; + #[derive(Debug, Clone)] pub struct HyperClient @@ -163,7 +167,7 @@ pub mod hyper { .headers_mut() .insert(http::header::AUTHORIZATION, authorization.clone()); } - let mut response = time::timeout(self.timeout, self.inner.request(request)).await??; + let mut response = timeout(self.timeout, self.inner.request(request)).await??; let headers = std::mem::take(response.headers_mut()); let mut http_response = Response::builder() @@ -218,6 +222,40 @@ impl ResponseExt for Response { } } +struct Timeout { + future: F, + delay: Pin + Send>>, +} + +impl Future for Timeout where + F::Output: Send, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.get_mut(); + + // Poll the delay future + if this.delay.as_mut().poll(cx).is_ready() { + return Poll::Ready(Err("timeout")); + } + + // Poll the main future + match Pin::new(&mut this.future).poll(cx) { + Poll::Ready(output) => Poll::Ready(Ok(output)), + Poll::Pending => Poll::Pending, + } + } +} + +fn timeout(duration: Duration, future: F) -> Timeout { + Timeout { + future, + delay: Box::pin(futures_timer::Delay::new(duration)), + } +} + + #[cfg(test)] mod tests { use super::*; diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index 5ba84b30e6..0e1da9dc5d 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; +use opentelemetry::otel_debug; use opentelemetry_sdk::metrics::data::ResourceMetrics; use opentelemetry_sdk::metrics::{MetricError, MetricResult}; @@ -21,6 +22,8 @@ impl MetricsClient for OtlpHttpClient { _ => Err(MetricError::Other("exporter is already shut down".into())), })?; + otel_debug!(name: "MetricsClientExport"); + let (body, content_type) = self.build_metrics_export_body(metrics)?; let mut request = http::Request::builder() .method(Method::POST) diff --git a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs index f913029c00..a67161e876 100644 --- a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs +++ b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs @@ -41,7 +41,7 @@ static INIT_TRACING: Once = Once::new(); fn init_tracing() { INIT_TRACING.call_once(|| { let subscriber = FmtSubscriber::builder() - .with_max_level(tracing::Level::DEBUG) + .with_max_level(tracing::Level::TRACE) .finish(); tracing::subscriber::set_global_default(subscriber) diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 43bfd0912e..4fc0783b60 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -367,6 +367,8 @@ impl PeriodicReaderInner { return Err(e); } + otel_debug!(name: "PeriodicReaderMetricsExported"); + Ok(()) }