From 9d019c29602c39cc6752ebaf81b2a4d77834e0e8 Mon Sep 17 00:00:00 2001 From: Liam Perlaki Date: Wed, 5 Apr 2023 12:28:03 +0200 Subject: [PATCH 1/4] experimental keep CompleteMultipartUpload alive --- crates/s3s/Cargo.toml | 3 ++ crates/s3s/src/http/ser.rs | 24 ++++++++++++++ crates/s3s/src/ops/generated.rs | 56 +++++++++++++++++++++++++++++++++ crates/s3s/src/ops/mod.rs | 12 ++++++- 4 files changed, 94 insertions(+), 1 deletion(-) diff --git a/crates/s3s/Cargo.toml b/crates/s3s/Cargo.toml index 8b85e468..9ddb7582 100644 --- a/crates/s3s/Cargo.toml +++ b/crates/s3s/Cargo.toml @@ -46,5 +46,8 @@ transform-stream = "0.3.0" urlencoding = "2.1.2" zeroize = "1.6.0" +tokio = { version = "1.27.0", features = ["time"] } + + [dev-dependencies] tokio = { version = "1.27.0", features = ["full"] } diff --git a/crates/s3s/src/http/ser.rs b/crates/s3s/src/http/ser.rs index 44a81dae..39b2d979 100644 --- a/crates/s3s/src/http/ser.rs +++ b/crates/s3s/src/http/ser.rs @@ -104,6 +104,30 @@ pub fn set_xml_body(res: &mut Response, val: &T) -> S3Result Ok(()) } +pub async fn set_xml_sending_body(res: &mut Response) -> S3Result { + res.headers.insert(hyper::header::CONTENT_TYPE, APPLICATION_XML); + let (mut sender, body) = hyper::Body::channel(); + res.body = body.into(); + let mut buf = Vec::with_capacity(256); + { + let mut ser = xml::Serializer::new(&mut buf); + ser.decl().map_err(S3Error::internal_error)?; + } + + sender.send_data(buf.into()).await.map_err(S3Error::internal_error)?; + Ok(sender) +} + +pub async fn send_xml_body(res: &mut hyper::body::Sender, val: &T) -> S3Result { + let mut buf = Vec::with_capacity(256); + { + let mut ser = xml::Serializer::new(&mut buf); + val.serialize(&mut ser).map_err(S3Error::internal_error)?; + } + res.send_data(buf.into()).await.map_err(S3Error::internal_error)?; + Ok(()) +} + pub fn set_stream_body(res: &mut Response, stream: StreamingBlob) { res.body = Body::from(stream); } diff --git a/crates/s3s/src/ops/generated.rs b/crates/s3s/src/ops/generated.rs index e6d048cd..37dbf02e 100644 --- a/crates/s3s/src/ops/generated.rs +++ b/crates/s3s/src/ops/generated.rs @@ -3,6 +3,9 @@ #![allow(clippy::declare_interior_mutable_const)] #![allow(clippy::borrow_interior_mutable_const)] +use bytes::Bytes; +use futures::FutureExt; + use crate::dto::*; use crate::error::*; use crate::header::*; @@ -405,6 +408,59 @@ impl CompleteMultipartUpload { http::add_opt_header(&mut res, X_AMZ_VERSION_ID, x.version_id)?; Ok(res) } + + pub async fn call_shared(&self, s3: std::sync::Arc, req: &mut http::Request) -> S3Result { + let input = Self::deserialize_http(req)?; + let req = super::build_s3_request(input, req); + let fut = async move { s3.complete_multipart_upload(req).await }.fuse(); + let mut fut = Box::pin(fut); + futures::select! { + result = &mut fut => { + let res = match result { + Ok(output) => Self::serialize_http(output)?, + Err(err) => super::serialize_error(err)?, + }; + return Ok(res) + } + _ = tokio::time::sleep(std::time::Duration::from_millis(100)).fuse() => { + () + } + } + + let mut res = http::Response::with_status(http::StatusCode::OK); + let mut sender = http::set_xml_sending_body(&mut res).await?; + + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(2)); + loop { + futures::select! { + _ = interval.tick().fuse() => { + sender.send_data(Bytes::from_static(b" ")).await.unwrap(); + } + res = &mut fut => { + match res { + Ok(output) => { + http::send_xml_body(&mut sender, &output).await.unwrap(); + let mut tmp_res = http::Response::with_status(http::StatusCode::OK); + http::add_header(&mut tmp_res, X_AMZ_SERVER_SIDE_ENCRYPTION_BUCKET_KEY_ENABLED, output.bucket_key_enabled).unwrap(); + http::add_opt_header(&mut tmp_res, X_AMZ_EXPIRATION, output.expiration).unwrap(); + http::add_opt_header(&mut tmp_res, X_AMZ_REQUEST_CHARGED, output.request_charged).unwrap(); + http::add_opt_header(&mut tmp_res, X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID, output.ssekms_key_id).unwrap(); + http::add_opt_header(&mut tmp_res, X_AMZ_SERVER_SIDE_ENCRYPTION, output.server_side_encryption).unwrap(); + http::add_opt_header(&mut tmp_res, X_AMZ_VERSION_ID, output.version_id).unwrap(); + + sender.send_trailers(tmp_res.headers).await.unwrap(); + }, + Err(err) => http::send_xml_body(&mut sender, &err).await.unwrap(), + }; + return + } + } + } + }); + + Ok(res) + } } #[async_trait::async_trait] diff --git a/crates/s3s/src/ops/mod.rs b/crates/s3s/src/ops/mod.rs index 7fd17bc6..97a26b49 100644 --- a/crates/s3s/src/ops/mod.rs +++ b/crates/s3s/src/ops/mod.rs @@ -211,7 +211,17 @@ pub async fn call( } }; - match op.call(s3, req).await { + if op.name() == "CompleteMultipartUpload" { + return match CompleteMultipartUpload.call_shared(s3.clone(), req).await { + Ok(res) => Ok(res), + Err(err) => { + debug!(op = %op.name(), ?err, "op returns error"); + serialize_error(err) + } + }; + } + + match op.call(&**s3, req).await { Ok(res) => Ok(res), Err(err) => { debug!(op = %op.name(), ?err, "op returns error"); From acced5662995e435559722a0c0a24e7b52291f92 Mon Sep 17 00:00:00 2001 From: Liam Perlaki Date: Tue, 11 Apr 2023 11:13:06 +0200 Subject: [PATCH 2/4] use custom KeepAliveBody for CompleteMultipartUpload --- crates/s3s-aws/src/connector.rs | 3 +- crates/s3s/Cargo.toml | 1 + crates/s3s/src/http/body.rs | 45 ++++++++++------ crates/s3s/src/http/ser.rs | 27 ++++------ crates/s3s/src/keep_alive_body.rs | 84 ++++++++++++++++++++++++++++++ crates/s3s/src/lib.rs | 2 + crates/s3s/src/ops/generated.rs | 86 ++++++++++++++----------------- 7 files changed, 168 insertions(+), 80 deletions(-) create mode 100644 crates/s3s/src/keep_alive_body.rs diff --git a/crates/s3s-aws/src/connector.rs b/crates/s3s-aws/src/connector.rs index 5b3de91d..f803e17c 100644 --- a/crates/s3s-aws/src/connector.rs +++ b/crates/s3s-aws/src/connector.rs @@ -1,3 +1,4 @@ +use hyper::body::HttpBody; use s3s::service::SharedS3Service; use s3s::{S3Error, S3Result}; @@ -59,7 +60,7 @@ fn convert_input(mut req: Request) -> Request { fn convert_output(result: S3Result>) -> Result, ConnectorError> { match result { - Ok(res) => Ok(res.map(|s3s_body| SdkBody::from(hyper::Body::from(s3s_body)))), + Ok(res) => Ok(res.map(|s3s_body| SdkBody::from_dyn(s3s_body.boxed()))), Err(e) => Err(on_err(e)), } } diff --git a/crates/s3s/Cargo.toml b/crates/s3s/Cargo.toml index 9ddb7582..86e68e8c 100644 --- a/crates/s3s/Cargo.toml +++ b/crates/s3s/Cargo.toml @@ -46,6 +46,7 @@ transform-stream = "0.3.0" urlencoding = "2.1.2" zeroize = "1.6.0" +sync_wrapper = { version = "0.1.2", default-features = false } tokio = { version = "1.27.0", features = ["time"] } diff --git a/crates/s3s/src/http/body.rs b/crates/s3s/src/http/body.rs index 68248d73..f4603699 100644 --- a/crates/s3s/src/http/body.rs +++ b/crates/s3s/src/http/body.rs @@ -36,7 +36,11 @@ pin_project_lite::pin_project! { DynStream { #[pin] inner: DynByteStream - } + }, + HttpBody { + #[pin] + inner: http_body::combinators::BoxBody, + }, } } @@ -63,6 +67,11 @@ impl Body { kind: Kind::DynStream { inner: stream }, } } + pub fn http_body(body: http_body::combinators::BoxBody) -> Self { + Self { + kind: Kind::HttpBody { inner: body }, + } + } } impl From for Body { @@ -123,11 +132,22 @@ impl http_body::Body for Body { Stream::poll_next(inner, cx) // } + KindProj::HttpBody { inner } => { + http_body::Body::poll_data(inner, cx) + // + } } } fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) // TODO: How to impl poll_trailers? + let mut this = self.project(); + match this.kind.as_mut().project() { + KindProj::Empty => Poll::Ready(Ok(None)), + KindProj::Once { .. } => Poll::Ready(Ok(None)), + KindProj::Hyper { inner } => http_body::Body::poll_trailers(inner, _cx).map_err(|e| Box::new(e) as StdError), + KindProj::DynStream { .. } => Poll::Ready(Ok(None)), + KindProj::HttpBody { inner } => http_body::Body::poll_trailers(inner, _cx), + } } fn is_end_stream(&self) -> bool { @@ -136,6 +156,7 @@ impl http_body::Body for Body { Kind::Once { inner } => inner.is_empty(), Kind::Hyper { inner } => http_body::Body::is_end_stream(inner), Kind::DynStream { inner } => inner.remaining_length().exact() == Some(0), + Kind::HttpBody { inner } => http_body::Body::is_end_stream(inner), } } @@ -145,6 +166,7 @@ impl http_body::Body for Body { Kind::Once { inner } => http_body::SizeHint::with_exact(inner.len() as u64), Kind::Hyper { inner } => http_body::Body::size_hint(inner), Kind::DynStream { inner } => inner.remaining_length().into(), + Kind::HttpBody { inner } => http_body::Body::size_hint(inner), } } } @@ -164,6 +186,7 @@ impl ByteStream for Body { Kind::Once { inner } => RemainingLength::new_exact(inner.len()), Kind::Hyper { inner } => http_body::Body::size_hint(inner).into(), Kind::DynStream { inner } => inner.remaining_length(), + Kind::HttpBody { inner } => http_body::Body::size_hint(inner).into(), } } } @@ -183,6 +206,9 @@ impl fmt::Debug for Body { d.field("dyn_stream", &"{..}"); d.field("remaining_length", &inner.remaining_length()); } + Kind::HttpBody { inner } => { + d.field("http_body", inner); + } } d.finish() } @@ -207,19 +233,4 @@ impl Body { _ => None, } } - - fn into_hyper(self) -> hyper::Body { - match self.kind { - Kind::Empty => hyper::Body::empty(), - Kind::Once { inner } => inner.into(), - Kind::Hyper { inner } => inner, - Kind::DynStream { inner } => hyper::Body::wrap_stream(inner), - } - } -} - -impl From for hyper::Body { - fn from(value: Body) -> Self { - value.into_hyper() - } } diff --git a/crates/s3s/src/http/ser.rs b/crates/s3s/src/http/ser.rs index 39b2d979..1b7103c7 100644 --- a/crates/s3s/src/http/ser.rs +++ b/crates/s3s/src/http/ser.rs @@ -5,11 +5,15 @@ use crate::dto::SelectObjectContentEventStream; use crate::dto::{Metadata, StreamingBlob, Timestamp, TimestampFormat}; use crate::error::{S3Error, S3Result}; use crate::http::{HeaderName, HeaderValue}; +use crate::keep_alive_body::KeepAliveBody; use crate::{utils, xml}; use std::convert::Infallible; use std::fmt::Write as _; +use futures::Future; + +use http_body::Body as HttpBody; use hyper::header::{IntoHeaderName, InvalidHeaderValue}; pub fn add_header(res: &mut Response, name: N, value: V) -> S3Result @@ -104,27 +108,18 @@ pub fn set_xml_body(res: &mut Response, val: &T) -> S3Result Ok(()) } -pub async fn set_xml_sending_body(res: &mut Response) -> S3Result { - res.headers.insert(hyper::header::CONTENT_TYPE, APPLICATION_XML); - let (mut sender, body) = hyper::Body::channel(); - res.body = body.into(); +pub fn set_keep_alive_xml_body( + res: &mut Response, + fut: impl Future + Send + Sync + 'static, + duration: std::time::Duration, +) -> S3Result { let mut buf = Vec::with_capacity(256); { let mut ser = xml::Serializer::new(&mut buf); ser.decl().map_err(S3Error::internal_error)?; } - - sender.send_data(buf.into()).await.map_err(S3Error::internal_error)?; - Ok(sender) -} - -pub async fn send_xml_body(res: &mut hyper::body::Sender, val: &T) -> S3Result { - let mut buf = Vec::with_capacity(256); - { - let mut ser = xml::Serializer::new(&mut buf); - val.serialize(&mut ser).map_err(S3Error::internal_error)?; - } - res.send_data(buf.into()).await.map_err(S3Error::internal_error)?; + res.body = Body::http_body(KeepAliveBody::with_initial_body(fut, buf.into(), duration).boxed()); + res.headers.insert(hyper::header::CONTENT_TYPE, APPLICATION_XML); Ok(()) } diff --git a/crates/s3s/src/keep_alive_body.rs b/crates/s3s/src/keep_alive_body.rs new file mode 100644 index 00000000..055aa2ac --- /dev/null +++ b/crates/s3s/src/keep_alive_body.rs @@ -0,0 +1,84 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use bytes::Bytes; +use http_body::Body; +use tokio::time::Interval; + +use crate::{http::Response, StdError}; + +// sends whitespace while the future is pending +pin_project_lite::pin_project! { + + pub struct KeepAliveBody { + #[pin] + inner: F, + initial_body: Option, + response: Option, + interval: Interval, + } +} +impl KeepAliveBody { + pub fn new(inner: F, interval: Duration) -> Self { + Self { + inner, + initial_body: None, + response: None, + interval: tokio::time::interval(interval), + } + } + + pub fn with_initial_body(inner: F, initial_body: Bytes, interval: Duration) -> Self { + Self { + inner, + initial_body: Some(initial_body), + response: None, + interval: tokio::time::interval(interval), + } + } +} + +impl Body for KeepAliveBody +where + F: Future, +{ + type Data = Bytes; + + type Error = StdError; + + fn poll_data(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let mut this = self.project(); + if let Some(initial_body) = this.initial_body.take() { + cx.waker().wake_by_ref(); + return Poll::Ready(Some(Ok(initial_body))); + } + loop { + if let Some(response) = this.response { + return Pin::new(&mut response.body).poll_data(cx); + } + match this.inner.as_mut().poll(cx) { + Poll::Ready(response) => { + *this.response = Some(response); + } + Poll::Pending => match this.interval.poll_tick(cx) { + Poll::Ready(_) => return Poll::Ready(Some(Ok(Bytes::from_static(b" ")))), + Poll::Pending => return Poll::Pending, + }, + } + } + } + + fn poll_trailers(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll, Self::Error>> { + let this = self.project(); + + if let Some(response) = this.response { + return Pin::new(&mut response.body).poll_trailers(cx); + } else { + return Poll::Ready(Ok(None)); + } + } +} diff --git a/crates/s3s/src/lib.rs b/crates/s3s/src/lib.rs index 2407021e..93ee1d6d 100644 --- a/crates/s3s/src/lib.rs +++ b/crates/s3s/src/lib.rs @@ -35,6 +35,8 @@ pub mod path; pub mod service; pub mod stream; +pub mod keep_alive_body; + pub use self::error::*; pub use self::http::Body; pub use self::request::S3Request; diff --git a/crates/s3s/src/ops/generated.rs b/crates/s3s/src/ops/generated.rs index 37dbf02e..1280f322 100644 --- a/crates/s3s/src/ops/generated.rs +++ b/crates/s3s/src/ops/generated.rs @@ -3,9 +3,6 @@ #![allow(clippy::declare_interior_mutable_const)] #![allow(clippy::borrow_interior_mutable_const)] -use bytes::Bytes; -use futures::FutureExt; - use crate::dto::*; use crate::error::*; use crate::header::*; @@ -412,52 +409,49 @@ impl CompleteMultipartUpload { pub async fn call_shared(&self, s3: std::sync::Arc, req: &mut http::Request) -> S3Result { let input = Self::deserialize_http(req)?; let req = super::build_s3_request(input, req); - let fut = async move { s3.complete_multipart_upload(req).await }.fuse(); - let mut fut = Box::pin(fut); - futures::select! { - result = &mut fut => { - let res = match result { - Ok(output) => Self::serialize_http(output)?, - Err(err) => super::serialize_error(err)?, - }; - return Ok(res) - } - _ = tokio::time::sleep(std::time::Duration::from_millis(100)).fuse() => { - () - } - } + let fut = async move { + let res = s3.complete_multipart_upload(req).await; + match res { + Ok(output) => { + let mut res = http::Response::with_status(http::StatusCode::OK); + + let mut buf = Vec::with_capacity(256); + + let mut ser = crate::xml::Serializer::new(&mut buf); + crate::xml::Serialize::serialize(&output, &mut ser) + .map_err(S3Error::internal_error) + .unwrap(); + + res.body = crate::Body::from(buf); + + http::add_header(&mut res, X_AMZ_SERVER_SIDE_ENCRYPTION_BUCKET_KEY_ENABLED, output.bucket_key_enabled) + .unwrap(); + http::add_opt_header(&mut res, X_AMZ_EXPIRATION, output.expiration).unwrap(); + http::add_opt_header(&mut res, X_AMZ_REQUEST_CHARGED, output.request_charged).unwrap(); + http::add_opt_header(&mut res, X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID, output.ssekms_key_id).unwrap(); + http::add_opt_header(&mut res, X_AMZ_SERVER_SIDE_ENCRYPTION, output.server_side_encryption).unwrap(); + http::add_opt_header(&mut res, X_AMZ_VERSION_ID, output.version_id).unwrap(); + + res + } + Err(err) => { + let mut res = http::Response::with_status(http::StatusCode::OK); - let mut res = http::Response::with_status(http::StatusCode::OK); - let mut sender = http::set_xml_sending_body(&mut res).await?; - - tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_millis(2)); - loop { - futures::select! { - _ = interval.tick().fuse() => { - sender.send_data(Bytes::from_static(b" ")).await.unwrap(); - } - res = &mut fut => { - match res { - Ok(output) => { - http::send_xml_body(&mut sender, &output).await.unwrap(); - let mut tmp_res = http::Response::with_status(http::StatusCode::OK); - http::add_header(&mut tmp_res, X_AMZ_SERVER_SIDE_ENCRYPTION_BUCKET_KEY_ENABLED, output.bucket_key_enabled).unwrap(); - http::add_opt_header(&mut tmp_res, X_AMZ_EXPIRATION, output.expiration).unwrap(); - http::add_opt_header(&mut tmp_res, X_AMZ_REQUEST_CHARGED, output.request_charged).unwrap(); - http::add_opt_header(&mut tmp_res, X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID, output.ssekms_key_id).unwrap(); - http::add_opt_header(&mut tmp_res, X_AMZ_SERVER_SIDE_ENCRYPTION, output.server_side_encryption).unwrap(); - http::add_opt_header(&mut tmp_res, X_AMZ_VERSION_ID, output.version_id).unwrap(); - - sender.send_trailers(tmp_res.headers).await.unwrap(); - }, - Err(err) => http::send_xml_body(&mut sender, &err).await.unwrap(), - }; - return - } + let mut buf = Vec::with_capacity(256); + + let mut ser = crate::xml::Serializer::new(&mut buf); + crate::xml::Serialize::serialize(&err, &mut ser) + .map_err(S3Error::internal_error) + .unwrap(); + + res.body = crate::Body::from(buf); + res } } - }); + }; + + let mut res = http::Response::with_status(http::StatusCode::OK); + http::set_keep_alive_xml_body(&mut res, sync_wrapper::SyncFuture::new(fut), std::time::Duration::from_millis(100))?; Ok(res) } From b5c48e25a735a470dfc1818b3551afc25be7b966 Mon Sep 17 00:00:00 2001 From: Liam Perlaki Date: Tue, 11 Apr 2023 13:46:06 +0200 Subject: [PATCH 3/4] codegen CompleteMultipartUpload modifications --- codegen/src/ops.rs | 29 ++++++++++++--- crates/s3s/src/http/ser.rs | 24 +++++++----- crates/s3s/src/ops/generated.rs | 65 +++++---------------------------- crates/s3s/src/ops/mod.rs | 19 ++++------ 4 files changed, 55 insertions(+), 82 deletions(-) diff --git a/codegen/src/ops.rs b/codegen/src/ops.rs index 2e072cd7..16b4d5e4 100644 --- a/codegen/src/ops.rs +++ b/codegen/src/ops.rs @@ -251,7 +251,11 @@ fn codegen_op_http_ser(op: &Operation, rust_types: &RustTypes, g: &mut Codegen) } if is_xml_output(ty) { - g.ln("http::set_xml_body(&mut res, &x)?;"); + if op.name == "CompleteMultipartUpload" { + g.ln("http::set_xml_body_no_decl(&mut res, &x)?;"); + } else { + g.ln("http::set_xml_body(&mut res, &x)?;"); + } } else if let Some(field) = ty.fields.iter().find(|x| x.position == "payload") { match field.type_.as_str() { "Policy" => { @@ -603,13 +607,26 @@ fn codegen_op_http_call(op: &Operation, g: &mut Codegen) { g.ln("let input = Self::deserialize_http(req)?;"); g.ln("let req = super::build_s3_request(input, req);"); - g.ln(f!("let result = s3.{method}(req).await;")); + if op.name == "CompleteMultipartUpload" { + g.ln("let s3 = s3.clone();"); + g.ln("let fut = async move {"); + g.ln(f!("let result = s3.{method}(req).await;")); + g.ln("match result {"); + g.ln("Ok(output) => Self::serialize_http(output).unwrap(),"); + g.ln("Err(err) => super::serialize_error_no_decl(err).unwrap(),"); + g.ln("}"); + g.ln("};"); + g.ln("let mut res = http::Response::with_status(http::StatusCode::OK);"); + g.ln("http::set_keep_alive_xml_body(&mut res, sync_wrapper::SyncFuture::new(fut), std::time::Duration::from_millis(100))?;"); + } else { + g.ln(f!("let result = s3.{method}(req).await;")); - g.ln("let res = match result {"); - g.ln("Ok(output) => Self::serialize_http(output)?,"); + g.ln("let res = match result {"); + g.ln("Ok(output) => Self::serialize_http(output)?,"); - g.ln("Err(err) => super::serialize_error(err)?,"); - g.ln("};"); + g.ln("Err(err) => super::serialize_error(err)?,"); + g.ln("};"); + } g.ln("Ok(res)"); diff --git a/crates/s3s/src/http/ser.rs b/crates/s3s/src/http/ser.rs index 1b7103c7..766e0549 100644 --- a/crates/s3s/src/http/ser.rs +++ b/crates/s3s/src/http/ser.rs @@ -11,9 +11,6 @@ use crate::{utils, xml}; use std::convert::Infallible; use std::fmt::Write as _; -use futures::Future; - -use http_body::Body as HttpBody; use hyper::header::{IntoHeaderName, InvalidHeaderValue}; pub fn add_header(res: &mut Response, name: N, value: V) -> S3Result @@ -110,19 +107,26 @@ pub fn set_xml_body(res: &mut Response, val: &T) -> S3Result pub fn set_keep_alive_xml_body( res: &mut Response, - fut: impl Future + Send + Sync + 'static, + fut: impl std::future::Future + Send + Sync + 'static, duration: std::time::Duration, ) -> S3Result { - let mut buf = Vec::with_capacity(256); - { - let mut ser = xml::Serializer::new(&mut buf); - ser.decl().map_err(S3Error::internal_error)?; - } - res.body = Body::http_body(KeepAliveBody::with_initial_body(fut, buf.into(), duration).boxed()); + let mut buf = Vec::with_capacity(40); + let mut ser = xml::Serializer::new(&mut buf); + ser.decl().map_err(S3Error::internal_error)?; + + res.body = Body::http_body(http_body::Body::boxed(KeepAliveBody::with_initial_body(fut, buf.into(), duration))); res.headers.insert(hyper::header::CONTENT_TYPE, APPLICATION_XML); Ok(()) } +pub fn set_xml_body_no_decl(res: &mut Response, val: &T) -> S3Result { + let mut buf = Vec::with_capacity(256); + let mut ser = xml::Serializer::new(&mut buf); + val.serialize(&mut ser).map_err(S3Error::internal_error)?; + res.body = Body::from(buf); + Ok(()) +} + pub fn set_stream_body(res: &mut Response, stream: StreamingBlob) { res.body = Body::from(stream); } diff --git a/crates/s3s/src/ops/generated.rs b/crates/s3s/src/ops/generated.rs index 1280f322..0bb81bf1 100644 --- a/crates/s3s/src/ops/generated.rs +++ b/crates/s3s/src/ops/generated.rs @@ -396,7 +396,7 @@ impl CompleteMultipartUpload { pub fn serialize_http(x: CompleteMultipartUploadOutput) -> S3Result { let mut res = http::Response::with_status(http::StatusCode::OK); - http::set_xml_body(&mut res, &x)?; + http::set_xml_body_no_decl(&mut res, &x)?; http::add_header(&mut res, X_AMZ_SERVER_SIDE_ENCRYPTION_BUCKET_KEY_ENABLED, x.bucket_key_enabled)?; http::add_opt_header(&mut res, X_AMZ_EXPIRATION, x.expiration)?; http::add_opt_header(&mut res, X_AMZ_REQUEST_CHARGED, x.request_charged)?; @@ -405,56 +405,6 @@ impl CompleteMultipartUpload { http::add_opt_header(&mut res, X_AMZ_VERSION_ID, x.version_id)?; Ok(res) } - - pub async fn call_shared(&self, s3: std::sync::Arc, req: &mut http::Request) -> S3Result { - let input = Self::deserialize_http(req)?; - let req = super::build_s3_request(input, req); - let fut = async move { - let res = s3.complete_multipart_upload(req).await; - match res { - Ok(output) => { - let mut res = http::Response::with_status(http::StatusCode::OK); - - let mut buf = Vec::with_capacity(256); - - let mut ser = crate::xml::Serializer::new(&mut buf); - crate::xml::Serialize::serialize(&output, &mut ser) - .map_err(S3Error::internal_error) - .unwrap(); - - res.body = crate::Body::from(buf); - - http::add_header(&mut res, X_AMZ_SERVER_SIDE_ENCRYPTION_BUCKET_KEY_ENABLED, output.bucket_key_enabled) - .unwrap(); - http::add_opt_header(&mut res, X_AMZ_EXPIRATION, output.expiration).unwrap(); - http::add_opt_header(&mut res, X_AMZ_REQUEST_CHARGED, output.request_charged).unwrap(); - http::add_opt_header(&mut res, X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID, output.ssekms_key_id).unwrap(); - http::add_opt_header(&mut res, X_AMZ_SERVER_SIDE_ENCRYPTION, output.server_side_encryption).unwrap(); - http::add_opt_header(&mut res, X_AMZ_VERSION_ID, output.version_id).unwrap(); - - res - } - Err(err) => { - let mut res = http::Response::with_status(http::StatusCode::OK); - - let mut buf = Vec::with_capacity(256); - - let mut ser = crate::xml::Serializer::new(&mut buf); - crate::xml::Serialize::serialize(&err, &mut ser) - .map_err(S3Error::internal_error) - .unwrap(); - - res.body = crate::Body::from(buf); - res - } - } - }; - - let mut res = http::Response::with_status(http::StatusCode::OK); - http::set_keep_alive_xml_body(&mut res, sync_wrapper::SyncFuture::new(fut), std::time::Duration::from_millis(100))?; - - Ok(res) - } } #[async_trait::async_trait] @@ -466,11 +416,16 @@ impl super::Operation for CompleteMultipartUpload { async fn call(&self, s3: &Arc, req: &mut http::Request) -> S3Result { let input = Self::deserialize_http(req)?; let req = super::build_s3_request(input, req); - let result = s3.complete_multipart_upload(req).await; - let res = match result { - Ok(output) => Self::serialize_http(output)?, - Err(err) => super::serialize_error(err)?, + let s3 = s3.clone(); + let fut = async move { + let result = s3.complete_multipart_upload(req).await; + match result { + Ok(output) => Self::serialize_http(output).unwrap(), + Err(err) => super::serialize_error_no_decl(err).unwrap(), + } }; + let mut res = http::Response::with_status(http::StatusCode::OK); + http::set_keep_alive_xml_body(&mut res, sync_wrapper::SyncFuture::new(fut), std::time::Duration::from_millis(100))?; Ok(res) } } diff --git a/crates/s3s/src/ops/mod.rs b/crates/s3s/src/ops/mod.rs index 97a26b49..f353fa3a 100644 --- a/crates/s3s/src/ops/mod.rs +++ b/crates/s3s/src/ops/mod.rs @@ -67,6 +67,13 @@ fn serialize_error(x: S3Error) -> S3Result { Ok(res) } +fn serialize_error_no_decl(x: S3Error) -> S3Result { + let status = x.status_code().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + let mut res = Response::with_status(status); + http::set_xml_body_no_decl(&mut res, &x)?; + Ok(res) +} + fn unknown_operation() -> S3Error { S3Error::with_message(S3ErrorCode::NotImplemented, "Unknown operation") } @@ -211,17 +218,7 @@ pub async fn call( } }; - if op.name() == "CompleteMultipartUpload" { - return match CompleteMultipartUpload.call_shared(s3.clone(), req).await { - Ok(res) => Ok(res), - Err(err) => { - debug!(op = %op.name(), ?err, "op returns error"); - serialize_error(err) - } - }; - } - - match op.call(&**s3, req).await { + match op.call(s3, req).await { Ok(res) => Ok(res), Err(err) => { debug!(op = %op.name(), ?err, "op returns error"); From fd9c0ea0a7abe0e3d1918d6df7d82eaf0ddbeae7 Mon Sep 17 00:00:00 2001 From: Liam Perlaki Date: Tue, 11 Apr 2023 13:58:56 +0200 Subject: [PATCH 4/4] make clippy happy --- crates/s3s/src/http/body.rs | 2 ++ crates/s3s/src/keep_alive_body.rs | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/s3s/src/http/body.rs b/crates/s3s/src/http/body.rs index f4603699..850e122e 100644 --- a/crates/s3s/src/http/body.rs +++ b/crates/s3s/src/http/body.rs @@ -67,6 +67,8 @@ impl Body { kind: Kind::DynStream { inner: stream }, } } + + #[must_use] pub fn http_body(body: http_body::combinators::BoxBody) -> Self { Self { kind: Kind::HttpBody { inner: body }, diff --git a/crates/s3s/src/keep_alive_body.rs b/crates/s3s/src/keep_alive_body.rs index 055aa2ac..5d13702d 100644 --- a/crates/s3s/src/keep_alive_body.rs +++ b/crates/s3s/src/keep_alive_body.rs @@ -76,9 +76,9 @@ where let this = self.project(); if let Some(response) = this.response { - return Pin::new(&mut response.body).poll_trailers(cx); + Pin::new(&mut response.body).poll_trailers(cx) } else { - return Poll::Ready(Ok(None)); + Poll::Ready(Ok(None)) } } }