From 22c70529a66ecfb822944a91a444b640c37d47cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFs=20Postula?= Date: Wed, 24 Jul 2024 18:49:58 +0200 Subject: [PATCH] feat: allow sending non String payload with execute --- src/body.rs | 116 ++++++++++++++++++++++++++++++++ src/lib.rs | 39 ++++++----- src/service/middleware/retry.rs | 50 +++----------- 3 files changed, 145 insertions(+), 60 deletions(-) create mode 100644 src/body.rs diff --git a/src/body.rs b/src/body.rs new file mode 100644 index 00000000..3228d6e8 --- /dev/null +++ b/src/body.rs @@ -0,0 +1,116 @@ +use http_body_util::BodyExt; + +use bytes::Bytes; +use http_body::Frame; +use snafu::{Backtrace, GenerateImplicitData}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +type BoxBody = http_body_util::combinators::BoxBody; +type BoxError = Box; + +fn boxed(body: B) -> BoxBody +where + B: http_body::Body + Send + Sync + 'static, + B::Error: Into, +{ + try_downcast(body).unwrap_or_else(|body| { + body.map_err(|e| crate::Error::Other { + source: e.into(), + backtrace: Backtrace::generate(), + }) + .boxed() + }) +} + +fn try_downcast(k: K) -> Result +where + T: 'static, + K: Send + 'static, +{ + let mut k = Some(k); + if let Some(k) = ::downcast_mut::>(&mut k) { + Ok(k.take().unwrap()) + } else { + Err(k.unwrap()) + } +} + +// Define octocrab Body +#[derive(Debug)] +pub struct OctoBody(BoxBody); + +impl OctoBody { + /// Create a new `Body` that wraps another [`http_body::Body`]. + pub fn new(body: B) -> Self + where + B: http_body::Body + Send + Sync + 'static, + B::Error: Into, + { + try_downcast(body).unwrap_or_else(|body| Self(boxed(body))) + } + /// Create an empty body. + pub fn empty() -> Self { + Self::new(http_body_util::Empty::new()) + } +} + +impl Default for OctoBody { + fn default() -> Self { + Self::empty() + } +} + +// Implement standard Bodiesque casting +impl From<()> for OctoBody { + fn from(_: ()) -> Self { + Self::empty() + } +} + +impl From for OctoBody { + fn from(buf: String) -> Self { + Self::new(http_body_util::Full::from(buf)) + } +} + +impl From> for OctoBody { + fn from(buf: Vec) -> Self { + Self::new(http_body_util::Full::from(buf)) + } +} + +impl From for OctoBody { + fn from(buf: Bytes) -> Self { + Self::new(http_body_util::Full::from(buf)) + } +} + +impl From<&'static str> for OctoBody { + fn from(buf: &'static str) -> Self { + Self::new(http_body_util::Full::from(buf)) + } +} + +impl http_body::Body for OctoBody { + type Data = Bytes; + type Error = crate::Error; + + #[inline] + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Pin::new(&mut self.0).poll_frame(cx) + } + + #[inline] + fn size_hint(&self) -> http_body::SizeHint { + self.0.size_hint() + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.0.is_end_stream() + } +} diff --git a/src/lib.rs b/src/lib.rs index 8051cc17..820c6de1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -181,6 +181,7 @@ #![cfg_attr(test, recursion_limit = "512")] mod api; +mod body; mod error; mod from_response; mod page; @@ -191,6 +192,7 @@ pub mod models; pub mod params; pub mod service; +use body::OctoBody; use chrono::{DateTime, Utc}; use http::{HeaderMap, HeaderValue, Method, Uri}; use http_body_util::combinators::BoxBody; @@ -422,7 +424,7 @@ impl OctocrabBuilder { impl OctocrabBuilder where - Svc: Service, Response = Response> + Send + 'static, + Svc: Service, Response = Response> + Send + 'static, Svc::Future: Send + 'static, Svc::Error: Into, B: http_body::Body + Send + 'static, @@ -467,7 +469,7 @@ impl OctocrabBuilder { impl OctocrabBuilder where - Svc: Service, Response = Response> + Send + 'static, + Svc: Service, Response = Response> + Send + 'static, Svc::Future: Send + 'static, Svc::Error: Into, B: http_body::Body + Send + Sync + 'static, @@ -584,8 +586,8 @@ impl OctocrabBuilder #[cfg(feature = "retry")] pub fn set_connector_retry_service( &self, - connector: hyper_util::client::legacy::Client, - ) -> Retry> { + connector: hyper_util::client::legacy::Client, + ) -> Retry> { let retry_layer = RetryLayer::new(self.config.retry_config.clone()); retry_layer.layer(connector) @@ -610,7 +612,7 @@ impl OctocrabBuilder /// Build a [`Client`] instance with the current [`Service`] stack. #[cfg(feature = "default-client")] pub fn build(self) -> Result { - let client: hyper_util::client::legacy::Client<_, String> = { + let client: hyper_util::client::legacy::Client<_, OctoBody> = { #[cfg(all(not(feature = "opentls"), not(feature = "rustls")))] let mut connector = hyper::client::conn::http1::HttpConnector::new(); @@ -646,7 +648,7 @@ impl OctocrabBuilder #[cfg(feature = "tracing")] let client = TraceLayer::new_for_http() - .make_span_with(|req: &Request| { + .make_span_with(|req: &Request| { tracing::debug_span!( "HTTP", http.method = %req.method(), @@ -657,7 +659,7 @@ impl OctocrabBuilder otel.status_code = tracing::field::Empty, ) }) - .on_request(|_req: &Request, _span: &Span| { + .on_request(|_req: &Request, _span: &Span| { tracing::debug!("requesting"); }) .on_response( @@ -913,8 +915,8 @@ pub enum AuthState { } pub type OctocrabService = Buffer< - BoxService, http::Response>, BoxError>, - http::Request, + BoxService, http::Response>, BoxError>, + http::Request, >; /// The GitHub API client. @@ -954,7 +956,7 @@ impl Octocrab { /// Creates a new `Octocrab`. fn new(service: S, auth_state: AuthState) -> Self where - S: Service, Response = Response>> + S: Service, Response = Response>> + Send + 'static, S::Future: Send + 'static, @@ -1363,7 +1365,7 @@ impl Octocrab { &self, mut builder: Builder, body: Option<&B>, - ) -> Result> { + ) -> Result> { // Since Octocrab doesn't require streamable bodies(aka, file upload) because it is serde::Serialize), // we can just use String body, since it is both http_body::Body(required by Hyper::Client), and Clone(required by BoxService). @@ -1372,14 +1374,14 @@ impl Octocrab { if let Some(body) = body { builder = builder.header(http::header::CONTENT_TYPE, "application/json"); - let request = builder - .body(serde_json::to_string(body).context(SerdeSnafu)?) - .context(HttpSnafu)?; + let serialized = serde_json::to_string(body).context(SerdeSnafu)?; + let body: OctoBody = serialized.into(); + let request = builder.body(body).context(HttpSnafu)?; Ok(request) } else { Ok(builder .header(http::header::CONTENT_LENGTH, "0") - .body(String::new()) + .body(OctoBody::empty()) .context(HttpSnafu)?) } } @@ -1442,7 +1444,7 @@ impl Octocrab { .method(http::Method::POST) .uri(uri); let response = self - .send(request.body("{}".to_string()).context(HttpSnafu)?) + .send(request.body("{}".into()).context(HttpSnafu)?) .await?; let _status = response.status(); @@ -1470,7 +1472,7 @@ impl Octocrab { /// Send the given request to the underlying service pub async fn send( &self, - request: Request, + request: Request, ) -> Result>> { let mut svc = self.client.clone(); let response: Response> = svc @@ -1496,9 +1498,10 @@ impl Octocrab { /// Execute the given `request` using octocrab's Client. pub async fn execute( &self, - request: http::Request, + request: http::Request>, ) -> Result>> { let (mut parts, body) = request.into_parts(); + let body: OctoBody = body.into(); // Saved request that we can retry later if necessary let auth_header: Option = match self.auth_state { AuthState::None => None, diff --git a/src/service/middleware/retry.rs b/src/service/middleware/retry.rs index 1dfce9f6..0eb6202b 100644 --- a/src/service/middleware/retry.rs +++ b/src/service/middleware/retry.rs @@ -1,67 +1,33 @@ -use futures_util::future; use http::{Request, Response}; use hyper_util::client::legacy::Error; use tower::retry::Policy; +use crate::body::OctoBody; + #[derive(Clone)] pub enum RetryConfig { None, Simple(usize), } -impl Policy, Response, Error> for RetryConfig { +impl Policy, Response, Error> for RetryConfig { type Future = futures_util::future::Ready; fn retry( &self, - _req: &Request, - result: Result<&Response, &Error>, + _req: &Request, + _result: Result<&Response, &Error>, ) -> Option { match self { RetryConfig::None => None, - RetryConfig::Simple(count) => match result { - Ok(response) => { - if response.status().is_server_error() || response.status() == 429 { - if *count > 0 { - Some(future::ready(RetryConfig::Simple(count - 1))) - } else { - None - } - } else { - None - } - } - Err(_) => { - if *count > 0 { - Some(future::ready(RetryConfig::Simple(count - 1))) - } else { - None - } - } - }, + RetryConfig::Simple(_count) => None, } } - fn clone_request(&self, req: &Request) -> Option> { + fn clone_request(&self, _req: &Request) -> Option> { match self { RetryConfig::None => None, - _ => { - // `Request` can't be cloned - let mut new_req = Request::builder() - .uri(req.uri()) - .method(req.method()) - .version(req.version()); - for (name, value) in req.headers() { - new_req = new_req.header(name, value); - } - - let body = req.body().clone(); - let new_req = new_req.body(body).expect( - "This should never panic, as we are cloning a components from existing request", - ); - - Some(new_req) - } + _ => None, } } }