From f43afc05d0f0a11aeef4942687e9bce4b8cc2dd5 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 8 Sep 2023 20:16:55 -0500 Subject: [PATCH 01/25] Remove `futures_core::stream::Stream` from `aws-smithy-async` --- rust-runtime/aws-smithy-async/Cargo.toml | 7 +- .../aws-smithy-async/external-types.toml | 3 - .../aws-smithy-async/src/future/mod.rs | 2 +- .../{fn_stream.rs => pagination_stream.rs} | 246 ++++++++++++------ .../src/future/pagination_stream/collect.rs | 75 ++++++ .../aws-smithy-async/src/future/rendezvous.rs | 22 +- 6 files changed, 255 insertions(+), 100 deletions(-) rename rust-runtime/aws-smithy-async/src/future/{fn_stream.rs => pagination_stream.rs} (50%) create mode 100644 rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs diff --git a/rust-runtime/aws-smithy-async/Cargo.toml b/rust-runtime/aws-smithy-async/Cargo.toml index c95862d9ff..fd51b4fb1e 100644 --- a/rust-runtime/aws-smithy-async/Cargo.toml +++ b/rust-runtime/aws-smithy-async/Cargo.toml @@ -14,13 +14,18 @@ test-util = [] [dependencies] pin-project-lite = "0.2" tokio = { version = "1.23.1", features = ["sync"] } -tokio-stream = { version = "0.1.5", default-features = false } futures-util = { version = "0.3.16", default-features = false } [dev-dependencies] +pin-utils = "0.1" tokio = { version = "1.23.1", features = ["rt", "macros", "test-util"] } tokio-test = "0.4.2" +# futures-util is used by `now_or_later`, for instance, but the tooling +# reports a false positive, saying it is unused. +[package.metadata.cargo-udeps.ignore] +normal = ["futures-util"] + [package.metadata.docs.rs] all-features = true targets = ["x86_64-unknown-linux-gnu"] diff --git a/rust-runtime/aws-smithy-async/external-types.toml b/rust-runtime/aws-smithy-async/external-types.toml index 424f7dc1db..464456a2dc 100644 --- a/rust-runtime/aws-smithy-async/external-types.toml +++ b/rust-runtime/aws-smithy-async/external-types.toml @@ -2,7 +2,4 @@ allowed_external_types = [ "aws_smithy_types::config_bag::storable::Storable", "aws_smithy_types::config_bag::storable::StoreReplace", "aws_smithy_types::config_bag::storable::Storer", - - # TODO(https://github.com/awslabs/smithy-rs/issues/1193): Switch to AsyncIterator once standardized - "futures_core::stream::Stream", ] diff --git a/rust-runtime/aws-smithy-async/src/future/mod.rs b/rust-runtime/aws-smithy-async/src/future/mod.rs index 1e99bdc304..44894e0733 100644 --- a/rust-runtime/aws-smithy-async/src/future/mod.rs +++ b/rust-runtime/aws-smithy-async/src/future/mod.rs @@ -5,8 +5,8 @@ //! Useful runtime-agnostic future implementations. -pub mod fn_stream; pub mod never; pub mod now_or_later; +pub mod pagination_stream; pub mod rendezvous; pub mod timeout; diff --git a/rust-runtime/aws-smithy-async/src/future/fn_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs similarity index 50% rename from rust-runtime/aws-smithy-async/src/future/fn_stream.rs rename to rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 804b08f6bb..a8386dcfc5 100644 --- a/rust-runtime/aws-smithy-async/src/future/fn_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -3,16 +3,51 @@ * SPDX-License-Identifier: Apache-2.0 */ -//! Utility to drive a stream with an async function and a channel. +//! Types to support stream-like operations for paginators. use crate::future::rendezvous; -use futures_util::StreamExt; use pin_project_lite::pin_project; +use std::fmt; +use std::future::poll_fn; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio_stream::{Iter, Once, Stream}; +pub mod collect; + +/// A wrapper around [`FnStream`]. +/// +/// This type provides the same set of methods as [`FnStream`], but that is meant to be used +/// internally and not by external users. +#[derive(Debug)] +pub struct PaginationStream(FnStream); + +impl PaginationStream { + /// Creates a `PaginationStream` from the given [`FnStream`]. + pub fn new(stream: FnStream) -> Self { + Self(stream) + } + + /// Consumes and returns the next `Item` from this stream. + pub async fn next(&mut self) -> Option + where + Self: Unpin, + { + self.0.next().await + } + + /// Consumes this stream and gathers elements into a collection. + pub async fn collect>(self) -> T { + self.0.collect().await + } +} + +impl PaginationStream> { + /// Yields the next item in the stream or returns an error if an error is encountered. + pub async fn try_next(&mut self) -> Result, E> { + self.next().await.transpose() + } +} pin_project! { /// Utility to drive a stream with an async function and a channel. /// @@ -24,12 +59,14 @@ pin_project! { /// /// If `tx.send` returns an error, the function MUST return immediately. /// + /// Note `FnStream` is only `Send` but not `Sync` because `generator` is a boxed future that + /// is `Send` and returns `()` as output when it is done. + /// /// # Examples /// ```no_run - /// use tokio_stream::StreamExt; /// # async fn docs() { - /// use aws_smithy_async::future::fn_stream::FnStream; - /// let stream = FnStream::new(|tx| Box::pin(async move { + /// use aws_smithy_async::future::pagination_stream::FnStream; + /// let mut stream = FnStream::new(|tx| Box::pin(async move { /// if let Err(_) = tx.send("Hello!").await { /// return; /// } @@ -39,52 +76,82 @@ pin_project! { /// })); /// assert_eq!(stream.collect::>().await, vec!["Hello!", "Goodbye!"]); /// # } - pub struct FnStream { + pub struct FnStream { #[pin] rx: rendezvous::Receiver, - #[pin] - generator: Option, + generator: Option + Send + 'static>>>, + } +} + +impl fmt::Debug for FnStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let item_typename = std::any::type_name::(); + write!(f, "FnStream<{item_typename}>") } } -impl FnStream { +impl FnStream { /// Creates a new function based stream driven by `generator`. /// /// For examples, see the documentation for [`FnStream`] pub fn new(generator: T) -> Self where - T: FnOnce(rendezvous::Sender) -> F, + T: FnOnce(rendezvous::Sender) -> Pin + Send + 'static>>, { let (tx, rx) = rendezvous::channel::(); Self { rx, - generator: Some(generator(tx)), + generator: Some(Box::pin(generator(tx))), } } -} -impl Stream for FnStream -where - F: Future, -{ - type Item = Item; + /// Consumes and returns the next `Item` from this stream. + pub async fn next(&mut self) -> Option + where + Self: Unpin, + { + let mut me = Pin::new(self); + poll_fn(|cx| me.as_mut().poll_next(cx)).await + } - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + /// Attempts to pull out the next value of this stream, returning `None` if the stream is + /// exhausted. + pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); match me.rx.poll_recv(cx) { Poll::Ready(item) => Poll::Ready(item), Poll::Pending => { - if let Some(generator) = me.generator.as_mut().as_pin_mut() { - if generator.poll(cx).is_ready() { - // if the generator returned ready we MUST NOT poll it again—doing so - // will cause a panic. - me.generator.set(None); + if let Some(generator) = me.generator { + if generator.as_mut().poll(cx).is_ready() { + // `generator` keeps writing items to `tx` and will not be `Poll::Ready` + // until it is done writing to `tx`. Once it is done, it returns `()` + // as output and is `Poll::Ready`, at which point we MUST NOT poll it again + // since doing so will cause a panic. + *me.generator = None; } } Poll::Pending } } } + + /// Consumes this stream and gathers elements into a collection. + pub async fn collect>(mut self) -> T { + let mut collection = T::initialize(); + while let Some(item) = self.next().await { + if !T::extend(&mut collection, item) { + break; + } + } + T::finalize(collection) + } +} + +impl FnStream> { + /// Yields the next item in the stream or returns an error if an error is encountered. + pub async fn try_next(&mut self) -> Result, E> { + self.next().await.transpose() + } } /// Utility wrapper to flatten paginated results @@ -93,62 +160,50 @@ where /// is present in each item. This provides `items()` which can wrap an stream of `Result` /// and produce a stream of `Result`. #[derive(Debug)] -pub struct TryFlatMap(I); +pub struct TryFlatMap(PaginationStream>); -impl TryFlatMap { - /// Create a `TryFlatMap` that wraps the input - pub fn new(i: I) -> Self { - Self(i) +impl TryFlatMap { + /// Creates a `TryFlatMap` that wraps the input. + pub fn new(stream: PaginationStream>) -> Self { + Self(stream) } - /// Produce a new [`Stream`] by mapping this stream with `map` then flattening the result - pub fn flat_map(self, map: M) -> impl Stream> + /// Produces a new [`PaginationStream`] by mapping this stream with `map` then flattening the result. + pub fn flat_map(mut self, map: M) -> PaginationStream> where - I: Stream>, - M: Fn(Page) -> Iter, - Iter: IntoIterator, + Page: Send + 'static, + Err: Send + 'static, + M: Fn(Page) -> Iter + Send + 'static, + Item: Send + 'static, + Iter: IntoIterator + Send, + ::IntoIter: Send, { - self.0.flat_map(move |page| match page { - Ok(page) => OnceOrMany::Many { - many: tokio_stream::iter(map(page).into_iter().map(Ok)), - }, - Err(e) => OnceOrMany::Once { - once: tokio_stream::once(Err(e)), - }, - }) - } -} - -pin_project! { - /// Helper enum to to support returning `Once` and `Iter` from `Items::items` - #[project = OnceOrManyProj] - enum OnceOrMany { - Many { #[pin] many: Iter }, - Once { #[pin] once: Once }, - } -} - -impl Stream for OnceOrMany -where - Iter: Iterator, -{ - type Item = Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = self.project(); - match me { - OnceOrManyProj::Many { many } => many.poll_next(cx), - OnceOrManyProj::Once { once } => once.poll_next(cx), - } + PaginationStream::new(FnStream::new(|tx| { + Box::pin(async move { + while let Some(page) = self.0.next().await { + match page { + Ok(page) => { + let mapped = map(page); + for item in mapped.into_iter() { + let _ = tx.send(Ok(item)).await; + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + break; + } + } + } + }) as Pin + Send>> + })) } } #[cfg(test)] mod test { - use crate::future::fn_stream::{FnStream, TryFlatMap}; + use crate::future::pagination_stream::{FnStream, PaginationStream, TryFlatMap}; use std::sync::{Arc, Mutex}; use std::time::Duration; - use tokio_stream::StreamExt; /// basic test of FnStream functionality #[tokio::test] @@ -168,7 +223,24 @@ mod test { while let Some(value) = stream.next().await { out.push(value); } - assert_eq!(out, vec!["1", "2", "3"]); + assert_eq!(vec!["1", "2", "3"], out); + } + + #[tokio::test] + async fn fn_stream_try_next() { + tokio::time::pause(); + let mut stream = FnStream::new(|tx| { + Box::pin(async move { + tx.send(Ok(1)).await.unwrap(); + tx.send(Ok(2)).await.unwrap(); + tx.send(Err("err")).await.unwrap(); + }) + }); + let mut out = vec![]; + while let Ok(value) = stream.try_next().await { + out.push(value); + } + assert_eq!(vec![Some(1), Some(2)], out); } // smithy-rs#1902: there was a bug where we could continue to poll the generator after it @@ -183,10 +255,16 @@ mod test { Box::leak(Box::new(tx)); }) }); - assert_eq!(stream.next().await, Some("blah")); + assert_eq!(Some("blah"), stream.next().await); let mut test_stream = tokio_test::task::spawn(stream); - assert!(test_stream.poll_next().is_pending()); - assert!(test_stream.poll_next().is_pending()); + let _ = test_stream.enter(|ctx, pin| { + let polled = pin.poll_next(ctx); + assert!(polled.is_pending()); + }); + let _ = test_stream.enter(|ctx, pin| { + let polled = pin.poll_next(ctx); + assert!(polled.is_pending()); + }); } /// Tests that the generator will not advance until demand exists @@ -209,13 +287,13 @@ mod test { stream.next().await.expect("ready"); assert_eq!(*progress.lock().unwrap(), 1); - assert_eq!(stream.next().await.expect("ready"), "2"); - assert_eq!(*progress.lock().unwrap(), 2); + assert_eq!("2", stream.next().await.expect("ready")); + assert_eq!(2, *progress.lock().unwrap()); let _ = stream.next().await.expect("ready"); - assert_eq!(*progress.lock().unwrap(), 3); - assert_eq!(stream.next().await, None); - assert_eq!(*progress.lock().unwrap(), 4); + assert_eq!(3, *progress.lock().unwrap()); + assert_eq!(None, stream.next().await); + assert_eq!(4, *progress.lock().unwrap()); } #[tokio::test] @@ -238,7 +316,7 @@ mod test { while let Some(Ok(value)) = stream.next().await { out.push(value); } - assert_eq!(out, vec![0, 1]); + assert_eq!(vec![0, 1], out); } #[tokio::test] @@ -262,12 +340,12 @@ mod test { }) }); assert_eq!( - TryFlatMap(stream) + Ok(vec![1, 2, 3, 4, 5, 6]), + TryFlatMap::new(PaginationStream::new(stream)) .flat_map(|output| output.items.into_iter()) .collect::, &str>>() .await, - Ok(vec![1, 2, 3, 4, 5, 6]) - ) + ); } #[tokio::test] @@ -287,11 +365,11 @@ mod test { }) }); assert_eq!( - TryFlatMap(stream) + Err("bummer"), + TryFlatMap::new(PaginationStream::new(stream)) .flat_map(|output| output.items.into_iter()) .collect::, &str>>() - .await, - Err("bummer") + .await ) } } diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs new file mode 100644 index 0000000000..1a6fcfbf9a --- /dev/null +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs @@ -0,0 +1,75 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Module to extend the functionality of types in `patination_stream` module to allow for +//! collecting elements of the stream into collection. +//! +//! Majority of the code is borrowed from +//! + +pub(crate) mod sealed { + /// A trait that signifies that elements can be collected into `T`. + /// + /// Currently the trait may not be implemented by clients so we can make changes in the future + /// without breaking code depending on it. + #[doc(hidden)] + pub trait Collectable { + type Collection; + + fn initialize() -> Self::Collection; + + fn extend(collection: &mut Self::Collection, item: T) -> bool; + + fn finalize(collection: Self::Collection) -> Self; + } +} + +impl sealed::Collectable for Vec { + type Collection = Self; + + fn initialize() -> Self::Collection { + Vec::default() + } + + fn extend(collection: &mut Self::Collection, item: T) -> bool { + collection.push(item); + true + } + + fn finalize(collection: Self::Collection) -> Self { + collection + } +} + +impl sealed::Collectable> for Result +where + U: sealed::Collectable, +{ + type Collection = Result; + + fn initialize() -> Self::Collection { + Ok(U::initialize()) + } + + fn extend(collection: &mut Self::Collection, item: Result) -> bool { + match item { + Ok(item) => { + let collection = collection.as_mut().ok().expect("invalid state"); + U::extend(collection, item) + } + Err(e) => { + *collection = Err(e); + false + } + } + } + + fn finalize(collection: Self::Collection) -> Self { + match collection { + Ok(collection) => Ok(U::finalize(collection)), + err @ Err(_) => Err(err.map(drop).unwrap_err()), + } + } +} diff --git a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs index 16456f123e..52bb05ff66 100644 --- a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs +++ b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs @@ -10,8 +10,9 @@ //! and coordinate with the receiver. //! //! Rendezvous channels should be used with care—it's inherently easy to deadlock unless they're being -//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::fn_stream::FnStream`]) +//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::pagination_stream::FnStream`]) +use std::future::poll_fn; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::sync::Semaphore; @@ -104,7 +105,11 @@ pub struct Receiver { impl Receiver { /// Polls to receive an item from the channel - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + pub async fn recv(&mut self) -> Option { + poll_fn(|cx| self.poll_recv(cx)).await + } + + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { // This uses `needs_permit` to track whether this is the first poll since we last returned an item. // If it is, we will grant a permit to the semaphore. Otherwise, we'll just forward the response through. let resp = self.chan.poll_recv(cx); @@ -124,13 +129,8 @@ impl Receiver { #[cfg(test)] mod test { - use crate::future::rendezvous::{channel, Receiver}; + use crate::future::rendezvous::channel; use std::sync::{Arc, Mutex}; - use tokio::macros::support::poll_fn; - - async fn recv(rx: &mut Receiver) -> Option { - poll_fn(|cx| rx.poll_recv(cx)).await - } #[tokio::test] async fn send_blocks_caller() { @@ -145,11 +145,11 @@ mod test { *idone.lock().unwrap() = 3; }); assert_eq!(*done.lock().unwrap(), 0); - assert_eq!(recv(&mut rx).await, Some(0)); + assert_eq!(rx.recv().await, Some(0)); assert_eq!(*done.lock().unwrap(), 1); - assert_eq!(recv(&mut rx).await, Some(1)); + assert_eq!(rx.recv().await, Some(1)); assert_eq!(*done.lock().unwrap(), 2); - assert_eq!(recv(&mut rx).await, None); + assert_eq!(rx.recv().await, None); assert_eq!(*done.lock().unwrap(), 3); let _ = send.await; } From cbbaf7715bfd29aae3bc88d33c0f0935a41d5901 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 8 Sep 2023 20:17:05 -0500 Subject: [PATCH 02/25] Update codegen client to use `PaginationStream` --- .../smithy/generators/PaginatorGenerator.kt | 20 ++++++++++--------- .../client/FluentClientGenerator.kt | 2 +- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt index dedc50f31f..d0fef52931 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt @@ -90,7 +90,7 @@ class PaginatorGenerator private constructor( "HttpResponse" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("client::orchestrator::HttpResponse"), "SdkError" to RuntimeType.sdkError(runtimeConfig), "client" to RuntimeType.smithyClient(runtimeConfig), - "fn_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::fn_stream"), + "pagination_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::pagination_stream"), // External Types "Stream" to RuntimeType.TokioStream.resolve("Stream"), @@ -141,13 +141,14 @@ class PaginatorGenerator private constructor( /// Create the pagination stream /// - /// _Note:_ No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next)). - pub fn send(self) -> impl #{Stream} + #{Unpin} { + /// _Note:_ No requests will be dispatched until the stream is used + /// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method). + pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> { // Move individual fields out of self for the borrow checker let builder = self.builder; let handle = self.handle; #{runtime_plugin_init} - #{fn_stream}::FnStream::new(move |tx| #{Box}::pin(async move { + #{pagination_stream}::PaginationStream::new(#{pagination_stream}::FnStream::new(move |tx| #{Box}::pin(async move { // Build the input for the first time. If required fields are missing, this is where we'll produce an early error. let mut input = match builder.build().map_err(#{SdkError}::construction_failure) { #{Ok}(input) => input, @@ -177,7 +178,7 @@ class PaginatorGenerator private constructor( return } } - })) + }))) } } """, @@ -257,11 +258,12 @@ class PaginatorGenerator private constructor( impl ${paginatorName}Items { /// Create the pagination stream /// - /// _Note: No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next))._ + /// _Note_: No requests will be dispatched until the stream is used + /// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method). /// - /// To read the entirety of the paginator, use [`.collect::, _>()`](tokio_stream::StreamExt::collect). - pub fn send(self) -> impl #{Stream} + #{Unpin} { - #{fn_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter()) + /// To read the entirety of the paginator, use [`.collect::, _>()`](aws_smithy_async::future::pagination_stream::PaginationStream::collect). + pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> { + #{pagination_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter()) } } diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt index b2927dca8e..d5655c4879 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt @@ -433,7 +433,7 @@ class FluentClientGenerator( """ /// Create a paginator for this request /// - /// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a `Stream`. + /// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a [`PaginationStream`](aws_smithy_async::future::pagination_stream::PaginationStream). pub fn into_paginator(self) -> #{Paginator} { #{Paginator}::new(self.handle, self.inner) } From 8005c3de85d76856dee865a37f8b37adb8e20fc6 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 8 Sep 2023 20:17:17 -0500 Subject: [PATCH 03/25] Update canary to use `PaginationStream` --- .../src/latest/paginator_canary.rs | 1 - tools/ci-cdk/canary-lambda/src/main.rs | 10 +- ...se_2023_01_26.rs => release_2023_08_23.rs} | 0 .../paginator_canary.rs | 6 +- .../s3_canary.rs | 4 +- .../transcribe_canary.rs | 4 +- .../ci-cdk/canary-runner/src/build_bundle.rs | 100 +++++++++--------- 7 files changed, 62 insertions(+), 63 deletions(-) rename tools/ci-cdk/canary-lambda/src/{release_2023_01_26.rs => release_2023_08_23.rs} (100%) rename tools/ci-cdk/canary-lambda/src/{release_2023_01_26 => release_2023_08_23}/paginator_canary.rs (92%) rename tools/ci-cdk/canary-lambda/src/{release_2023_01_26 => release_2023_08_23}/s3_canary.rs (98%) rename tools/ci-cdk/canary-lambda/src/{release_2023_01_26 => release_2023_08_23}/transcribe_canary.rs (97%) diff --git a/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs b/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs index d50c4f2be8..11914660ad 100644 --- a/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs @@ -10,7 +10,6 @@ use aws_sdk_ec2 as ec2; use aws_sdk_ec2::types::InstanceType; use crate::CanaryEnv; -use tokio_stream::StreamExt; mk_canary!( "ec2_paginator", diff --git a/tools/ci-cdk/canary-lambda/src/main.rs b/tools/ci-cdk/canary-lambda/src/main.rs index 688462031d..8fc6f4b2e7 100644 --- a/tools/ci-cdk/canary-lambda/src/main.rs +++ b/tools/ci-cdk/canary-lambda/src/main.rs @@ -26,11 +26,11 @@ mod latest; #[cfg(feature = "latest")] pub(crate) use latest as current_canary; -// NOTE: This module can be deleted 3 releases after release-2023-01-26 -#[cfg(feature = "release-2023-01-26")] -mod release_2023_01_26; -#[cfg(feature = "release-2023-01-26")] -pub(crate) use release_2023_01_26 as current_canary; +// NOTE: This module can be deleted 3 releases after release-2023-08-23 +#[cfg(feature = "release-2023-08-23")] +mod release_2023_08_23; +#[cfg(feature = "release-2023-08-23")] +pub(crate) use release_2023_08_23 as current_canary; #[tokio::main] async fn main() -> Result<(), Error> { diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23.rs similarity index 100% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23.rs diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/paginator_canary.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/paginator_canary.rs similarity index 92% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26/paginator_canary.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23/paginator_canary.rs index 72c9b40ed0..66df5a03e4 100644 --- a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/paginator_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/paginator_canary.rs @@ -7,7 +7,7 @@ use crate::mk_canary; use anyhow::bail; use aws_sdk_ec2 as ec2; -use aws_sdk_ec2::model::InstanceType; +use aws_sdk_ec2::types::InstanceType; use crate::CanaryEnv; use tokio_stream::StreamExt; @@ -30,7 +30,7 @@ pub async fn paginator_canary(client: ec2::Client, page_size: usize) -> anyhow:: let mut num_pages = 0; while let Some(page) = history.try_next().await? { let items_in_page = page.spot_price_history.unwrap_or_default().len(); - if items_in_page > page_size as usize { + if items_in_page > page_size { bail!( "failed to retrieve results of correct page size (expected {}, got {})", page_size, @@ -60,7 +60,7 @@ pub async fn paginator_canary(client: ec2::Client, page_size: usize) -> anyhow:: #[cfg(test)] mod test { - use crate::paginator_canary::paginator_canary; + use crate::current_canary::paginator_canary::paginator_canary; #[tokio::test] async fn test_paginator() { diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/s3_canary.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/s3_canary.rs similarity index 98% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26/s3_canary.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23/s3_canary.rs index 70e3d18c55..fbcba976d8 100644 --- a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/s3_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/s3_canary.rs @@ -8,8 +8,8 @@ use crate::{mk_canary, CanaryEnv}; use anyhow::Context; use aws_config::SdkConfig; use aws_sdk_s3 as s3; -use aws_sdk_s3::presigning::config::PresigningConfig; -use s3::types::ByteStream; +use s3::presigning::PresigningConfig; +use s3::primitives::ByteStream; use std::time::Duration; use uuid::Uuid; diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/transcribe_canary.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/transcribe_canary.rs similarity index 97% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26/transcribe_canary.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23/transcribe_canary.rs index 554f4c3ddf..8f6420fc1b 100644 --- a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/transcribe_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/transcribe_canary.rs @@ -9,10 +9,10 @@ use async_stream::stream; use aws_config::SdkConfig; use aws_sdk_transcribestreaming as transcribe; use bytes::BufMut; -use transcribe::model::{ +use transcribe::primitives::Blob; +use transcribe::types::{ AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream, }; -use transcribe::types::Blob; const CHUNK_SIZE: usize = 8192; use crate::canary::CanaryEnv; diff --git a/tools/ci-cdk/canary-runner/src/build_bundle.rs b/tools/ci-cdk/canary-runner/src/build_bundle.rs index 464ee2e4ad..4ec7861460 100644 --- a/tools/ci-cdk/canary-runner/src/build_bundle.rs +++ b/tools/ci-cdk/canary-runner/src/build_bundle.rs @@ -63,9 +63,10 @@ const REQUIRED_SDK_CRATES: &[&str] = &[ "aws-sdk-transcribestreaming", ]; +// The elements in this `Vec` should be sorted in an ascending order by the release date. lazy_static! { static ref NOTABLE_SDK_RELEASE_TAGS: Vec = vec![ - ReleaseTag::from_str("release-2023-01-26").unwrap(), // last version before the crate reorg + ReleaseTag::from_str("release-2023-08-23").unwrap(), // last version before `Stream` trait removal ]; } @@ -112,38 +113,58 @@ enum CrateSource { }, } -fn enabled_features(crate_source: &CrateSource) -> Vec { - let mut enabled = Vec::new(); +fn enabled_feature(crate_source: &CrateSource) -> String { if let CrateSource::VersionsManifest { release_tag, .. } = crate_source { - // we want to select the newest module specified after this release + // we want to select the oldest module specified after this release for notable in NOTABLE_SDK_RELEASE_TAGS.iter() { tracing::debug!(release_tag = ?release_tag, notable = ?notable, "considering if release tag came before notable release"); if release_tag <= notable { tracing::debug!("selecting {} as chosen release", notable); - enabled.push(notable.as_str().into()); - break; + return notable.as_str().into(); } } } - if enabled.is_empty() { - enabled.push("latest".into()); - } - enabled + "latest".into() } fn generate_crate_manifest(crate_source: CrateSource) -> Result { let mut output = BASE_MANIFEST.to_string(); - for &sdk_crate in REQUIRED_SDK_CRATES { + write_dependencies(REQUIRED_SDK_CRATES, &mut output, &crate_source)?; + write!(output, "\n[features]\n").unwrap(); + writeln!(output, "latest = []").unwrap(); + for release_tag in NOTABLE_SDK_RELEASE_TAGS.iter() { + writeln!( + output, + "\"{release_tag}\" = []", + release_tag = release_tag.as_str() + ) + .unwrap(); + } + writeln!( + output, + "default = [\"{enabled}\"]", + enabled = enabled_feature(&crate_source) + ) + .unwrap(); + Ok(output) +} + +fn write_dependencies( + required_crates: &[&str], + output: &mut String, + crate_source: &CrateSource, +) -> Result<()> { + for &required_crate in required_crates { match &crate_source { CrateSource::Path(path) => { - let path_name = match sdk_crate.strip_prefix("aws-sdk-") { + let path_name = match required_crate.strip_prefix("aws-sdk-") { Some(path) => path, - None => sdk_crate, + None => required_crate, }; let crate_path = path.join(path_name); writeln!( output, - r#"{sdk_crate} = {{ path = "{path}" }}"#, + r#"{required_crate} = {{ path = "{path}" }}"#, path = crate_path.to_string_lossy() ) .unwrap() @@ -151,40 +172,20 @@ fn generate_crate_manifest(crate_source: CrateSource) -> Result { CrateSource::VersionsManifest { versions, release_tag, - } => match versions.crates.get(sdk_crate) { + } => match versions.crates.get(required_crate) { Some(version) => writeln!( output, - r#"{sdk_crate} = "{version}""#, + r#"{required_crate} = "{version}""#, version = version.version ) .unwrap(), None => { - bail!("Couldn't find `{sdk_crate}` in versions.toml for `{release_tag}`") + bail!("Couldn't find `{required_crate}` in versions.toml for `{release_tag}`") } }, } } - write!(output, "\n[features]\n").unwrap(); - writeln!(output, "latest = []").unwrap(); - for release_tag in NOTABLE_SDK_RELEASE_TAGS.iter() { - writeln!( - output, - "\"{release_tag}\" = []", - release_tag = release_tag.as_str() - ) - .unwrap(); - } - writeln!( - output, - "default = [{enabled}]", - enabled = enabled_features(&crate_source) - .into_iter() - .map(|f| format!("\"{f}\"")) - .collect::>() - .join(", ") - ) - .unwrap(); - Ok(output) + Ok(()) } fn sha1_file(path: &Path) -> Result { @@ -441,7 +442,7 @@ aws-sdk-transcribestreaming = { path = "some/sdk/path/transcribestreaming" } [features] latest = [] -"release-2023-01-26" = [] +"release-2023-08-23" = [] default = ["latest"] "#, generate_crate_manifest(CrateSource::Path("some/sdk/path".into())).expect("success") @@ -505,7 +506,7 @@ aws-sdk-transcribestreaming = "0.16.0" [features] latest = [] -"release-2023-01-26" = [] +"release-2023-08-23" = [] default = ["latest"] "#, generate_crate_manifest(CrateSource::VersionsManifest { @@ -523,7 +524,7 @@ default = ["latest"] .collect(), release: None, }, - release_tag: ReleaseTag::from_str("release-2023-05-26").unwrap(), + release_tag: ReleaseTag::from_str("release-2023-08-26").unwrap(), }) .expect("success") ); @@ -577,26 +578,25 @@ default = ["latest"] release: None, }; assert_eq!( - enabled_features(&CrateSource::VersionsManifest { + "latest".to_string(), + enabled_feature(&CrateSource::VersionsManifest { versions: versions.clone(), - release_tag: "release-2023-02-23".parse().unwrap(), + release_tag: "release-9999-12-31".parse().unwrap(), }), - vec!["latest".to_string()] ); - assert_eq!( - enabled_features(&CrateSource::VersionsManifest { + "release-2023-08-23".to_string(), + enabled_feature(&CrateSource::VersionsManifest { versions: versions.clone(), - release_tag: "release-2023-01-26".parse().unwrap(), + release_tag: "release-2023-08-23".parse().unwrap(), }), - vec!["release-2023-01-26".to_string()] ); assert_eq!( - enabled_features(&CrateSource::VersionsManifest { + "release-2023-08-23".to_string(), + enabled_feature(&CrateSource::VersionsManifest { versions, release_tag: "release-2023-01-13".parse().unwrap(), }), - vec!["release-2023-01-26".to_string()] ); } } From 75b96e351ac253fb24bab3e0867e8c13310e6bed Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 8 Sep 2023 20:17:29 -0500 Subject: [PATCH 04/25] Stop using `tokio_stream::StreamExt` in integration-tests --- aws/sdk/integration-tests/dynamodb/tests/paginators.rs | 2 -- aws/sdk/integration-tests/ec2/tests/paginators.rs | 2 -- 2 files changed, 4 deletions(-) diff --git a/aws/sdk/integration-tests/dynamodb/tests/paginators.rs b/aws/sdk/integration-tests/dynamodb/tests/paginators.rs index 807a11890d..a3d0c62473 100644 --- a/aws/sdk/integration-tests/dynamodb/tests/paginators.rs +++ b/aws/sdk/integration-tests/dynamodb/tests/paginators.rs @@ -6,8 +6,6 @@ use std::collections::HashMap; use std::iter::FromIterator; -use tokio_stream::StreamExt; - use aws_credential_types::Credentials; use aws_sdk_dynamodb::types::AttributeValue; use aws_sdk_dynamodb::{Client, Config}; diff --git a/aws/sdk/integration-tests/ec2/tests/paginators.rs b/aws/sdk/integration-tests/ec2/tests/paginators.rs index 83528f2075..d070971a4f 100644 --- a/aws/sdk/integration-tests/ec2/tests/paginators.rs +++ b/aws/sdk/integration-tests/ec2/tests/paginators.rs @@ -3,8 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -use tokio_stream::StreamExt; - use aws_sdk_ec2::{config::Credentials, config::Region, types::InstanceType, Client, Config}; use aws_smithy_client::http_connector::HttpConnector; use aws_smithy_client::test_connection::TestConnection; From 2b1fcb1a901f22ece2b5dbdb865c2f4ecaa5a3ed Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 11 Sep 2023 11:07:05 -0500 Subject: [PATCH 05/25] Add comment explaining why the test uses `.enter` This commit responses to https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1321632649 --- rust-runtime/aws-smithy-async/src/future/pagination_stream.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index a8386dcfc5..083f6190f8 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -257,6 +257,9 @@ mod test { }); assert_eq!(Some("blah"), stream.next().await); let mut test_stream = tokio_test::task::spawn(stream); + // `tokio_test::task::Spawn::poll_next` can only be invoked when the wrapped + // type implements the `Stream` trait. Here, `FnStream` does not implement it, + // so we work around it by using the `enter` method. let _ = test_stream.enter(|ctx, pin| { let polled = pin.poll_next(ctx); assert!(polled.is_pending()); From 73966d4d1130bc42e6e71c7245848b2b408f6611 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 11 Sep 2023 12:12:53 -0500 Subject: [PATCH 06/25] Make `FnStream::poll_next` module private --- rust-runtime/aws-smithy-async/src/future/pagination_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 083f6190f8..bf37ade49e 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -116,7 +116,7 @@ impl FnStream { /// Attempts to pull out the next value of this stream, returning `None` if the stream is /// exhausted. - pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); match me.rx.poll_recv(cx) { Poll::Ready(item) => Poll::Ready(item), From e25a916ae7a3672f17b51f856c1574ed2e4d454c Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 11 Sep 2023 14:42:46 -0500 Subject: [PATCH 07/25] Update CHANGELOG.next.toml --- CHANGELOG.next.toml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 73c59508ae..261b7a2544 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -150,3 +150,15 @@ message = "Fix regression with redacting sensitive HTTP response bodies." references = ["smithy-rs#2926", "smithy-rs#2972"] meta = { "breaking" = false, "tada" = false, "bug" = true, "target" = "client" } author = "ysaito1001" + +[[aws-sdk-rust]] +message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect` methods are supported on `PaginationStream`. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner." +references = ["smithy-rs#2978"] +meta = { "breaking" = true, "tada" = false, "bug" = false } +author = "ysaito1001" + +[[smithy-rs]] +message = "The `futures_core::stream::Stream` trait has been removed from public API. [`FnStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.FnStream.html) only supports `next`, `try_next`, and `collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. For stream operations previously made available through the trait or its extension traits, we should be able to add them as needed in a backward compatible manner." +references = ["smithy-rs#2978"] +meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } +author = "ysaito1001" From cce169c9ced02f53ed6536c9ba2e515a7de4f5b0 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 12 Sep 2023 16:42:23 -0500 Subject: [PATCH 08/25] Remove impl `Stream` trait for `ByteStream` --- .../aws-smithy-http/src/byte_stream.rs | 102 ++++++++++-------- .../src/futures_stream_adapter.rs | 61 +++++++++++ rust-runtime/aws-smithy-http/src/lib.rs | 2 + 3 files changed, 120 insertions(+), 45 deletions(-) create mode 100644 rust-runtime/aws-smithy-http/src/futures_stream_adapter.rs diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs index e067018a9d..49e7b4d6a6 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs @@ -48,7 +48,8 @@ //! //! ### Stream a ByteStream into a file //! The previous example is recommended in cases where loading the entire file into memory first is desirable. For extremely large -//! files, you may wish to stream the data directly to the file system, chunk by chunk. This is posible using the `futures::Stream` implementation. +//! files, you may wish to stream the data directly to the file system, chunk by chunk. +//! This is possible using the [`.next()`](crate::byte_stream::ByteStream::next) method. //! //! ```no_run //! use bytes::{Buf, Bytes}; @@ -128,6 +129,7 @@ use bytes::Bytes; use bytes_utils::SegmentedBuf; use http_body::Body; use pin_project_lite::pin_project; +use std::future::poll_fn; use std::io::IoSlice; use std::pin::Pin; use std::task::{Context, Poll}; @@ -166,9 +168,7 @@ pin_project! { /// println!("first chunk: {:?}", data.chunk()); /// } /// ``` - /// 2. Via [`impl Stream`](futures_core::Stream): - /// - /// _Note: An import of `StreamExt` is required to use `.try_next()`._ + /// 2. Via [`.next()`](crate::byte_stream::ByteStream::next) or [`.try_next()`](crate::byte_stream::ByteStream::try_next): /// /// For use-cases where holding the entire ByteStream in memory is unnecessary, use the /// `Stream` implementation: @@ -183,7 +183,6 @@ pin_project! { /// # } /// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes, error::Error}; /// use aws_smithy_http::body::SdkBody; - /// use tokio_stream::StreamExt; /// /// async fn example() -> Result<(), Error> { /// let mut stream = ByteStream::from(vec![1, 2, 3, 4, 5, 99]); @@ -276,7 +275,7 @@ impl ByteStream { } } - /// Consumes the ByteStream, returning the wrapped SdkBody + /// Consume the `ByteStream`, returning the wrapped SdkBody. // Backwards compatibility note: Because SdkBody has a dyn variant, // we will always be able to implement this method, even if we stop using // SdkBody as the internal representation @@ -284,6 +283,31 @@ impl ByteStream { self.inner.body } + /// Return the next item in the `ByteStream`. + pub async fn next(&mut self) -> Option> { + Some(self.inner.next().await?.map_err(Error::streaming)) + } + + /// Attempt to pull out the next value of this stream, returning `None` if the stream is + /// exhausted. + pub fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.project().inner.poll_next(cx).map_err(Error::streaming) + } + + /// Consume and return the next item in the `ByteStream` or return an error if an error is + /// encountered. + pub async fn try_next(&mut self) -> Result, Error> { + self.next().await.transpose() + } + + /// Return the bounds on the remaining length of the `ByteStream`. + pub fn size_hint(&self) -> (u64, Option) { + self.inner.size_hint() + } + /// Read all the data from this `ByteStream` into memory /// /// If an error in the underlying stream is encountered, `ByteStreamError` is returned. @@ -393,7 +417,9 @@ impl ByteStream { /// # } /// ``` pub fn into_async_read(self) -> impl tokio::io::AsyncRead { - tokio_util::io::StreamReader::new(self) + tokio_util::io::StreamReader::new( + crate::futures_stream_adapter::FuturesStreamCompatByteStream::new(self), + ) } /// Given a function to modify an [`SdkBody`], run it on the `SdkBody` inside this `Bytestream`. @@ -442,18 +468,6 @@ impl From for ByteStream { } } -impl futures_core::stream::Stream for ByteStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_next(cx).map_err(Error::streaming) - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} - /// Non-contiguous Binary Data Storage /// /// When data is read from the network, it is read in a sequence of chunks that are not in @@ -524,6 +538,25 @@ impl Inner { Self { body } } + async fn next(&mut self) -> Option> + where + Self: Unpin, + B: http_body::Body, + { + let mut me = Pin::new(self); + poll_fn(|cx| me.as_mut().poll_next(cx)).await + } + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> + where + B: http_body::Body, + { + self.project().body.poll_data(cx) + } + async fn collect(self) -> Result where B: http_body::Body, @@ -536,34 +569,13 @@ impl Inner { } Ok(AggregatedBytes(output)) } -} - -const SIZE_HINT_32_BIT_PANIC_MESSAGE: &str = r#" -You're running a 32-bit system and this stream's length is too large to be represented with a usize. -Please limit stream length to less than 4.294Gb or run this program on a 64-bit computer architecture. -"#; - -impl futures_core::stream::Stream for Inner -where - B: http_body::Body, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().body.poll_data(cx) - } - fn size_hint(&self) -> (usize, Option) { + fn size_hint(&self) -> (u64, Option) + where + B: http_body::Body, + { let size_hint = http_body::Body::size_hint(&self.body); - let lower = size_hint.lower().try_into(); - let upper = size_hint.upper().map(|u| u.try_into()).transpose(); - - match (lower, upper) { - (Ok(lower), Ok(upper)) => (lower, upper), - (Err(_), _) | (_, Err(_)) => { - panic!("{}", SIZE_HINT_32_BIT_PANIC_MESSAGE) - } - } + (size_hint.lower(), size_hint.upper()) } } diff --git a/rust-runtime/aws-smithy-http/src/futures_stream_adapter.rs b/rust-runtime/aws-smithy-http/src/futures_stream_adapter.rs new file mode 100644 index 0000000000..0e718df493 --- /dev/null +++ b/rust-runtime/aws-smithy-http/src/futures_stream_adapter.rs @@ -0,0 +1,61 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use crate::body::SdkBody; +use crate::byte_stream::error::Error as ByteStreamError; +use crate::byte_stream::ByteStream; +use bytes::Bytes; +use futures_core::stream::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A new-type wrapper to enable the impl of the `futures_core::stream::Stream` trait +/// +/// [`ByteStream`] no longer implements `futures_core::stream::Stream` so we wrap it in the +/// new-type to enable the trait when it is required. +/// +/// This is meant to be used by codegen code, and users should not need to use it directly. +pub struct FuturesStreamCompatByteStream(ByteStream); + +impl FuturesStreamCompatByteStream { + /// Creates a new `FuturesStreamCompatByteStream` by wrapping `stream`. + pub fn new(stream: ByteStream) -> Self { + Self(stream) + } + + /// Returns [`SdkBody`] of the wrapped [`ByteStream`]. + pub fn into_inner(self) -> SdkBody { + self.0.into_inner() + } +} + +impl Stream for FuturesStreamCompatByteStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_next(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_core::stream::Stream; + + fn check_compatible_with_hyper_wrap_stream(stream: S) -> S + where + S: Stream> + Send + 'static, + O: Into + 'static, + E: Into> + 'static, + { + stream + } + + #[test] + fn test_byte_stream_stream_can_be_made_compatible_with_hyper_wrap_stream() { + let stream = ByteStream::from_static(b"Hello world"); + check_compatible_with_hyper_wrap_stream(FuturesStreamCompatByteStream::new(stream)); + } +} diff --git a/rust-runtime/aws-smithy-http/src/lib.rs b/rust-runtime/aws-smithy-http/src/lib.rs index b24ecd3bf1..5a832e7973 100644 --- a/rust-runtime/aws-smithy-http/src/lib.rs +++ b/rust-runtime/aws-smithy-http/src/lib.rs @@ -27,6 +27,8 @@ pub mod body; pub mod endpoint; +#[doc(hidden)] +pub mod futures_stream_adapter; pub mod header; pub mod http; pub mod label; From 0362d9cd9a90cc128b04d2651f1e272889ee26fe Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 12 Sep 2023 16:43:07 -0500 Subject: [PATCH 09/25] Make rendering stream payload serializer configurable --- .../rust/codegen/core/smithy/RuntimeType.kt | 3 +- .../HttpBoundProtocolPayloadGenerator.kt | 62 +++++++++++++++---- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt index c58596ad94..f7e463b0a7 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt @@ -407,9 +407,10 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null) fun retryErrorKind(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("retry::ErrorKind") fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::Receiver") - fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender") + fun futuresStreamCompatByteStream(runtimeConfig: RuntimeConfig): RuntimeType = + smithyHttp(runtimeConfig).resolve("futures_stream_adapter::FuturesStreamCompatByteStream") fun errorMetadata(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("error::ErrorMetadata") fun errorMetadataBuilder(runtimeConfig: RuntimeConfig) = diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt index 0ed594fc28..fbfc33cca0 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt @@ -6,6 +6,7 @@ package software.amazon.smithy.rust.codegen.core.smithy.protocols import software.amazon.smithy.codegen.core.CodegenException +import software.amazon.smithy.codegen.core.SymbolProvider import software.amazon.smithy.model.shapes.BlobShape import software.amazon.smithy.model.shapes.DocumentShape import software.amazon.smithy.model.shapes.MemberShape @@ -17,12 +18,14 @@ import software.amazon.smithy.model.traits.EnumTrait import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency import software.amazon.smithy.rust.codegen.core.rustlang.RustWriter import software.amazon.smithy.rust.codegen.core.rustlang.rust -import software.amazon.smithy.rust.codegen.core.rustlang.rustBlockTemplate +import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.rustlang.withBlock import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate import software.amazon.smithy.rust.codegen.core.smithy.CodegenContext import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget +import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType +import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpMessageType import software.amazon.smithy.rust.codegen.core.smithy.generators.operationBuildError import software.amazon.smithy.rust.codegen.core.smithy.generators.protocol.AdditionalPayloadContext @@ -50,11 +53,38 @@ data class EventStreamBodyParams( val additionalPayloadContext: AdditionalPayloadContext, ) +data class StreamPayloadSerializerParams( + val symbolProvider: SymbolProvider, + val runtimeConfig: RuntimeConfig, + val member: MemberShape, + val payloadName: String?, +) + +/** + * An interface to help customize how to render a stream payload serializer. + * + * When the output of the serializer is passed to `hyper::body::Body::wrap_stream`, + * it requires what's passed to implement `futures_core::stream::Stream` trait. + * However, a certain type, such as `aws_smithy_http::byte_stream::ByteStream` does not + * implement the trait, so we need to wrap it with a new-type that does implement the trait. + * + * Each implementing type of the interface can choose whether the payload should be wrapped + * with such a new-type or should simply be used as-is. + */ +interface StreamPayloadSerializerRenderer { + /** Renders the return type of stream payload serializer **/ + fun renderOutputType(writer: RustWriter, params: StreamPayloadSerializerParams) + + /** Renders the stream payload **/ + fun renderPayload(writer: RustWriter, params: StreamPayloadSerializerParams) +} + class HttpBoundProtocolPayloadGenerator( codegenContext: CodegenContext, private val protocol: Protocol, private val httpMessageType: HttpMessageType = HttpMessageType.REQUEST, private val renderEventStreamBody: (RustWriter, EventStreamBodyParams) -> Unit, + private val streamPayloadSerializerRenderer: StreamPayloadSerializerRenderer, ) : ProtocolPayloadGenerator { private val symbolProvider = codegenContext.symbolProvider private val model = codegenContext.model @@ -63,6 +93,7 @@ class HttpBoundProtocolPayloadGenerator( private val httpBindingResolver = protocol.httpBindingResolver private val smithyEventStream = RuntimeType.smithyEventStream(runtimeConfig) private val codegenScope = arrayOf( + *preludeScope, "hyper" to CargoDependency.HyperWithStream.toType(), "SdkBody" to RuntimeType.sdkBody(runtimeConfig), "BuildError" to runtimeConfig.operationBuildError(), @@ -238,17 +269,22 @@ class HttpBoundProtocolPayloadGenerator( ) { val ref = if (payloadMetadata.takesOwnership) "" else "&" val serializer = protocolFunctions.serializeFn(member, fnNameSuffix = "http_payload") { fnName -> - val outputT = if (member.isStreaming(model)) { - symbolProvider.toSymbol(member) - } else { - RuntimeType.ByteSlab.toSymbol() - } - rustBlockTemplate( - "pub fn $fnName(payload: $ref#{Member}) -> Result<#{outputT}, #{BuildError}>", + rustTemplate( + "pub(crate) fn $fnName(payload: $ref#{Member}) -> #{Result}<", "Member" to symbolProvider.toSymbol(member), - "outputT" to outputT, *codegenScope, - ) { + ) + if (member.isStreaming(model)) { + streamPayloadSerializerRenderer.renderOutputType( + this, + StreamPayloadSerializerParams(symbolProvider, runtimeConfig, member, null), + ) + } else { + rust("#T", RuntimeType.ByteSlab.toSymbol()) + } + rustTemplate(", #{BuildError}>", *codegenScope) + + withBlockTemplate("{", "}", *codegenScope) { val asRef = if (payloadMetadata.takesOwnership) "" else ".as_ref()" if (symbolProvider.toSymbol(member).isOptional()) { @@ -303,8 +339,10 @@ class HttpBoundProtocolPayloadGenerator( is BlobShape -> { // Write the raw blob to the payload. if (member.isStreaming(model)) { - // Return the `ByteStream`. - rust(payloadName) + streamPayloadSerializerRenderer.renderPayload( + this, + StreamPayloadSerializerParams(symbolProvider, runtimeConfig, member, payloadName), + ) } else { // Convert the `Blob` into a `Vec` and return it. rust("$payloadName.into_inner()") From 0099bdba694d5fa4bb59d4c70f5b3a6131bbc515 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 12 Sep 2023 16:43:47 -0500 Subject: [PATCH 10/25] Let client configure how to render stream payload serializer --- .../protocols/HttpBoundProtocolGenerator.kt | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt index 1f60c251ac..6279d7ebb7 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt @@ -7,12 +7,32 @@ package software.amazon.smithy.rust.codegen.client.smithy.protocols import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency +import software.amazon.smithy.rust.codegen.core.rustlang.RustWriter +import software.amazon.smithy.rust.codegen.core.rustlang.rust import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpMessageType import software.amazon.smithy.rust.codegen.core.smithy.generators.protocol.ProtocolPayloadGenerator import software.amazon.smithy.rust.codegen.core.smithy.protocols.HttpBoundProtocolPayloadGenerator import software.amazon.smithy.rust.codegen.core.smithy.protocols.Protocol +import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerParams +import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerRenderer + +private class ClientStreamPayloadSerializerRenderer : StreamPayloadSerializerRenderer { + override fun renderOutputType(writer: RustWriter, params: StreamPayloadSerializerParams) { + writer.rust( + "#T", + RuntimeType.futuresStreamCompatByteStream(params.runtimeConfig).toSymbol(), + ) + } + + override fun renderPayload(writer: RustWriter, params: StreamPayloadSerializerParams) { + writer.rust( + "#T::new(${params.payloadName!!})", + RuntimeType.futuresStreamCompatByteStream(params.runtimeConfig), + ) + } +} class ClientHttpBoundProtocolPayloadGenerator( codegenContext: ClientCodegenContext, @@ -41,4 +61,5 @@ class ClientHttpBoundProtocolPayloadGenerator( "errorMarshallerConstructorFn" to params.errorMarshallerConstructorFn, ) }, + ClientStreamPayloadSerializerRenderer(), ) From 03e9278b98adcca613840a44195d3733046cb48b Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 12 Sep 2023 16:44:05 -0500 Subject: [PATCH 11/25] Let server configure how to render stream payload serializer --- .../protocols/PythonServerProtocolLoader.kt | 26 +++++++++ .../ServerHttpBoundProtocolGenerator.kt | 55 ++++++++++++++++++- .../smithy/protocols/ServerProtocolLoader.kt | 47 ++++++++++++++-- .../smithy/protocols/ServerRestXmlFactory.kt | 10 +++- .../src/types.rs | 1 - 5 files changed, 130 insertions(+), 9 deletions(-) diff --git a/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt b/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt index f84d35e7cb..5569c92526 100644 --- a/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt +++ b/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt @@ -61,6 +61,29 @@ class PythonServerAfterDeserializedMemberServerHttpBoundCustomization() : is ServerHttpBoundProtocolSection.AfterTimestampDeserializedMember -> writable { rust(".into()") } + + else -> emptySection + } +} + +/** + * Customization class used to determine the type of serialized stream payload and how it should be wrapped in a + * new-type wrapper to enable `futures_core::stream::Stream` trait. + */ +class PythonServerStreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() { + override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) { + is ServerHttpBoundProtocolSection.TypeOfSerializedStreamPayload -> writable { + // `aws_smithy_http_server_python::types::ByteStream` already implements + // `futures::stream::Stream`, so no need to wrap it in a futures' stream-compatible + // wrapper. + rust("#T", section.params.symbolProvider.toSymbol(section.params.member)) + } + + is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable { + // payloadName is always non-null within WrapStreamAfterPayloadGenerated + rust(section.params.payloadName!!) + } + else -> emptySection } } @@ -91,6 +114,7 @@ class PythonServerProtocolLoader( ), additionalServerHttpBoundProtocolCustomizations = listOf( PythonServerAfterDeserializedMemberServerHttpBoundCustomization(), + PythonServerStreamPayloadSerializerCustomization(), ), additionalHttpBindingCustomizations = listOf( PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig), @@ -103,6 +127,7 @@ class PythonServerProtocolLoader( ), additionalServerHttpBoundProtocolCustomizations = listOf( PythonServerAfterDeserializedMemberServerHttpBoundCustomization(), + PythonServerStreamPayloadSerializerCustomization(), ), additionalHttpBindingCustomizations = listOf( PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig), @@ -115,6 +140,7 @@ class PythonServerProtocolLoader( ), additionalServerHttpBoundProtocolCustomizations = listOf( PythonServerAfterDeserializedMemberServerHttpBoundCustomization(), + PythonServerStreamPayloadSerializerCustomization(), ), additionalHttpBindingCustomizations = listOf( PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig), diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt index c579d64415..22a183fae7 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt @@ -57,6 +57,8 @@ import software.amazon.smithy.rust.codegen.core.smithy.protocols.HttpBoundProtoc import software.amazon.smithy.rust.codegen.core.smithy.protocols.HttpLocation import software.amazon.smithy.rust.codegen.core.smithy.protocols.Protocol import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolFunctions +import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerParams +import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerRenderer import software.amazon.smithy.rust.codegen.core.smithy.protocols.parse.StructuredDataParserGenerator import software.amazon.smithy.rust.codegen.core.smithy.traits.SyntheticInputTrait import software.amazon.smithy.rust.codegen.core.smithy.transformers.operationErrors @@ -86,6 +88,23 @@ import java.util.logging.Logger */ sealed class ServerHttpBoundProtocolSection(name: String) : Section(name) { data class AfterTimestampDeserializedMember(val shape: MemberShape) : ServerHttpBoundProtocolSection("AfterTimestampDeserializedMember") + + /** + * Represent a section for rendering the return type of serialized stream payload. + * + * When overriding the `section` method, this should render [Symbol] for that return type. + */ + data class TypeOfSerializedStreamPayload(val params: StreamPayloadSerializerParams) : + ServerHttpBoundProtocolSection("TypeOfSerializedStreamPayload") + + /** + * Represent a section for rendering the serialized stream payload. + * + * When overriding the `section` method, this should render either the payload as-is or the payload wrapped + * with a new-type that implements the `futures_core::stream::Stream` trait. + */ + data class WrapStreamPayload(val params: StreamPayloadSerializerParams) : + ServerHttpBoundProtocolSection("WrapStreamPayload") } /** @@ -114,9 +133,40 @@ class ServerHttpBoundProtocolGenerator( } } +/** + * Server implementation of the [StreamPayloadSerializerRenderer] interface. + * + * The implementation of each method is delegated to [customizations]. Regular server codegen and python server + * have different requirements for how to render stream payload serializers, and they express their requirements + * through customizations, specifically with [TypeOfSerializedStreamPayload] and [WrapStreamPayload]. + */ +private class ServerStreamPayloadSerializerRenderer(private val customizations: List) : + StreamPayloadSerializerRenderer { + override fun renderOutputType(writer: RustWriter, params: StreamPayloadSerializerParams) { + for (customization in customizations) { + customization.section( + ServerHttpBoundProtocolSection.TypeOfSerializedStreamPayload( + params, + ), + )(writer) + } + } + + override fun renderPayload(writer: RustWriter, params: StreamPayloadSerializerParams) { + for (customization in customizations) { + customization.section( + ServerHttpBoundProtocolSection.WrapStreamPayload( + params, + ), + )(writer) + } + } +} + class ServerHttpBoundProtocolPayloadGenerator( codegenContext: CodegenContext, protocol: Protocol, + customizations: List = listOf(), ) : ProtocolPayloadGenerator by HttpBoundProtocolPayloadGenerator( codegenContext, protocol, HttpMessageType.RESPONSE, renderEventStreamBody = { writer, params -> @@ -137,6 +187,7 @@ class ServerHttpBoundProtocolPayloadGenerator( "errorMarshallerConstructorFn" to params.errorMarshallerConstructorFn, ) }, + ServerStreamPayloadSerializerRenderer(customizations), ) /* @@ -538,7 +589,7 @@ class ServerHttpBoundProtocolTraitImplGenerator( ?: serverRenderHttpResponseCode(httpTraitStatusCode)(this) operationShape.outputShape(model).findStreamingMember(model)?.let { - val payloadGenerator = ServerHttpBoundProtocolPayloadGenerator(codegenContext, protocol) + val payloadGenerator = ServerHttpBoundProtocolPayloadGenerator(codegenContext, protocol, customizations) withBlockTemplate("let body = #{SmithyHttpServer}::body::boxed(#{SmithyHttpServer}::body::Body::wrap_stream(", "));", *codegenScope) { payloadGenerator.generatePayload(this, "output", operationShape) } @@ -707,7 +758,7 @@ class ServerHttpBoundProtocolTraitImplGenerator( rustTemplate( """ #{SmithyHttpServer}::protocol::content_type_header_classifier( - &parts.headers, + &parts.headers, Some("$expectedRequestContentType"), )?; input = #{parser}(bytes.as_ref(), input)?; diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt index e52d9e3a3b..e13220687d 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt @@ -9,21 +9,60 @@ import software.amazon.smithy.aws.traits.protocols.AwsJson1_0Trait import software.amazon.smithy.aws.traits.protocols.AwsJson1_1Trait import software.amazon.smithy.aws.traits.protocols.RestJson1Trait import software.amazon.smithy.aws.traits.protocols.RestXmlTrait +import software.amazon.smithy.rust.codegen.core.rustlang.Writable +import software.amazon.smithy.rust.codegen.core.rustlang.rust +import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate +import software.amazon.smithy.rust.codegen.core.rustlang.writable +import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType import software.amazon.smithy.rust.codegen.core.smithy.protocols.AwsJsonVersion import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolLoader import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolMap import software.amazon.smithy.rust.codegen.server.smithy.ServerCodegenContext import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.ServerProtocolGenerator +class StreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() { + override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) { + is ServerHttpBoundProtocolSection.TypeOfSerializedStreamPayload -> writable { + rust( + "#T", + RuntimeType.futuresStreamCompatByteStream(section.params.runtimeConfig).toSymbol(), + ) + } + + is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable { + rustTemplate( + "#{FuturesStreamCompatByteStream}::new(${section.params.payloadName!!})", + "FuturesStreamCompatByteStream" to RuntimeType.futuresStreamCompatByteStream(section.params.runtimeConfig), + ) + } + + else -> emptySection + } +} + class ServerProtocolLoader(supportedProtocols: ProtocolMap) : ProtocolLoader(supportedProtocols) { companion object { val DefaultProtocols = mapOf( - RestJson1Trait.ID to ServerRestJsonFactory(), - RestXmlTrait.ID to ServerRestXmlFactory(), - AwsJson1_0Trait.ID to ServerAwsJsonFactory(AwsJsonVersion.Json10), - AwsJson1_1Trait.ID to ServerAwsJsonFactory(AwsJsonVersion.Json11), + RestJson1Trait.ID to ServerRestJsonFactory( + additionalServerHttpBoundProtocolCustomizations = listOf( + StreamPayloadSerializerCustomization(), + ), + ), + RestXmlTrait.ID to ServerRestXmlFactory( + additionalServerHttpBoundProtocolCustomizations = listOf( + StreamPayloadSerializerCustomization(), + ), + ), + AwsJson1_0Trait.ID to ServerAwsJsonFactory( + AwsJsonVersion.Json10, + additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()), + ), + AwsJson1_1Trait.ID to ServerAwsJsonFactory( + AwsJsonVersion.Json11, + additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()), + ), ) } } diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerRestXmlFactory.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerRestXmlFactory.kt index f5b3be454f..9207c56046 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerRestXmlFactory.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerRestXmlFactory.kt @@ -15,11 +15,17 @@ import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.Ser * RestXml server-side protocol factory. This factory creates the [ServerHttpProtocolGenerator] * with RestXml specific configurations. */ -class ServerRestXmlFactory : ProtocolGeneratorFactory { +class ServerRestXmlFactory( + private val additionalServerHttpBoundProtocolCustomizations: List = listOf(), +) : ProtocolGeneratorFactory { override fun protocol(codegenContext: ServerCodegenContext): Protocol = ServerRestXmlProtocol(codegenContext) override fun buildProtocolGenerator(codegenContext: ServerCodegenContext): ServerHttpBoundProtocolGenerator = - ServerHttpBoundProtocolGenerator(codegenContext, ServerRestXmlProtocol(codegenContext)) + ServerHttpBoundProtocolGenerator( + codegenContext, + ServerRestXmlProtocol(codegenContext), + additionalServerHttpBoundProtocolCustomizations, + ) override fun support(): ProtocolSupport { return ProtocolSupport( diff --git a/rust-runtime/aws-smithy-http-server-python/src/types.rs b/rust-runtime/aws-smithy-http-server-python/src/types.rs index a2fa308512..a274efe086 100644 --- a/rust-runtime/aws-smithy-http-server-python/src/types.rs +++ b/rust-runtime/aws-smithy-http-server-python/src/types.rs @@ -31,7 +31,6 @@ use pyo3::{ prelude::*, }; use tokio::{runtime::Handle, sync::Mutex}; -use tokio_stream::StreamExt; use crate::PyError; From b04b3ca4bf39eec8a8c1b725b942d1722eef7146 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 12 Sep 2023 19:16:10 -0500 Subject: [PATCH 12/25] Remove unused `tokio_stream::StreamExt` from `glacier_checksums` --- aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs b/aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs index 18f1d9219e..bf95910e00 100644 --- a/aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs +++ b/aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs @@ -14,7 +14,6 @@ use bytes::Buf; use bytes_utils::SegmentedBuf; use http::header::HeaderName; use ring::digest::{Context, Digest, SHA256}; -use tokio_stream::StreamExt; const TREE_HASH_HEADER: &str = "x-amz-sha256-tree-hash"; const X_AMZ_CONTENT_SHA256: &str = "x-amz-content-sha256"; From 6575943a8309dafca2828b80d7e7c6e1cad6778c Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Wed, 13 Sep 2023 10:48:09 -0500 Subject: [PATCH 13/25] Update CHANGELOG.next.toml --- CHANGELOG.next.toml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 261b7a2544..df210b45f1 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -162,3 +162,15 @@ message = "The `futures_core::stream::Stream` trait has been removed from public references = ["smithy-rs#2978"] meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } author = "ysaito1001" + +[[aws-sdk-rust]] +message = "The `futures_core::stream::Stream` trait has been removed from [`ByteStream`](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html). The methods mentioned in the [doc](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html#getting-data-out-of-a-bytestream) will continue to be supported. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner." +references = ["smithy-rs#2983"] +meta = { "breaking" = true, "tada" = false, "bug" = false } +author = "ysaito1001" + +[[smithy-rs]] +message = "The `futures_core::stream::Stream` trait has been removed from [`ByteStream`](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html). The methods mentioned in the [doc](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html#getting-data-out-of-a-bytestream) will continue to be supported. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner." +references = ["smithy-rs#2983"] +meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } +author = "ysaito1001" From 8147d81846261245b7d94efbe5e723f9ac5364e2 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 19 Sep 2023 10:17:48 -0500 Subject: [PATCH 14/25] Remove unnecessary `Unpin` bound This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1327600053 --- .../aws-smithy-async/src/future/pagination_stream.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index bf37ade49e..4011576fc2 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -29,10 +29,7 @@ impl PaginationStream { } /// Consumes and returns the next `Item` from this stream. - pub async fn next(&mut self) -> Option - where - Self: Unpin, - { + pub async fn next(&mut self) -> Option { self.0.next().await } From 43318e7c36a98656a46b6a7199e29b9211ca5e80 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Wed, 20 Sep 2023 14:23:26 -0500 Subject: [PATCH 15/25] Add convenince method `try_collect` This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1327605578 --- .../src/future/pagination_stream.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 4011576fc2..19534476cf 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -44,6 +44,11 @@ impl PaginationStream> { pub async fn try_next(&mut self) -> Result, E> { self.next().await.transpose() } + + /// Convenience method for `.collect::, _>()`. + pub async fn try_collect(self) -> Result, E> { + self.collect::, E>>().await + } } pin_project! { /// Utility to drive a stream with an async function and a channel. @@ -149,6 +154,11 @@ impl FnStream> { pub async fn try_next(&mut self) -> Result, E> { self.next().await.transpose() } + + /// Convenience method for `.collect::, _>()`. + pub async fn try_collect(self) -> Result, E> { + self.collect::, E>>().await + } } /// Utility wrapper to flatten paginated results @@ -325,7 +335,7 @@ mod test { struct Output { items: Vec, } - let stream = FnStream::new(|tx| { + let stream: FnStream> = FnStream::new(|tx| { Box::pin(async move { tx.send(Ok(Output { items: vec![1, 2, 3], @@ -343,7 +353,7 @@ mod test { Ok(vec![1, 2, 3, 4, 5, 6]), TryFlatMap::new(PaginationStream::new(stream)) .flat_map(|output| output.items.into_iter()) - .collect::, &str>>() + .try_collect() .await, ); } @@ -368,7 +378,7 @@ mod test { Err("bummer"), TryFlatMap::new(PaginationStream::new(stream)) .flat_map(|output| output.items.into_iter()) - .collect::, &str>>() + .try_collect() .await ) } From e076f648bda1c52bb4ef932a80bdaaba8763504e Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 21 Sep 2023 15:17:33 -0500 Subject: [PATCH 16/25] Wrap stream payload in a new-type at call sites of serializer --- .../protocols/HttpBoundProtocolGenerator.kt | 21 ------ .../HttpBoundProtocolPayloadGenerator.kt | 60 ++++------------ .../protocols/PythonServerProtocolLoader.kt | 37 +++++----- .../ServerHttpBoundProtocolGenerator.kt | 69 +++++++------------ .../smithy/protocols/ServerProtocolLoader.kt | 34 +++++---- 5 files changed, 72 insertions(+), 149 deletions(-) diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt index 6279d7ebb7..1f60c251ac 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt @@ -7,32 +7,12 @@ package software.amazon.smithy.rust.codegen.client.smithy.protocols import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency -import software.amazon.smithy.rust.codegen.core.rustlang.RustWriter -import software.amazon.smithy.rust.codegen.core.rustlang.rust import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpMessageType import software.amazon.smithy.rust.codegen.core.smithy.generators.protocol.ProtocolPayloadGenerator import software.amazon.smithy.rust.codegen.core.smithy.protocols.HttpBoundProtocolPayloadGenerator import software.amazon.smithy.rust.codegen.core.smithy.protocols.Protocol -import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerParams -import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerRenderer - -private class ClientStreamPayloadSerializerRenderer : StreamPayloadSerializerRenderer { - override fun renderOutputType(writer: RustWriter, params: StreamPayloadSerializerParams) { - writer.rust( - "#T", - RuntimeType.futuresStreamCompatByteStream(params.runtimeConfig).toSymbol(), - ) - } - - override fun renderPayload(writer: RustWriter, params: StreamPayloadSerializerParams) { - writer.rust( - "#T::new(${params.payloadName!!})", - RuntimeType.futuresStreamCompatByteStream(params.runtimeConfig), - ) - } -} class ClientHttpBoundProtocolPayloadGenerator( codegenContext: ClientCodegenContext, @@ -61,5 +41,4 @@ class ClientHttpBoundProtocolPayloadGenerator( "errorMarshallerConstructorFn" to params.errorMarshallerConstructorFn, ) }, - ClientStreamPayloadSerializerRenderer(), ) diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt index fbfc33cca0..9b12cd978c 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt @@ -6,7 +6,6 @@ package software.amazon.smithy.rust.codegen.core.smithy.protocols import software.amazon.smithy.codegen.core.CodegenException -import software.amazon.smithy.codegen.core.SymbolProvider import software.amazon.smithy.model.shapes.BlobShape import software.amazon.smithy.model.shapes.DocumentShape import software.amazon.smithy.model.shapes.MemberShape @@ -18,12 +17,11 @@ import software.amazon.smithy.model.traits.EnumTrait import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency import software.amazon.smithy.rust.codegen.core.rustlang.RustWriter import software.amazon.smithy.rust.codegen.core.rustlang.rust -import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate +import software.amazon.smithy.rust.codegen.core.rustlang.rustBlockTemplate import software.amazon.smithy.rust.codegen.core.rustlang.withBlock import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate import software.amazon.smithy.rust.codegen.core.smithy.CodegenContext import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget -import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpMessageType @@ -53,38 +51,11 @@ data class EventStreamBodyParams( val additionalPayloadContext: AdditionalPayloadContext, ) -data class StreamPayloadSerializerParams( - val symbolProvider: SymbolProvider, - val runtimeConfig: RuntimeConfig, - val member: MemberShape, - val payloadName: String?, -) - -/** - * An interface to help customize how to render a stream payload serializer. - * - * When the output of the serializer is passed to `hyper::body::Body::wrap_stream`, - * it requires what's passed to implement `futures_core::stream::Stream` trait. - * However, a certain type, such as `aws_smithy_http::byte_stream::ByteStream` does not - * implement the trait, so we need to wrap it with a new-type that does implement the trait. - * - * Each implementing type of the interface can choose whether the payload should be wrapped - * with such a new-type or should simply be used as-is. - */ -interface StreamPayloadSerializerRenderer { - /** Renders the return type of stream payload serializer **/ - fun renderOutputType(writer: RustWriter, params: StreamPayloadSerializerParams) - - /** Renders the stream payload **/ - fun renderPayload(writer: RustWriter, params: StreamPayloadSerializerParams) -} - class HttpBoundProtocolPayloadGenerator( codegenContext: CodegenContext, private val protocol: Protocol, private val httpMessageType: HttpMessageType = HttpMessageType.REQUEST, private val renderEventStreamBody: (RustWriter, EventStreamBodyParams) -> Unit, - private val streamPayloadSerializerRenderer: StreamPayloadSerializerRenderer, ) : ProtocolPayloadGenerator { private val symbolProvider = codegenContext.symbolProvider private val model = codegenContext.model @@ -269,22 +240,17 @@ class HttpBoundProtocolPayloadGenerator( ) { val ref = if (payloadMetadata.takesOwnership) "" else "&" val serializer = protocolFunctions.serializeFn(member, fnNameSuffix = "http_payload") { fnName -> - rustTemplate( - "pub(crate) fn $fnName(payload: $ref#{Member}) -> #{Result}<", - "Member" to symbolProvider.toSymbol(member), - *codegenScope, - ) - if (member.isStreaming(model)) { - streamPayloadSerializerRenderer.renderOutputType( - this, - StreamPayloadSerializerParams(symbolProvider, runtimeConfig, member, null), - ) + val outputT = if (member.isStreaming(model)) { + symbolProvider.toSymbol(member) } else { - rust("#T", RuntimeType.ByteSlab.toSymbol()) + RuntimeType.ByteSlab.toSymbol() } - rustTemplate(", #{BuildError}>", *codegenScope) - - withBlockTemplate("{", "}", *codegenScope) { + rustBlockTemplate( + "pub fn $fnName(payload: $ref#{Member}) -> Result<#{outputT}, #{BuildError}>", + "Member" to symbolProvider.toSymbol(member), + "outputT" to outputT, + *codegenScope, + ) { val asRef = if (payloadMetadata.takesOwnership) "" else ".as_ref()" if (symbolProvider.toSymbol(member).isOptional()) { @@ -339,10 +305,8 @@ class HttpBoundProtocolPayloadGenerator( is BlobShape -> { // Write the raw blob to the payload. if (member.isStreaming(model)) { - streamPayloadSerializerRenderer.renderPayload( - this, - StreamPayloadSerializerParams(symbolProvider, runtimeConfig, member, payloadName), - ) + // Return the `ByteStream`. + rust(payloadName) } else { // Convert the `Blob` into a `Vec` and return it. rust("$payloadName.into_inner()") diff --git a/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt b/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt index 5569c92526..dbfa9e029d 100644 --- a/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt +++ b/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt @@ -67,36 +67,31 @@ class PythonServerAfterDeserializedMemberServerHttpBoundCustomization() : } /** - * Customization class used to determine the type of serialized stream payload and how it should be wrapped in a - * new-type wrapper to enable `futures_core::stream::Stream` trait. + * Customization class used to force casting a `Vec` into one a Python `Vec` */ -class PythonServerStreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() { - override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) { - is ServerHttpBoundProtocolSection.TypeOfSerializedStreamPayload -> writable { - // `aws_smithy_http_server_python::types::ByteStream` already implements - // `futures::stream::Stream`, so no need to wrap it in a futures' stream-compatible - // wrapper. - rust("#T", section.params.symbolProvider.toSymbol(section.params.member)) - } - - is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable { - // payloadName is always non-null within WrapStreamAfterPayloadGenerated - rust(section.params.payloadName!!) +class PythonServerAfterDeserializedMemberHttpBindingCustomization(private val runtimeConfig: RuntimeConfig) : + HttpBindingCustomization() { + override fun section(section: HttpBindingSection): Writable = when (section) { + is HttpBindingSection.AfterDeserializingIntoADateTimeOfHttpHeaders -> writable { + rust(".into_iter().map(#T::from).collect()", PythonServerRuntimeType.dateTime(runtimeConfig).toSymbol()) } - else -> emptySection } } /** - * Customization class used to force casting a `Vec` into one a Python `Vec` + * Customization class used to determine how serialized stream payload should be rendered for the Python server. + * + * In this customization, we do not need to wrap the payload in a new-type wrapper to enable the + * `futures_core::stream::Stream` trait since the payload in question has a type + * `aws_smithy_http_server_python::types::ByteStream` which already implements the `Stream` trait. */ -class PythonServerAfterDeserializedMemberHttpBindingCustomization(private val runtimeConfig: RuntimeConfig) : - HttpBindingCustomization() { - override fun section(section: HttpBindingSection): Writable = when (section) { - is HttpBindingSection.AfterDeserializingIntoADateTimeOfHttpHeaders -> writable { - rust(".into_iter().map(#T::from).collect()", PythonServerRuntimeType.dateTime(runtimeConfig).toSymbol()) +class PythonServerStreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() { + override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) { + is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable { + section.params.payloadGenerator.generatePayload(this, section.params.shapeName, section.params.shape) } + else -> emptySection } } diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt index 22a183fae7..11a9a9558b 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt @@ -57,8 +57,6 @@ import software.amazon.smithy.rust.codegen.core.smithy.protocols.HttpBoundProtoc import software.amazon.smithy.rust.codegen.core.smithy.protocols.HttpLocation import software.amazon.smithy.rust.codegen.core.smithy.protocols.Protocol import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolFunctions -import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerParams -import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerRenderer import software.amazon.smithy.rust.codegen.core.smithy.protocols.parse.StructuredDataParserGenerator import software.amazon.smithy.rust.codegen.core.smithy.traits.SyntheticInputTrait import software.amazon.smithy.rust.codegen.core.smithy.transformers.operationErrors @@ -83,25 +81,25 @@ import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.Ser import software.amazon.smithy.rust.codegen.server.smithy.generators.serverBuilderSymbol import java.util.logging.Logger +data class StreamPayloadSerializerParams( + val codegenContext: ServerCodegenContext, + val payloadGenerator: ServerHttpBoundProtocolPayloadGenerator, + val shapeName: String, + val shape: OperationShape, +) + /** * Class describing a ServerHttpBoundProtocol section that can be used in a customization. */ sealed class ServerHttpBoundProtocolSection(name: String) : Section(name) { data class AfterTimestampDeserializedMember(val shape: MemberShape) : ServerHttpBoundProtocolSection("AfterTimestampDeserializedMember") - /** - * Represent a section for rendering the return type of serialized stream payload. - * - * When overriding the `section` method, this should render [Symbol] for that return type. - */ - data class TypeOfSerializedStreamPayload(val params: StreamPayloadSerializerParams) : - ServerHttpBoundProtocolSection("TypeOfSerializedStreamPayload") - /** * Represent a section for rendering the serialized stream payload. * - * When overriding the `section` method, this should render either the payload as-is or the payload wrapped - * with a new-type that implements the `futures_core::stream::Stream` trait. + * If the payload does not implement the `futures_core::stream::Stream`, which is the case for + * `aws_smithy_http::byte_stream::ByteStream`, the section needs to be overridden and renders a new-type wrapper + * around the payload to enable the `Stream` trait. */ data class WrapStreamPayload(val params: StreamPayloadSerializerParams) : ServerHttpBoundProtocolSection("WrapStreamPayload") @@ -133,40 +131,9 @@ class ServerHttpBoundProtocolGenerator( } } -/** - * Server implementation of the [StreamPayloadSerializerRenderer] interface. - * - * The implementation of each method is delegated to [customizations]. Regular server codegen and python server - * have different requirements for how to render stream payload serializers, and they express their requirements - * through customizations, specifically with [TypeOfSerializedStreamPayload] and [WrapStreamPayload]. - */ -private class ServerStreamPayloadSerializerRenderer(private val customizations: List) : - StreamPayloadSerializerRenderer { - override fun renderOutputType(writer: RustWriter, params: StreamPayloadSerializerParams) { - for (customization in customizations) { - customization.section( - ServerHttpBoundProtocolSection.TypeOfSerializedStreamPayload( - params, - ), - )(writer) - } - } - - override fun renderPayload(writer: RustWriter, params: StreamPayloadSerializerParams) { - for (customization in customizations) { - customization.section( - ServerHttpBoundProtocolSection.WrapStreamPayload( - params, - ), - )(writer) - } - } -} - class ServerHttpBoundProtocolPayloadGenerator( codegenContext: CodegenContext, protocol: Protocol, - customizations: List = listOf(), ) : ProtocolPayloadGenerator by HttpBoundProtocolPayloadGenerator( codegenContext, protocol, HttpMessageType.RESPONSE, renderEventStreamBody = { writer, params -> @@ -187,7 +154,6 @@ class ServerHttpBoundProtocolPayloadGenerator( "errorMarshallerConstructorFn" to params.errorMarshallerConstructorFn, ) }, - ServerStreamPayloadSerializerRenderer(customizations), ) /* @@ -589,9 +555,20 @@ class ServerHttpBoundProtocolTraitImplGenerator( ?: serverRenderHttpResponseCode(httpTraitStatusCode)(this) operationShape.outputShape(model).findStreamingMember(model)?.let { - val payloadGenerator = ServerHttpBoundProtocolPayloadGenerator(codegenContext, protocol, customizations) + val payloadGenerator = ServerHttpBoundProtocolPayloadGenerator(codegenContext, protocol) withBlockTemplate("let body = #{SmithyHttpServer}::body::boxed(#{SmithyHttpServer}::body::Body::wrap_stream(", "));", *codegenScope) { - payloadGenerator.generatePayload(this, "output", operationShape) + for (customization in customizations) { + customization.section( + ServerHttpBoundProtocolSection.WrapStreamPayload( + StreamPayloadSerializerParams( + codegenContext, + payloadGenerator, + "output", + operationShape, + ), + ), + )(this) + } } } ?: run { val payloadGenerator = ServerHttpBoundProtocolPayloadGenerator(codegenContext, protocol) diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt index e13220687d..92dde3b2f6 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt @@ -10,30 +10,38 @@ import software.amazon.smithy.aws.traits.protocols.AwsJson1_1Trait import software.amazon.smithy.aws.traits.protocols.RestJson1Trait import software.amazon.smithy.aws.traits.protocols.RestXmlTrait import software.amazon.smithy.rust.codegen.core.rustlang.Writable -import software.amazon.smithy.rust.codegen.core.rustlang.rust -import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate +import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate import software.amazon.smithy.rust.codegen.core.rustlang.writable import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType import software.amazon.smithy.rust.codegen.core.smithy.protocols.AwsJsonVersion import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolLoader import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolMap +import software.amazon.smithy.rust.codegen.core.util.isOutputEventStream import software.amazon.smithy.rust.codegen.server.smithy.ServerCodegenContext import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.ServerProtocolGenerator class StreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() { override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) { - is ServerHttpBoundProtocolSection.TypeOfSerializedStreamPayload -> writable { - rust( - "#T", - RuntimeType.futuresStreamCompatByteStream(section.params.runtimeConfig).toSymbol(), - ) - } - is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable { - rustTemplate( - "#{FuturesStreamCompatByteStream}::new(${section.params.payloadName!!})", - "FuturesStreamCompatByteStream" to RuntimeType.futuresStreamCompatByteStream(section.params.runtimeConfig), - ) + if (section.params.shape.isOutputEventStream(section.params.codegenContext.model)) { + // Event stream payload, of type `aws_smithy_http::event_stream::MessageStreamAdapter`, already + // implements the `Stream` trait, so no need to wrap it in the new-type. + section.params.payloadGenerator.generatePayload(this, section.params.shapeName, section.params.shape) + } else { + // Otherwise, the stream payload is `aws_smithy_http::byte_stream::ByteStream`. We wrap it in the + // new-type to enable the `Stream` trait. + withBlockTemplate( + "#{FuturesStreamCompatByteStream}::new(", + ")", + "FuturesStreamCompatByteStream" to RuntimeType.futuresStreamCompatByteStream(section.params.codegenContext.runtimeConfig), + ) { + section.params.payloadGenerator.generatePayload( + this, + section.params.shapeName, + section.params.shape, + ) + } + } } else -> emptySection From 5381cfe2295fb51873c9d2075f5adca0dd316048 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 21 Sep 2023 15:51:44 -0500 Subject: [PATCH 17/25] Revert HttpBoundProtocolPayloadGenerator.kt since no change is needed --- .../core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt index 9b12cd978c..0ed594fc28 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/protocols/HttpBoundProtocolPayloadGenerator.kt @@ -23,7 +23,6 @@ import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate import software.amazon.smithy.rust.codegen.core.smithy.CodegenContext import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType -import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpMessageType import software.amazon.smithy.rust.codegen.core.smithy.generators.operationBuildError import software.amazon.smithy.rust.codegen.core.smithy.generators.protocol.AdditionalPayloadContext @@ -64,7 +63,6 @@ class HttpBoundProtocolPayloadGenerator( private val httpBindingResolver = protocol.httpBindingResolver private val smithyEventStream = RuntimeType.smithyEventStream(runtimeConfig) private val codegenScope = arrayOf( - *preludeScope, "hyper" to CargoDependency.HyperWithStream.toType(), "SdkBody" to RuntimeType.sdkBody(runtimeConfig), "BuildError" to runtimeConfig.operationBuildError(), From 1add37ff44a8a5538ee379e2763f6c035f5710e1 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 26 Sep 2023 21:55:53 -0500 Subject: [PATCH 18/25] Move `fn_stream` to its own module This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1337313976 --- .../smithy/generators/PaginatorGenerator.kt | 2 +- .../src/future/pagination_stream.rs | 121 +---------------- .../src/future/pagination_stream/fn_stream.rs | 124 ++++++++++++++++++ .../aws-smithy-async/src/future/rendezvous.rs | 2 +- 4 files changed, 130 insertions(+), 119 deletions(-) create mode 100644 rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt index d0fef52931..6f9892b2cc 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt @@ -148,7 +148,7 @@ class PaginatorGenerator private constructor( let builder = self.builder; let handle = self.handle; #{runtime_plugin_init} - #{pagination_stream}::PaginationStream::new(#{pagination_stream}::FnStream::new(move |tx| #{Box}::pin(async move { + #{pagination_stream}::PaginationStream::new(#{pagination_stream}::fn_stream::FnStream::new(move |tx| #{Box}::pin(async move { // Build the input for the first time. If required fields are missing, this is where we'll produce an early error. let mut input = match builder.build().map_err(#{SdkError}::construction_failure) { #{Ok}(input) => input, diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 19534476cf..3ee302fce8 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -5,15 +5,12 @@ //! Types to support stream-like operations for paginators. -use crate::future::rendezvous; -use pin_project_lite::pin_project; -use std::fmt; -use std::future::poll_fn; +use crate::future::pagination_stream::collect::sealed::Collectable; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; - pub mod collect; +pub mod fn_stream; +use fn_stream::FnStream; /// A wrapper around [`FnStream`]. /// @@ -34,7 +31,7 @@ impl PaginationStream { } /// Consumes this stream and gathers elements into a collection. - pub async fn collect>(self) -> T { + pub async fn collect>(self) -> T { self.0.collect().await } } @@ -50,116 +47,6 @@ impl PaginationStream> { self.collect::, E>>().await } } -pin_project! { - /// Utility to drive a stream with an async function and a channel. - /// - /// The closure is passed a reference to a `Sender` which acts as a rendezvous channel. Messages - /// sent to the sender will be emitted to the stream. Because the stream is 1-bounded, the function - /// will not proceed until the stream is read. - /// - /// This utility is used by generated paginators to generate a stream of paginated results. - /// - /// If `tx.send` returns an error, the function MUST return immediately. - /// - /// Note `FnStream` is only `Send` but not `Sync` because `generator` is a boxed future that - /// is `Send` and returns `()` as output when it is done. - /// - /// # Examples - /// ```no_run - /// # async fn docs() { - /// use aws_smithy_async::future::pagination_stream::FnStream; - /// let mut stream = FnStream::new(|tx| Box::pin(async move { - /// if let Err(_) = tx.send("Hello!").await { - /// return; - /// } - /// if let Err(_) = tx.send("Goodbye!").await { - /// return; - /// } - /// })); - /// assert_eq!(stream.collect::>().await, vec!["Hello!", "Goodbye!"]); - /// # } - pub struct FnStream { - #[pin] - rx: rendezvous::Receiver, - generator: Option + Send + 'static>>>, - } -} - -impl fmt::Debug for FnStream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let item_typename = std::any::type_name::(); - write!(f, "FnStream<{item_typename}>") - } -} - -impl FnStream { - /// Creates a new function based stream driven by `generator`. - /// - /// For examples, see the documentation for [`FnStream`] - pub fn new(generator: T) -> Self - where - T: FnOnce(rendezvous::Sender) -> Pin + Send + 'static>>, - { - let (tx, rx) = rendezvous::channel::(); - Self { - rx, - generator: Some(Box::pin(generator(tx))), - } - } - - /// Consumes and returns the next `Item` from this stream. - pub async fn next(&mut self) -> Option - where - Self: Unpin, - { - let mut me = Pin::new(self); - poll_fn(|cx| me.as_mut().poll_next(cx)).await - } - - /// Attempts to pull out the next value of this stream, returning `None` if the stream is - /// exhausted. - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut me = self.project(); - match me.rx.poll_recv(cx) { - Poll::Ready(item) => Poll::Ready(item), - Poll::Pending => { - if let Some(generator) = me.generator { - if generator.as_mut().poll(cx).is_ready() { - // `generator` keeps writing items to `tx` and will not be `Poll::Ready` - // until it is done writing to `tx`. Once it is done, it returns `()` - // as output and is `Poll::Ready`, at which point we MUST NOT poll it again - // since doing so will cause a panic. - *me.generator = None; - } - } - Poll::Pending - } - } - } - - /// Consumes this stream and gathers elements into a collection. - pub async fn collect>(mut self) -> T { - let mut collection = T::initialize(); - while let Some(item) = self.next().await { - if !T::extend(&mut collection, item) { - break; - } - } - T::finalize(collection) - } -} - -impl FnStream> { - /// Yields the next item in the stream or returns an error if an error is encountered. - pub async fn try_next(&mut self) -> Result, E> { - self.next().await.transpose() - } - - /// Convenience method for `.collect::, _>()`. - pub async fn try_collect(self) -> Result, E> { - self.collect::, E>>().await - } -} /// Utility wrapper to flatten paginated results /// diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs new file mode 100644 index 0000000000..260bfb9e87 --- /dev/null +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs @@ -0,0 +1,124 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Module to define utility to drive a stream with an async function and a channel. + +use crate::future::pagination_stream::collect::sealed::Collectable; +use crate::future::rendezvous; +use pin_project_lite::pin_project; +use std::fmt; +use std::future::poll_fn; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// The closure is passed a reference to a `Sender` which acts as a rendezvous channel. Messages + /// sent to the sender will be emitted to the stream. Because the stream is 1-bounded, the function + /// will not proceed until the stream is read. + /// + /// This utility is used by generated paginators to generate a stream of paginated results. + /// + /// If `tx.send` returns an error, the function MUST return immediately. + /// + /// Note `FnStream` is only `Send` but not `Sync` because `generator` is a boxed future that + /// is `Send` and returns `()` as output when it is done. + /// + /// # Examples + /// ```no_run + /// # async fn docs() { + /// use aws_smithy_async::future::pagination_stream::fn_stream::FnStream; + /// let mut stream = FnStream::new(|tx| Box::pin(async move { + /// if let Err(_) = tx.send("Hello!").await { + /// return; + /// } + /// if let Err(_) = tx.send("Goodbye!").await { + /// return; + /// } + /// })); + /// assert_eq!(stream.collect::>().await, vec!["Hello!", "Goodbye!"]); + /// # } + pub struct FnStream { + #[pin] + rx: rendezvous::Receiver, + generator: Option + Send + 'static>>>, + } +} + +impl fmt::Debug for FnStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let item_typename = std::any::type_name::(); + write!(f, "FnStream<{item_typename}>") + } +} + +impl FnStream { + /// Creates a new function based stream driven by `generator`. + /// + /// For examples, see the documentation for [`FnStream`] + pub fn new(generator: T) -> Self + where + T: FnOnce(rendezvous::Sender) -> Pin + Send + 'static>>, + { + let (tx, rx) = rendezvous::channel::(); + Self { + rx, + generator: Some(Box::pin(generator(tx))), + } + } + + /// Consumes and returns the next `Item` from this stream. + pub async fn next(&mut self) -> Option + where + Self: Unpin, + { + let mut me = Pin::new(self); + poll_fn(|cx| me.as_mut().poll_next(cx)).await + } + + /// Attempts to pull out the next value of this stream, returning `None` if the stream is + /// exhausted. + pub(crate) fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); + match me.rx.poll_recv(cx) { + Poll::Ready(item) => Poll::Ready(item), + Poll::Pending => { + if let Some(generator) = me.generator { + if generator.as_mut().poll(cx).is_ready() { + // `generator` keeps writing items to `tx` and will not be `Poll::Ready` + // until it is done writing to `tx`. Once it is done, it returns `()` + // as output and is `Poll::Ready`, at which point we MUST NOT poll it again + // since doing so will cause a panic. + *me.generator = None; + } + } + Poll::Pending + } + } + } + + /// Consumes this stream and gathers elements into a collection. + pub async fn collect>(mut self) -> T { + let mut collection = T::initialize(); + while let Some(item) = self.next().await { + if !T::extend(&mut collection, item) { + break; + } + } + T::finalize(collection) + } +} + +impl FnStream> { + /// Yields the next item in the stream or returns an error if an error is encountered. + pub async fn try_next(&mut self) -> Result, E> { + self.next().await.transpose() + } + + /// Convenience method for `.collect::, _>()`. + pub async fn try_collect(self) -> Result, E> { + self.collect::, E>>().await + } +} diff --git a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs index 52bb05ff66..78762866e2 100644 --- a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs +++ b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs @@ -10,7 +10,7 @@ //! and coordinate with the receiver. //! //! Rendezvous channels should be used with care—it's inherently easy to deadlock unless they're being -//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::pagination_stream::FnStream`]) +//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::pagination_stream::fn_stream::FnStream`]) use std::future::poll_fn; use std::sync::Arc; From b8f37ea96b17220c604fbb3ce4d79402efec5b99 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 26 Sep 2023 22:04:10 -0500 Subject: [PATCH 19/25] Provide docs for `PaginationStream` for end users This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1337309986 --- .../src/future/pagination_stream.rs | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 3ee302fce8..f263685b49 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -//! Types to support stream-like operations for paginators. +//! Provides types to support stream-like operations for paginators. use crate::future::pagination_stream::collect::sealed::Collectable; use std::future::Future; @@ -12,10 +12,40 @@ pub mod collect; pub mod fn_stream; use fn_stream::FnStream; -/// A wrapper around [`FnStream`]. +/// Stream specifically made to support paginators. /// -/// This type provides the same set of methods as [`FnStream`], but that is meant to be used -/// internally and not by external users. +/// `PaginationStream` provides two primary mechanisms for accessing stream of data. +/// 1. With [`.next()`](PaginationStream::next) (or [`try_next()`](PaginationStream::try_next)): +/// +/// ```no_run +/// # async fn docs() { +/// # use aws_smithy_async::future::pagination_stream::PaginationStream; +/// # fn operation_to_yield_paginator() -> PaginationStream { +/// # todo!() +/// # } +/// # struct Page; +/// let mut stream: PaginationStream = operation_to_yield_paginator(); +/// while let Some(page) = stream.next().await { +/// // process `page` +/// } +/// # } +/// ``` +/// 2. With [`.collect()`](PaginationStream::collect) (or [`try_collect()`](PaginationStream::try_collect)): +/// +/// ```no_run +/// # async fn docs() { +/// # use aws_smithy_async::future::pagination_stream::PaginationStream; +/// # fn operation_to_yield_paginator() -> PaginationStream { +/// # todo!() +/// # } +/// # struct Page; +/// let mut stream: PaginationStream = operation_to_yield_paginator(); +/// let result = stream.collect::>().await; +/// # } +/// ``` +/// +/// [`PaginationStream`] is implemented in terms of [`FnStream`], but the latter is meant to be +/// used internally and not by external users. #[derive(Debug)] pub struct PaginationStream(FnStream); From 7571c8ce6691355147c324c9fba429f3861af589 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 26 Sep 2023 22:23:35 -0500 Subject: [PATCH 20/25] Update CHANGELOG.next.toml This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1337308673 --- CHANGELOG.next.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 0d8cf2a58e..bffedc818e 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -185,13 +185,13 @@ meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" author = "rcoh" [[aws-sdk-rust]] -message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect` methods are supported on `PaginationStream`. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner." +message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect`, and `try_collect` methods are supported on `PaginationStream`. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner. Finally, `fn_stream` has been moved to be a child module of `pagination_stream`." references = ["smithy-rs#2978"] meta = { "breaking" = true, "tada" = false, "bug" = false } author = "ysaito1001" [[smithy-rs]] -message = "The `futures_core::stream::Stream` trait has been removed from public API. [`FnStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.FnStream.html) only supports `next`, `try_next`, and `collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. For stream operations previously made available through the trait or its extension traits, we should be able to add them as needed in a backward compatible manner." +message = "The `futures_core::stream::Stream` trait has been removed from public API. `FnStream` only supports `next`, `try_next`, `collect`, and `try_collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner. Finally, `fn_stream` has been moved to be a child module of `pagination_stream`." references = ["smithy-rs#2978"] meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } author = "ysaito1001" From a802799960a4094a55e3cd26ddaecadc6077ce0e Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 28 Sep 2023 12:07:32 -0500 Subject: [PATCH 21/25] Make `ByteStream::poll_next` to be `pub(crate)` This commit addresses https://github.com/awslabs/smithy-rs/pull/2983#discussion_r1337294255 --- rust-runtime/aws-smithy-http/src/byte_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs index 49e7b4d6a6..ca69da34f1 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs @@ -290,7 +290,7 @@ impl ByteStream { /// Attempt to pull out the next value of this stream, returning `None` if the stream is /// exhausted. - pub fn poll_next( + pub(crate) fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { From 5d07b16a4fb109ce905906218275ee75c304db4d Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 28 Sep 2023 12:37:13 -0500 Subject: [PATCH 22/25] Doc link `ByteStream::next` to `ByteStream::try_next` This commit addresses https://github.com/awslabs/smithy-rs/pull/2983#discussion_r1337295665 --- rust-runtime/aws-smithy-http/src/byte_stream.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs index ca69da34f1..af82f501f5 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs @@ -284,6 +284,9 @@ impl ByteStream { } /// Return the next item in the `ByteStream`. + /// + /// There is also a sibling method [`try_next`](ByteStream::try_next), which returns a `Result, Error>` + /// instead of an `Option>`. pub async fn next(&mut self) -> Option> { Some(self.inner.next().await?.map_err(Error::streaming)) } From 6fc9a96e77f2d4fdb14e20f6cc88f2814eb33c52 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 28 Sep 2023 12:38:15 -0500 Subject: [PATCH 23/25] Doc link `ByteStream::try_next` to `ByteStream::next` This commit addresses https://github.com/awslabs/smithy-rs/pull/2983#discussion_r1337296451 --- rust-runtime/aws-smithy-http/src/byte_stream.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs index af82f501f5..27ccbf8600 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs @@ -302,6 +302,9 @@ impl ByteStream { /// Consume and return the next item in the `ByteStream` or return an error if an error is /// encountered. + /// + /// Similar to the [`next`](ByteStream::next) method, but this returns a `Result, Error>` rather than + /// an `Option>`, making for easy use with the `?` operator. pub async fn try_next(&mut self) -> Result, Error> { self.next().await.transpose() } From b01b1d4021182b10afed1a4ae2006ca1f2e1e8d6 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 28 Sep 2023 12:54:04 -0500 Subject: [PATCH 24/25] Explain why `futures_stream_adapter` is `doc(hidden)` This commit addresses https://github.com/awslabs/smithy-rs/pull/2983#discussion_r1337301683 --- rust-runtime/aws-smithy-http/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust-runtime/aws-smithy-http/src/lib.rs b/rust-runtime/aws-smithy-http/src/lib.rs index 5a832e7973..52a259b422 100644 --- a/rust-runtime/aws-smithy-http/src/lib.rs +++ b/rust-runtime/aws-smithy-http/src/lib.rs @@ -27,6 +27,10 @@ pub mod body; pub mod endpoint; +// Marked as `doc(hidden)` because a type in the module is used both by this crate and by the code +// generator, but not by external users. Also, by the module being `doc(hidden)` instead of it being +// in `rust-runtime/inlineable`, each user won't have to pay the cost of running the module's tests +// when compiling their generated SDK. #[doc(hidden)] pub mod futures_stream_adapter; pub mod header; From 5bcf5ca974c728992965093ac2125364626c7153 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 28 Sep 2023 14:19:47 -0500 Subject: [PATCH 25/25] Make `ByteStream::poll_next` `pub` again The method in question is used by the python server's custom `ByteStream` so it needs to be `pub`. --- rust-runtime/aws-smithy-http/src/byte_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs index 27ccbf8600..e648431749 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs @@ -293,7 +293,7 @@ impl ByteStream { /// Attempt to pull out the next value of this stream, returning `None` if the stream is /// exhausted. - pub(crate) fn poll_next( + pub fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> {