Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tracing for http transports #681

Merged
merged 2 commits into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/transport-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde_json = { workspace = true, optional = true }
tower = { workspace = true, optional = true }

reqwest = { workspace = true, features = ["json"], optional = true }
tracing = { workspace = true, optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
http-body-util = { workspace = true, optional = true }
Expand All @@ -28,14 +29,15 @@ hyper-util = { workspace = true, features = ["full"], optional = true }

[features]
default = ["reqwest", "reqwest-default-tls"]
reqwest = ["dep:reqwest", "dep:alloy-json-rpc", "dep:serde_json", "dep:tower"]
reqwest = ["dep:reqwest", "dep:alloy-json-rpc", "dep:serde_json", "dep:tower", "dep:tracing"]
hyper = [
"dep:hyper",
"dep:hyper-util",
"dep:http-body-util",
"dep:alloy-json-rpc",
"dep:serde_json",
"dep:tower",
"dep:tracing"
]
reqwest-default-tls = ["reqwest?/default-tls"]
reqwest-native-tls = ["reqwest?/native-tls"]
Expand Down
76 changes: 47 additions & 29 deletions crates/transport-http/src/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use hyper::{
use hyper_util::client::legacy::{connect::Connect, Client};
use std::task;
use tower::Service;
use tracing::{debug, debug_span, trace, Instrument};

impl<C, B> Http<Client<C, Full<B>>>
where
Expand All @@ -18,40 +19,57 @@ where
/// Make a request.
fn request_hyper(&self, req: RequestPacket) -> TransportFut<'static> {
let this = self.clone();
Box::pin(async move {
let ser = req.serialize().map_err(TransportError::ser_err)?;
let span = debug_span!("HyperTransport", url = %self.url);
Box::pin(
async move {
debug!(count = req.len(), "sending request packet to server");
let ser = req.serialize().map_err(TransportError::ser_err)?;
// convert the Box<RawValue> into a hyper request<B>
let body = Full::from(Bytes::from(<Box<[u8]>>::from(<Box<str>>::from(ser))));
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(this.url.as_str())
.header(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"),
)
.body(body)
.expect("request parts are valid");

// convert the Box<RawValue> into a hyper request<B>
let body = Full::from(Bytes::from(<Box<[u8]>>::from(<Box<str>>::from(ser))));
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(this.url.as_str())
.header(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"))
.body(body)
.expect("request parts are valid");
let resp = this.client.request(req).await.map_err(TransportErrorKind::custom)?;
let status = resp.status();

let resp = this.client.request(req).await.map_err(TransportErrorKind::custom)?;
let status = resp.status();
debug!(%status, "received response from server");

// Unpack data from the response body. We do this regardless of the status code, as we
// want to return the error in the body if there is one.
let body =
resp.into_body().collect().await.map_err(TransportErrorKind::custom)?.to_bytes();
// Unpack data from the response body. We do this regardless of
// the status code, as we want to return the error in the body
// if there is one.
let body = resp
.into_body()
.collect()
.await
.map_err(TransportErrorKind::custom)?
.to_bytes();

if status != hyper::StatusCode::OK {
return Err(TransportErrorKind::custom_str(&format!(
"HTTP error {status} with body: {}",
String::from_utf8_lossy(&body)
)));
}
debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body");
trace!(body = %String::from_utf8_lossy(&body), "response body");

if status != hyper::StatusCode::OK {
return Err(TransportErrorKind::custom_str(&format!(
"HTTP error {status} with body: {}",
String::from_utf8_lossy(&body)
)));
}

// Deser a Box<RawValue> from the body. If deser fails, return the
// body as a string in the error. If the body is not UTF8, this will
// fail and give the empty string in the error.
serde_json::from_slice(&body).map_err(|err| {
TransportError::deser_err(err, String::from_utf8_lossy(body.as_ref()))
})
})
// Deser a Box<RawValue> from the body. If deser fails, return
// the body as a string in the error. The conversion to String
// is lossy and may not cover all the bytes in the body.
serde_json::from_slice(&body).map_err(|err| {
TransportError::deser_err(err, String::from_utf8_lossy(body.as_ref()))
})
}
.instrument(span),
)
}
}

Expand Down
50 changes: 36 additions & 14 deletions crates/transport-http/src/reqwest.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::Http;
use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_transport::{TransportError, TransportErrorKind, TransportFut};
use reqwest::Response;
use std::task;
use tower::Service;
use tracing::{debug, debug_span, trace, Instrument};
use url::Url;

impl Http<reqwest::Client> {
Expand All @@ -15,21 +15,43 @@ impl Http<reqwest::Client> {
/// Make a request.
fn request_reqwest(&self, req: RequestPacket) -> TransportFut<'static> {
let this = self.clone();
Box::pin(async move {
let resp = this
.client
.post(this.url)
.json(&req)
.send()
.await
.and_then(Response::error_for_status)
.map_err(TransportErrorKind::custom)?;
let span: tracing::Span = debug_span!("ReqwestTransport", url = %self.url);
Box::pin(
async move {
let resp = this
.client
.post(this.url)
.json(&req)
.send()
.await
.map_err(TransportErrorKind::custom)?;
let status = resp.status();

let body = resp.bytes().await.map_err(TransportErrorKind::custom)?;
debug!(%status, "received response from server");

serde_json::from_slice(&body)
.map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body)))
})
// Unpack data from the response body. We do this regardless of
// the status code, as we want to return the error in the body
// if there is one.
let body = resp.bytes().await.map_err(TransportErrorKind::custom)?;

debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body");
trace!(body = %String::from_utf8_lossy(&body), "response body");

if status != reqwest::StatusCode::OK {
return Err(TransportErrorKind::custom_str(&format!(
"HTTP error {status} with body: {}",
String::from_utf8_lossy(&body)
)));
}

// Deser a Box<RawValue> from the body. If deser fails, return
// the body as a string in the error. The conversion to String
// is lossy and may not cover all the bytes in the body.
serde_json::from_slice(&body)
.map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body)))
}
.instrument(span),
)
}
}

Expand Down
Loading