diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 77a969b1940..42e5b69a620 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -97,3 +97,15 @@ message = "[`Number`](https://docs.rs/aws-smithy-types/latest/aws_smithy_types/e references = ["smithy-rs#3294"] meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "all" } author = "rcoh" + +[[smithy-rs]] +message = "Add support for constructing [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported." +references = ["smithy-rs#3300", "aws-sdk-rust#977"] +meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "all" } +author = "rcoh" + +[[aws-sdk-rust]] +message = "Add support for constructing [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported." +references = ["smithy-rs#3300", "aws-sdk-rust#977"] +meta = { "breaking" = false, "tada" = true, "bug" = false } +author = "rcoh" diff --git a/rust-runtime/aws-smithy-types/Cargo.toml b/rust-runtime/aws-smithy-types/Cargo.toml index a46f9924fa4..7118fb5dbad 100644 --- a/rust-runtime/aws-smithy-types/Cargo.toml +++ b/rust-runtime/aws-smithy-types/Cargo.toml @@ -13,6 +13,7 @@ repository = "https://github.com/smithy-lang/smithy-rs" [features] byte-stream-poll-next = [] http-body-0-4-x = ["dep:http-body-0-4"] +http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x"] hyper-0-14-x = ["dep:hyper-0-14"] rt-tokio = [ "dep:http-body-0-4", @@ -32,7 +33,10 @@ base64-simd = "0.8" bytes = "1" bytes-utils = "0.1" http = "0.2.3" +http-1x = { package = "http", version = "1", optional = true } http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true } +http-body-1-0 = { package = "http-body", version = "1", optional = true } +http-body-util = { version = "0.1.0", optional = true } hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true } itoa = "1.0.0" num-integer = "0.1.44" diff --git a/rust-runtime/aws-smithy-types/additional-ci b/rust-runtime/aws-smithy-types/additional-ci index 2d9305d5103..7ce1a378ac5 100755 --- a/rust-runtime/aws-smithy-types/additional-ci +++ b/rust-runtime/aws-smithy-types/additional-ci @@ -12,3 +12,6 @@ cargo tree -d --edges normal --all-features echo "### Checking whether the features are properly feature-gated" ! cargo tree -e no-dev | grep serde + +echo "### Checking feature powerset" +cargo hack check --feature-powerset --exclude-all-features diff --git a/rust-runtime/aws-smithy-types/src/body.rs b/rust-runtime/aws-smithy-types/src/body.rs index 33e5912a935..97985243870 100644 --- a/rust-runtime/aws-smithy-types/src/body.rs +++ b/rust-runtime/aws-smithy-types/src/body.rs @@ -19,6 +19,8 @@ use std::task::{Context, Poll}; /// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`. #[cfg(feature = "http-body-0-4-x")] pub mod http_body_0_4_x; +#[cfg(feature = "http-body-1-x")] +pub mod http_body_1_x; /// A generic, boxed error that's `Send` and `Sync` pub type Error = Box; @@ -55,7 +57,13 @@ impl Debug for SdkBody { /// A boxed generic HTTP body that, when consumed, will result in [`Bytes`] or an [`Error`]. enum BoxBody { - #[cfg(feature = "http-body-0-4-x")] + // This is enabled by the **dependency**, not the feature. This allows us to construct it + // whenever we have the dependency and keep the APIs private + #[cfg(any( + feature = "http-body-0-4-x", + feature = "http-body-1-x", + feature = "rt-tokio" + ))] HttpBody04(http_body_0_4::combinators::BoxBody), } @@ -162,6 +170,27 @@ impl SdkBody { } } + #[cfg(any( + feature = "http-body-0-4-x", + feature = "http-body-1-x", + feature = "rt-tokio" + ))] + pub(crate) fn from_body_0_4_internal(body: T) -> Self + where + T: http_body_0_4::Body + Send + Sync + 'static, + E: Into + 'static, + { + Self { + inner: Inner::Dyn { + inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new( + body.map_err(Into::into), + )), + }, + rebuild: None, + bytes_contents: None, + } + } + #[cfg(feature = "http-body-0-4-x")] pub(crate) fn poll_next_trailers( self: Pin<&mut Self>, diff --git a/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs b/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs index a6719d1fa2d..d4eae4cf4cc 100644 --- a/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs +++ b/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs @@ -3,11 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -use crate::body::{BoxBody, Error, Inner, SdkBody}; -use bytes::Bytes; use std::pin::Pin; use std::task::{Context, Poll}; +use bytes::Bytes; + +use crate::body::{Error, SdkBody}; + impl SdkBody { /// Construct an `SdkBody` from a type that implements [`http_body_0_4::Body`](http_body_0_4::Body). /// @@ -17,15 +19,7 @@ impl SdkBody { T: http_body_0_4::Body + Send + Sync + 'static, E: Into + 'static, { - Self { - inner: Inner::Dyn { - inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new( - body.map_err(Into::into), - )), - }, - rebuild: None, - bytes_contents: None, - } + SdkBody::from_body_0_4_internal(body) } } diff --git a/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs b/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs new file mode 100644 index 00000000000..6b3278d9875 --- /dev/null +++ b/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs @@ -0,0 +1,267 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream + +use std::pin::Pin; +use std::task::{ready, Context, Poll}; + +use bytes::Bytes; +use http_body_util::BodyExt; +use pin_project_lite::pin_project; + +use crate::body::{Error, SdkBody}; + +impl SdkBody { + /// Construct an `SdkBody` from a type that implements [`http_body_1_0::Body`](http_body_1_0::Body). + pub fn from_body_1_x(body: T) -> Self + where + T: http_body_1_0::Body + Send + Sync + 'static, + E: Into + 'static, + { + SdkBody::from_body_0_4_internal(Http1toHttp04::new(body.map_err(Into::into))) + } +} + +pin_project! { + struct Http1toHttp04 { + #[pin] + inner: B, + trailers: Option, + } +} + +impl Http1toHttp04 { + fn new(inner: B) -> Self { + Self { + inner, + trailers: None, + } + } +} + +impl http_body_0_4::Body for Http1toHttp04 +where + B: http_body_1_0::Body, +{ + type Data = B::Data; + type Error = B::Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + loop { + let this = self.as_mut().project(); + match ready!(this.inner.poll_frame(cx)) { + Some(Ok(frame)) => { + let frame = match frame.into_data() { + Ok(data) => return Poll::Ready(Some(Ok(data))), + Err(frame) => frame, + }; + // when we get a trailers frame, store the trailers for the next poll + if let Ok(trailers) = frame.into_trailers() { + this.trailers.replace(trailers); + return Poll::Ready(None); + }; + // if the frame type was unknown, discard it. the next one might be something + // useful + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => return Poll::Ready(None), + } + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + // all of the polling happens in poll_data, once we get to the trailers we've actually + // already read everything + let this = self.project(); + match this.trailers.take() { + Some(headers) => Poll::Ready(Ok(Some(convert_header_map(headers)))), + None => Poll::Ready(Ok(None)), + } + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } + + fn size_hint(&self) -> http_body_0_4::SizeHint { + let mut size_hint = http_body_0_4::SizeHint::new(); + let inner_hint = self.inner.size_hint(); + if let Some(exact) = inner_hint.exact() { + size_hint.set_exact(exact); + } else { + size_hint.set_lower(inner_hint.lower()); + if let Some(upper) = inner_hint.upper() { + size_hint.set_upper(upper); + } + } + size_hint + } +} + +fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap { + let mut map = http::HeaderMap::with_capacity(input.capacity()); + let mut mem: Option = None; + for (k, v) in input.into_iter() { + let name = k.or_else(|| mem.clone()).unwrap(); + map.append( + http::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"), + http::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"), + ); + mem = Some(name); + } + map +} + +#[cfg(test)] +mod test { + use std::collections::VecDeque; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use bytes::Bytes; + use http::header::{CONTENT_LENGTH as CL0, CONTENT_TYPE as CT0}; + use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1}; + use http_1x::{HeaderMap, HeaderName, HeaderValue}; + use http_body_1_0::Frame; + + use crate::body::http_body_1_x::convert_header_map; + use crate::body::{Error, SdkBody}; + use crate::byte_stream::ByteStream; + + struct TestBody { + chunks: VecDeque, + } + + enum Chunk { + Data(&'static str), + Error(&'static str), + Trailers(HeaderMap), + } + + impl http_body_1_0::Body for TestBody { + type Data = Bytes; + type Error = Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + let next = self.chunks.pop_front(); + let mk = |v: Frame| Poll::Ready(Some(Ok(v))); + + match next { + Some(Chunk::Data(s)) => mk(Frame::data(Bytes::from_static(s.as_bytes()))), + Some(Chunk::Trailers(headers)) => mk(Frame::trailers(headers)), + Some(Chunk::Error(err)) => Poll::Ready(Some(Err(err.into()))), + None => Poll::Ready(None), + } + } + } + + fn trailers() -> HeaderMap { + let mut map = HeaderMap::new(); + map.insert( + HeaderName::from_static("x-test"), + HeaderValue::from_static("x-test-value"), + ); + map.append( + HeaderName::from_static("x-test"), + HeaderValue::from_static("x-test-value-2"), + ); + map.append( + HeaderName::from_static("y-test"), + HeaderValue::from_static("y-test-value-2"), + ); + map + } + + #[tokio::test] + async fn test_body_with_trailers() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Trailers(trailers()), + ] + .into(), + }; + let body = SdkBody::from_body_1_x(body); + let data = ByteStream::new(body); + assert_eq!(data.collect().await.unwrap().to_vec(), b"123456789"); + } + + #[tokio::test] + async fn test_read_trailers() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Trailers(trailers()), + ] + .into(), + }; + let mut body = SdkBody::from_body_1_x(body); + while let Some(_data) = http_body_0_4::Body::data(&mut body).await {} + assert_eq!( + http_body_0_4::Body::trailers(&mut body).await.unwrap(), + Some(convert_header_map(trailers())) + ); + } + + #[tokio::test] + async fn test_errors() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Error("errors!"), + ] + .into(), + }; + + let body = SdkBody::from_body_1_x(body); + let body = ByteStream::new(body); + body.collect().await.expect_err("body returned an error"); + } + #[tokio::test] + async fn test_no_trailers() { + let body = TestBody { + chunks: vec![Chunk::Data("123"), Chunk::Data("456"), Chunk::Data("789")].into(), + }; + + let body = SdkBody::from_body_1_x(body); + let body = ByteStream::new(body); + assert_eq!(body.collect().await.unwrap().to_vec(), b"123456789"); + } + + #[test] + fn test_convert_headers() { + let mut http1_headermap = http_1x::HeaderMap::new(); + http1_headermap.append(CT1, HeaderValue::from_static("a")); + http1_headermap.append(CT1, HeaderValue::from_static("b")); + http1_headermap.append(CT1, HeaderValue::from_static("c")); + + http1_headermap.insert(CL1, HeaderValue::from_static("1234")); + + let mut expect = http::HeaderMap::new(); + expect.append(CT0, http::HeaderValue::from_static("a")); + expect.append(CT0, http::HeaderValue::from_static("b")); + expect.append(CT0, http::HeaderValue::from_static("c")); + + expect.insert(CL0, http::HeaderValue::from_static("1234")); + + assert_eq!(convert_header_map(http1_headermap), expect); + } +} diff --git a/rust-runtime/aws-smithy-types/src/byte_stream.rs b/rust-runtime/aws-smithy-types/src/byte_stream.rs index 2721b1b6b21..3acb18b5625 100644 --- a/rust-runtime/aws-smithy-types/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-types/src/byte_stream.rs @@ -148,6 +148,8 @@ pub use self::bytestream_util::FsBuilder; /// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`. #[cfg(feature = "http-body-0-4-x")] pub mod http_body_0_4_x; +#[cfg(feature = "http-body-1-x")] +pub mod http_body_1_x; pin_project! { /// Stream of binary data diff --git a/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs b/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs index 39965c90c56..2ed7341eeb6 100644 --- a/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs +++ b/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs @@ -193,7 +193,7 @@ impl FsBuilder { let body_loader = move || { // If an offset was provided, seeking will be handled in `PathBody::poll_data` each // time the file is loaded. - SdkBody::from_body_0_4(PathBody::from_path( + SdkBody::from_body_0_4_internal(PathBody::from_path( path.clone(), length, buffer_size, @@ -208,7 +208,8 @@ impl FsBuilder { let _s = file.seek(io::SeekFrom::Start(offset)).await?; } - let body = SdkBody::from_body_0_4(PathBody::from_file(file, length, buffer_size)); + let body = + SdkBody::from_body_0_4_internal(PathBody::from_file(file, length, buffer_size)); Ok(ByteStream::new(body)) } else { diff --git a/rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs b/rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs new file mode 100644 index 00000000000..bff8b201ebb --- /dev/null +++ b/rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs @@ -0,0 +1,21 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream + +use crate::body::SdkBody; +use crate::byte_stream::ByteStream; +use bytes::Bytes; + +impl ByteStream { + /// Construct a `ByteStream` from a type that implements [`http_body_1_0::Body`](http_body_1_0::Body). + pub fn from_body_1_x(body: T) -> Self + where + T: http_body_1_0::Body + Send + Sync + 'static, + E: Into + 'static, + { + ByteStream::new(SdkBody::from_body_1_x(body)) + } +}