Skip to content

Commit

Permalink
Merge pull request #142 from 4t145/support-set-tracing-service-name
Browse files Browse the repository at this point in the history
using w3c-trace-context propagator
  • Loading branch information
4t145 authored Sep 5, 2024
2 parents e5e9b59 + cd51b3b commit 03ef942
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 32 deletions.
2 changes: 1 addition & 1 deletion examples/tracing-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ version = "0.1.0"
edition = "2021"

[dependencies]
tardis = { path = "../../tardis", features = ["future", "web-server", "reldb-postgres", "tracing"] }
tardis = { path = "../../tardis", features = ["future", "web-server", "reldb-postgres", "tracing", "web-client"] }
tracing = { version = "0.1" }
random-string = { version = "1.0.0" }
4 changes: 2 additions & 2 deletions examples/tracing-otlp/config/conf-default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ doc_urls = [["test env", "http://localhost:8089/"]]
# url = "postgres://postgres:ENC(5892ae51dbeedacdf10ba4c0d7af42a7)@localhost:5432/test"

[fw.log]
level = "debug"

level = "info"
directives = ["tardis=trace"]
[fw.log.tracing]
# https://www.jaegertracing.io/docs/1.49/getting-started/
# endpoint = "http://localhost:4318/"
Expand Down
1 change: 1 addition & 0 deletions examples/tracing-otlp/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl Task for SendEmailTask {
#[instrument]
async fn handle(&self, _params: HashMap<String, String>) -> TardisResult<()> {
sleep(Duration::from_millis(300)).await;
tardis::TardisFuns::web_client().get_to_str("http://localhost:8089/send_sms", None).await?;
tokio::spawn(
async move {
let span = debug_span!("into spawn");
Expand Down
7 changes: 3 additions & 4 deletions tardis/src/basic/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,17 +276,15 @@ impl TardisTracing<LogConfig> {
tracing::debug!("[Tardis.Tracing] Batch installing tracer. If you are blocked here, try running tokio in multithread.");
let provider = tracer
.with_trace_config(
opentelemetry_sdk::trace::Config::default()
.with_resource(opentelemetry_sdk::Resource::new([opentelemetry::KeyValue::new("service.name", tracing_service_name())])),
opentelemetry_sdk::trace::Config::default().with_resource(opentelemetry_sdk::Resource::new([opentelemetry::KeyValue::new("service.name", tracing_service_name())])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("fail to install otlp tracer");
tracing::debug!("[Tardis.Tracing] Initialized otlp tracer");
opentelemetry::global::set_text_map_propagator(opentelemetry_sdk::propagation::TraceContextPropagator::new());
opentelemetry::global::shutdown_tracer_provider();
opentelemetry::global::set_tracer_provider(provider.clone());
provider.tracer(tracing_service_name())


}

#[cfg(feature = "tracing")]
Expand All @@ -310,6 +308,7 @@ pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap);
impl<'a> opentelemetry::propagation::Injector for HeaderInjector<'a> {
/// Set a key and value in the HeaderMap. Does nothing if the key or value are not valid inputs.
fn set(&mut self, key: &str, value: String) {
tracing::debug!("inject key: {}, value: {}", key, value);
if let Ok(name) = http::header::HeaderName::from_bytes(key.as_bytes()) {
if let Ok(val) = http::header::HeaderValue::from_str(&value) {
self.0.insert(name, val);
Expand Down
2 changes: 1 addition & 1 deletion tardis/src/web/uniform_error_mw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl<E: Endpoint> Endpoint for UniformErrorImpl<E> {
async fn call(&self, req: Request) -> poem::Result<Self::Output> {
let method = req.method().to_string();
let url = req.uri().to_string();
trace!("[Tardis.WebServer] Request {} {}", method, url);
trace!(headers = ?req.headers(), "[Tardis.WebServer] Request {} {}", method, url);
let resp = self.0.call(req).await;
match resp {
Ok(resp) => {
Expand Down
53 changes: 30 additions & 23 deletions tardis/src/web/web_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub fn str_pair_to_string_pair(p: (&str, &str)) -> (String, String) {
(p.0.to_owned(), p.1.to_owned())
}

pub trait DebugUrl: IntoUrl + std::fmt::Debug {}
impl<T> DebugUrl for T where T: IntoUrl + std::fmt::Debug {}
impl TardisWebClient {
/// # Errors
/// Return error if the client cannot be created.
Expand All @@ -94,45 +96,45 @@ impl TardisWebClient {
}

/// Get and parse response body as text
pub async fn get_to_str(&self, url: impl IntoUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<String>> {
pub async fn get_to_str(&self, url: impl DebugUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<String>> {
let (code, headers, response) = self.request(Method::GET, url, headers, ()).await?;
self.to_text(code, headers, response).await
}

/// Get and parse response body as json
pub async fn get<T: for<'de> Deserialize<'de>>(&self, url: impl IntoUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<T>> {
pub async fn get<T: for<'de> Deserialize<'de>>(&self, url: impl DebugUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<T>> {
let (code, headers, response) = self.request(Method::GET, url, headers, ()).await?;
self.to_json::<T>(code, headers, response).await
}

/// Head and ignore response body
pub async fn head_to_void(&self, url: impl IntoUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<()>> {
pub async fn head_to_void(&self, url: impl DebugUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<()>> {
let (code, headers, _) = self.request(Method::HEAD, url, headers, ()).await?;
Ok(TardisHttpResponse { code, headers, body: None })
}

/// Head and parse response body as json
pub async fn head<T: for<'de> Deserialize<'de>>(&self, url: impl IntoUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<T>> {
pub async fn head<T: for<'de> Deserialize<'de>>(&self, url: impl DebugUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<T>> {
let (code, headers, response) = self.request(Method::HEAD, url, headers, ()).await?;
self.to_json::<T>(code, headers, response).await
}

/// Delete and ignore response body
pub async fn delete_to_void(&self, url: impl IntoUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<()>> {
pub async fn delete_to_void(&self, url: impl DebugUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<()>> {
let (code, headers, _) = self.request(Method::DELETE, url, headers, ()).await?;
Ok(TardisHttpResponse { code, headers, body: None })
}

/// Delete and parse response body as json
pub async fn delete<T: for<'de> Deserialize<'de>>(&self, url: impl IntoUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<T>> {
pub async fn delete<T: for<'de> Deserialize<'de>>(&self, url: impl DebugUrl, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<T>> {
let (code, headers, response) = self.request(Method::DELETE, url, headers, ()).await?;
self.to_json::<T>(code, headers, response).await
}

/// Delete and parse response body as json with a body
pub async fn delete_with_body<T: for<'de> Deserialize<'de>, B: Serialize>(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
headers: impl IntoIterator<Item = (String, String)>,
body: &B,
) -> TardisResult<TardisHttpResponse<T>> {
Expand All @@ -143,7 +145,7 @@ impl TardisWebClient {
/// Post and ignore response body
pub async fn post_str_to_str(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: impl Into<String>,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<String>> {
Expand All @@ -154,7 +156,7 @@ impl TardisWebClient {
/// Post and parse response body as json
pub async fn post_obj_to_str<B: Serialize>(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: &B,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<String>> {
Expand All @@ -165,7 +167,7 @@ impl TardisWebClient {
/// Post and parse response body as json
pub async fn post_to_obj<T: for<'de> Deserialize<'de>>(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: impl Into<String>,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<T>> {
Expand All @@ -176,7 +178,7 @@ impl TardisWebClient {
/// Post and parse response body as json
pub async fn post<B: Serialize, T: for<'de> Deserialize<'de>>(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: &B,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<T>> {
Expand All @@ -187,7 +189,7 @@ impl TardisWebClient {
/// Put and ignore response body
pub async fn put_str_to_str(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: impl Into<String>,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<String>> {
Expand All @@ -196,15 +198,20 @@ impl TardisWebClient {
}

/// Put and parse response body as json
pub async fn put_obj_to_str<B: Serialize>(&self, url: impl IntoUrl, body: &B, headers: impl IntoIterator<Item = (String, String)>) -> TardisResult<TardisHttpResponse<String>> {
pub async fn put_obj_to_str<B: Serialize>(
&self,
url: impl DebugUrl,
body: &B,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<String>> {
let (code, headers, response) = self.request(Method::PUT, url, headers, Json(body)).await?;
self.to_text(code, headers, response).await
}

/// Put and parse response body as json
pub async fn put_to_obj<T: for<'de> Deserialize<'de>>(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: impl Into<String>,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<T>> {
Expand All @@ -215,7 +222,7 @@ impl TardisWebClient {
/// Put and parse response body as json
pub async fn put<B: Serialize, T: for<'de> Deserialize<'de>>(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: &B,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<T>> {
Expand All @@ -226,7 +233,7 @@ impl TardisWebClient {
/// Patch and ignore response body
pub async fn patch_str_to_str(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: impl Into<String>,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<String>> {
Expand All @@ -237,7 +244,7 @@ impl TardisWebClient {
/// Patch and parse response body as json
pub async fn patch_obj_to_str<B: Serialize>(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: &B,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<String>> {
Expand All @@ -248,7 +255,7 @@ impl TardisWebClient {
/// Patch and parse response body as json
pub async fn patch_to_obj<T: for<'de> Deserialize<'de>>(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: impl Into<String>,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<T>> {
Expand All @@ -259,17 +266,18 @@ impl TardisWebClient {
/// Patch and parse response body as json
pub async fn patch<B: Serialize, T: for<'de> Deserialize<'de>>(
&self,
url: impl IntoUrl,
url: impl DebugUrl,
body: &B,
headers: impl IntoIterator<Item = (String, String)>,
) -> TardisResult<TardisHttpResponse<T>> {
let (code, headers, response) = self.request(Method::PATCH, url, headers, Json(body)).await?;
self.to_json::<T>(code, headers, response).await
}
#[tracing::instrument(name="send_http_request", skip_all, fields(method=?method, url=?url))]
pub async fn request<K, V>(
&self,
method: Method,
url: impl IntoUrl,
url: impl DebugUrl,
headers: impl IntoIterator<Item = (K, V)>,
body: impl TardisRequestBody,
) -> TardisResult<(u16, HashMap<String, String>, Response)>
Expand All @@ -279,8 +287,6 @@ impl TardisWebClient {
{
let mut url = url.into_url()?;
TardisFuns::uri.sort_url_query(&mut url);
let method_str = method.to_string();
trace!("[Tardis.WebClient] Request {}:{}", method_str, &url);
let mut result = self.client.request(method, url.clone());
for (key, value) in &self.default_headers {
result = result.header(key, value);
Expand All @@ -296,6 +302,7 @@ impl TardisWebClient {
let ctx = Context::current();
global::get_text_map_propagator(|propagator| propagator.inject_context(&ctx, &mut crate::basic::tracing::HeaderInjector(request.headers_mut())));
}
trace!("start request");
let response = self.client.execute(request).await?;
let code = response.status().as_u16();
let headers = response
Expand All @@ -308,7 +315,7 @@ impl TardisWebClient {
)
})
.collect();
trace!("[Tardis.WebClient] Request {}:{}, Response {}", method_str, url, code);
trace!(code, "response received");
Ok((code, headers, response))
}

Expand Down
2 changes: 1 addition & 1 deletion tardis/src/web/web_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl TardisWebServer {
Cors::new().allow_origin(&self.config.allowed_origin)
};
let route = route.boxed();
let route = route.with(middleware);
let route = route.with(middleware).with(poem::middleware::Tracing);
#[cfg(feature = "tracing")]
let route = {
let tracer = opentelemetry::global::tracer(crate::basic::tracing::tracing_service_name());
Expand Down

0 comments on commit 03ef942

Please sign in to comment.