Skip to content

Commit

Permalink
feat: tracing for http transports (alloy-rs#681)
Browse files Browse the repository at this point in the history
  • Loading branch information
prestwich authored and ben186 committed Jul 27, 2024
1 parent 8a01bf0 commit 5b79a5f
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 44 deletions.
4 changes: 3 additions & 1 deletion crates/transport-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde_json = { workspace = true, optional = true }
tower = { workspace = true, optional = true }

reqwest = { workspace = true, features = ["json"], optional = true }
tracing = { workspace = true, optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
http-body-util = { workspace = true, optional = true }
Expand All @@ -28,14 +29,15 @@ hyper-util = { workspace = true, features = ["full"], optional = true }

[features]
default = ["reqwest", "reqwest-default-tls"]
reqwest = ["dep:reqwest", "dep:alloy-json-rpc", "dep:serde_json", "dep:tower"]
reqwest = ["dep:reqwest", "dep:alloy-json-rpc", "dep:serde_json", "dep:tower", "dep:tracing"]
hyper = [
"dep:hyper",
"dep:hyper-util",
"dep:http-body-util",
"dep:alloy-json-rpc",
"dep:serde_json",
"dep:tower",
"dep:tracing"
]
reqwest-default-tls = ["reqwest?/default-tls"]
reqwest-native-tls = ["reqwest?/native-tls"]
Expand Down
76 changes: 47 additions & 29 deletions crates/transport-http/src/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use hyper::{
use hyper_util::client::legacy::{connect::Connect, Client};
use std::task;
use tower::Service;
use tracing::{debug, debug_span, trace, Instrument};

impl<C, B> Http<Client<C, Full<B>>>
where
Expand All @@ -18,40 +19,57 @@ where
/// Make a request.
fn request_hyper(&self, req: RequestPacket) -> TransportFut<'static> {
let this = self.clone();
Box::pin(async move {
let ser = req.serialize().map_err(TransportError::ser_err)?;
let span = debug_span!("HyperTransport", url = %self.url);
Box::pin(
async move {
debug!(count = req.len(), "sending request packet to server");
let ser = req.serialize().map_err(TransportError::ser_err)?;
// convert the Box<RawValue> into a hyper request<B>
let body = Full::from(Bytes::from(<Box<[u8]>>::from(<Box<str>>::from(ser))));
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(this.url.as_str())
.header(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"),
)
.body(body)
.expect("request parts are valid");

// convert the Box<RawValue> into a hyper request<B>
let body = Full::from(Bytes::from(<Box<[u8]>>::from(<Box<str>>::from(ser))));
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(this.url.as_str())
.header(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"))
.body(body)
.expect("request parts are valid");
let resp = this.client.request(req).await.map_err(TransportErrorKind::custom)?;
let status = resp.status();

let resp = this.client.request(req).await.map_err(TransportErrorKind::custom)?;
let status = resp.status();
debug!(%status, "received response from server");

// Unpack data from the response body. We do this regardless of the status code, as we
// want to return the error in the body if there is one.
let body =
resp.into_body().collect().await.map_err(TransportErrorKind::custom)?.to_bytes();
// Unpack data from the response body. We do this regardless of
// the status code, as we want to return the error in the body
// if there is one.
let body = resp
.into_body()
.collect()
.await
.map_err(TransportErrorKind::custom)?
.to_bytes();

if status != hyper::StatusCode::OK {
return Err(TransportErrorKind::custom_str(&format!(
"HTTP error {status} with body: {}",
String::from_utf8_lossy(&body)
)));
}
debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body");
trace!(body = %String::from_utf8_lossy(&body), "response body");

if status != hyper::StatusCode::OK {
return Err(TransportErrorKind::custom_str(&format!(
"HTTP error {status} with body: {}",
String::from_utf8_lossy(&body)
)));
}

// Deser a Box<RawValue> from the body. If deser fails, return the
// body as a string in the error. If the body is not UTF8, this will
// fail and give the empty string in the error.
serde_json::from_slice(&body).map_err(|err| {
TransportError::deser_err(err, String::from_utf8_lossy(body.as_ref()))
})
})
// Deser a Box<RawValue> from the body. If deser fails, return
// the body as a string in the error. The conversion to String
// is lossy and may not cover all the bytes in the body.
serde_json::from_slice(&body).map_err(|err| {
TransportError::deser_err(err, String::from_utf8_lossy(body.as_ref()))
})
}
.instrument(span),
)
}
}

Expand Down
50 changes: 36 additions & 14 deletions crates/transport-http/src/reqwest.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::Http;
use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_transport::{TransportError, TransportErrorKind, TransportFut};
use reqwest::Response;
use std::task;
use tower::Service;
use tracing::{debug, debug_span, trace, Instrument};
use url::Url;

impl Http<reqwest::Client> {
Expand All @@ -15,21 +15,43 @@ impl Http<reqwest::Client> {
/// Make a request.
fn request_reqwest(&self, req: RequestPacket) -> TransportFut<'static> {
let this = self.clone();
Box::pin(async move {
let resp = this
.client
.post(this.url)
.json(&req)
.send()
.await
.and_then(Response::error_for_status)
.map_err(TransportErrorKind::custom)?;
let span: tracing::Span = debug_span!("ReqwestTransport", url = %self.url);
Box::pin(
async move {
let resp = this
.client
.post(this.url)
.json(&req)
.send()
.await
.map_err(TransportErrorKind::custom)?;
let status = resp.status();

let body = resp.bytes().await.map_err(TransportErrorKind::custom)?;
debug!(%status, "received response from server");

serde_json::from_slice(&body)
.map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body)))
})
// Unpack data from the response body. We do this regardless of
// the status code, as we want to return the error in the body
// if there is one.
let body = resp.bytes().await.map_err(TransportErrorKind::custom)?;

debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body");
trace!(body = %String::from_utf8_lossy(&body), "response body");

if status != reqwest::StatusCode::OK {
return Err(TransportErrorKind::custom_str(&format!(
"HTTP error {status} with body: {}",
String::from_utf8_lossy(&body)
)));
}

// Deser a Box<RawValue> from the body. If deser fails, return
// the body as a string in the error. The conversion to String
// is lossy and may not cover all the bytes in the body.
serde_json::from_slice(&body)
.map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body)))
}
.instrument(span),
)
}
}

Expand Down

0 comments on commit 5b79a5f

Please sign in to comment.