From 9c06c8e45c91776648842e0f9a831611f33652ba Mon Sep 17 00:00:00 2001 From: 4t145 Date: Tue, 2 Jul 2024 16:31:57 +0800 Subject: [PATCH 1/2] support web client and server's open telemetry tracing --- tardis/Cargo.toml | 5 +++++ tardis/src/basic/tracing.rs | 36 ++++++++++++++++++++++++++++++++++++ tardis/src/web.rs | 4 ++++ tardis/src/web/web_client.rs | 11 ++++++++--- tardis/src/web/web_server.rs | 5 +++++ 5 files changed, 58 insertions(+), 3 deletions(-) diff --git a/tardis/Cargo.toml b/tardis/Cargo.toml index 01cf6b7..29389b7 100644 --- a/tardis/Cargo.toml +++ b/tardis/Cargo.toml @@ -60,12 +60,15 @@ tracing = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "opentelemetry-http", + "poem?/opentelemetry" ] tokio-console = ["console-subscriber"] tracing-appender = ["dep:tracing-appender"] web-server-grpc = ["web-server", "dep:poem-grpc"] cluster = ["web-server", "ws-client", "cache"] build-info = ["git-version"] +opentelemetry-http = ["dep:opentelemetry-http"] [dependencies] # Basic @@ -240,6 +243,8 @@ testcontainers-modules = { version = "0.3", features = [ # Debug git-version = { version = "0.3.9", optional = true } +opentelemetry-http = { version = "0.12.0", features = ["tokio"], optional = true } +http = "1.1.0" [dev-dependencies] # Common diff --git a/tardis/src/basic/tracing.rs b/tardis/src/basic/tracing.rs index ddd5abc..e99d3f8 100644 --- a/tardis/src/basic/tracing.rs +++ b/tardis/src/basic/tracing.rs @@ -287,3 +287,39 @@ impl TardisTracing { headers } } + +#[cfg(feature = "tracing")] +pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap); + +#[cfg(feature = "tracing")] + +impl<'a> opentelemetry::propagation::Injector for HeaderInjector<'a> { + /// Set a key and value in the HeaderMap. Does nothing if the key or value are not valid inputs. + fn set(&mut self, key: &str, value: String) { + if let Ok(name) = http::header::HeaderName::from_bytes(key.as_bytes()) { + if let Ok(val) = http::header::HeaderValue::from_str(&value) { + self.0.insert(name, val); + } + } + } +} + +/// Helper for extracting headers from HTTP Requests. This is used for OpenTelemetry context +/// propagation over HTTP. +/// See [this](https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/tracing-http-propagator/README.md) +/// for example usage. +#[cfg(feature = "tracing")] +pub struct HeaderExtractor<'a>(pub &'a http::HeaderMap); + +#[cfg(feature = "tracing")] +impl<'a> opentelemetry::propagation::Extractor for HeaderExtractor<'a> { + /// Get a value for a key from the HeaderMap. If the value is not valid ASCII, returns None. + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|value| value.to_str().ok()) + } + + /// Collect all the keys from the HeaderMap. + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|value| value.as_str()).collect::>() + } +} diff --git a/tardis/src/web.rs b/tardis/src/web.rs index 64128ee..d204d55 100644 --- a/tardis/src/web.rs +++ b/tardis/src/web.rs @@ -25,6 +25,10 @@ pub mod uniform_error_mw; #[cfg(feature = "web-client")] #[cfg_attr(docsrs, doc(cfg(feature = "web-client")))] pub mod web_client; + +// #[cfg(feature = "web-client")] +// #[cfg_attr(docsrs, doc(cfg(feature = "web-client")))] +// pub mod web_client_v2; #[cfg(feature = "web-server")] #[cfg_attr(docsrs, doc(cfg(feature = "web-server")))] pub mod web_resp; diff --git a/tardis/src/web/web_client.rs b/tardis/src/web/web_client.rs index bee8ef5..6dec721 100644 --- a/tardis/src/web/web_client.rs +++ b/tardis/src/web/web_client.rs @@ -266,7 +266,6 @@ impl TardisWebClient { let (code, headers, response) = self.request(Method::PATCH, url, headers, Json(body)).await?; self.to_json::(code, headers, response).await } - pub async fn request( &self, method: Method, @@ -289,8 +288,14 @@ impl TardisWebClient { for (key, value) in headers { result = result.header(key.into(), value.into()); } - result = body.apply_on(result); - let response = result.send().await?; + let request = body.apply_on(result).build()?; + #[cfg(feature = "tracing")] + { + use opentelemetry::{global, Context}; + let ctx = Context::current(); + global::get_text_map_propagator(|propagator| propagator.inject_context(&ctx, &mut crate::basic::tracing::HeaderInjector(request.headers_mut()))); + } + let response = self.client.execute(request).await?; let code = response.status().as_u16(); let headers = response .headers() diff --git a/tardis/src/web/web_server.rs b/tardis/src/web/web_server.rs index a094967..4dfda94 100644 --- a/tardis/src/web/web_server.rs +++ b/tardis/src/web/web_server.rs @@ -320,6 +320,11 @@ impl TardisWebServer { }; let route = route.boxed(); let route = route.with(middleware); + #[cfg(feature = "tracing")] + let route = { + let tracer = opentelemetry::global::tracer(""); + route.with(poem::middleware::OpenTelemetryTracing::new(tracer)) + }; if module_options.uniform_error || module_config.uniform_error { self.state.lock().await.add_route(code, route.with(UniformError).with(AddClusterIdHeader).with(cors), data); } else { From dd1c9a80d73b5901e55d5f1ecf7f27232e8a4e99 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Wed, 3 Jul 2024 09:34:55 +0800 Subject: [PATCH 2/2] fix clippy --- tardis/src/web/web_client.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tardis/src/web/web_client.rs b/tardis/src/web/web_client.rs index 6dec721..fdca210 100644 --- a/tardis/src/web/web_client.rs +++ b/tardis/src/web/web_client.rs @@ -288,7 +288,8 @@ impl TardisWebClient { for (key, value) in headers { result = result.header(key.into(), value.into()); } - let request = body.apply_on(result).build()?; + #[allow(unused_mut)] + let mut request = body.apply_on(result).build()?; #[cfg(feature = "tracing")] { use opentelemetry::{global, Context};