From f43afc05d0f0a11aeef4942687e9bce4b8cc2dd5 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 8 Sep 2023 20:16:55 -0500 Subject: [PATCH 01/12] Remove `futures_core::stream::Stream` from `aws-smithy-async` --- rust-runtime/aws-smithy-async/Cargo.toml | 7 +- .../aws-smithy-async/external-types.toml | 3 - .../aws-smithy-async/src/future/mod.rs | 2 +- .../{fn_stream.rs => pagination_stream.rs} | 246 ++++++++++++------ .../src/future/pagination_stream/collect.rs | 75 ++++++ .../aws-smithy-async/src/future/rendezvous.rs | 22 +- 6 files changed, 255 insertions(+), 100 deletions(-) rename rust-runtime/aws-smithy-async/src/future/{fn_stream.rs => pagination_stream.rs} (50%) create mode 100644 rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs diff --git a/rust-runtime/aws-smithy-async/Cargo.toml b/rust-runtime/aws-smithy-async/Cargo.toml index c95862d9ff..fd51b4fb1e 100644 --- a/rust-runtime/aws-smithy-async/Cargo.toml +++ b/rust-runtime/aws-smithy-async/Cargo.toml @@ -14,13 +14,18 @@ test-util = [] [dependencies] pin-project-lite = "0.2" tokio = { version = "1.23.1", features = ["sync"] } -tokio-stream = { version = "0.1.5", default-features = false } futures-util = { version = "0.3.16", default-features = false } [dev-dependencies] +pin-utils = "0.1" tokio = { version = "1.23.1", features = ["rt", "macros", "test-util"] } tokio-test = "0.4.2" +# futures-util is used by `now_or_later`, for instance, but the tooling +# reports a false positive, saying it is unused. +[package.metadata.cargo-udeps.ignore] +normal = ["futures-util"] + [package.metadata.docs.rs] all-features = true targets = ["x86_64-unknown-linux-gnu"] diff --git a/rust-runtime/aws-smithy-async/external-types.toml b/rust-runtime/aws-smithy-async/external-types.toml index 424f7dc1db..464456a2dc 100644 --- a/rust-runtime/aws-smithy-async/external-types.toml +++ b/rust-runtime/aws-smithy-async/external-types.toml @@ -2,7 +2,4 @@ allowed_external_types = [ "aws_smithy_types::config_bag::storable::Storable", "aws_smithy_types::config_bag::storable::StoreReplace", "aws_smithy_types::config_bag::storable::Storer", - - # TODO(https://github.com/awslabs/smithy-rs/issues/1193): Switch to AsyncIterator once standardized - "futures_core::stream::Stream", ] diff --git a/rust-runtime/aws-smithy-async/src/future/mod.rs b/rust-runtime/aws-smithy-async/src/future/mod.rs index 1e99bdc304..44894e0733 100644 --- a/rust-runtime/aws-smithy-async/src/future/mod.rs +++ b/rust-runtime/aws-smithy-async/src/future/mod.rs @@ -5,8 +5,8 @@ //! Useful runtime-agnostic future implementations. -pub mod fn_stream; pub mod never; pub mod now_or_later; +pub mod pagination_stream; pub mod rendezvous; pub mod timeout; diff --git a/rust-runtime/aws-smithy-async/src/future/fn_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs similarity index 50% rename from rust-runtime/aws-smithy-async/src/future/fn_stream.rs rename to rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 804b08f6bb..a8386dcfc5 100644 --- a/rust-runtime/aws-smithy-async/src/future/fn_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -3,16 +3,51 @@ * SPDX-License-Identifier: Apache-2.0 */ -//! Utility to drive a stream with an async function and a channel. +//! Types to support stream-like operations for paginators. use crate::future::rendezvous; -use futures_util::StreamExt; use pin_project_lite::pin_project; +use std::fmt; +use std::future::poll_fn; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio_stream::{Iter, Once, Stream}; +pub mod collect; + +/// A wrapper around [`FnStream`]. +/// +/// This type provides the same set of methods as [`FnStream`], but that is meant to be used +/// internally and not by external users. +#[derive(Debug)] +pub struct PaginationStream(FnStream); + +impl PaginationStream { + /// Creates a `PaginationStream` from the given [`FnStream`]. + pub fn new(stream: FnStream) -> Self { + Self(stream) + } + + /// Consumes and returns the next `Item` from this stream. + pub async fn next(&mut self) -> Option + where + Self: Unpin, + { + self.0.next().await + } + + /// Consumes this stream and gathers elements into a collection. + pub async fn collect>(self) -> T { + self.0.collect().await + } +} + +impl PaginationStream> { + /// Yields the next item in the stream or returns an error if an error is encountered. + pub async fn try_next(&mut self) -> Result, E> { + self.next().await.transpose() + } +} pin_project! { /// Utility to drive a stream with an async function and a channel. /// @@ -24,12 +59,14 @@ pin_project! { /// /// If `tx.send` returns an error, the function MUST return immediately. /// + /// Note `FnStream` is only `Send` but not `Sync` because `generator` is a boxed future that + /// is `Send` and returns `()` as output when it is done. + /// /// # Examples /// ```no_run - /// use tokio_stream::StreamExt; /// # async fn docs() { - /// use aws_smithy_async::future::fn_stream::FnStream; - /// let stream = FnStream::new(|tx| Box::pin(async move { + /// use aws_smithy_async::future::pagination_stream::FnStream; + /// let mut stream = FnStream::new(|tx| Box::pin(async move { /// if let Err(_) = tx.send("Hello!").await { /// return; /// } @@ -39,52 +76,82 @@ pin_project! { /// })); /// assert_eq!(stream.collect::>().await, vec!["Hello!", "Goodbye!"]); /// # } - pub struct FnStream { + pub struct FnStream { #[pin] rx: rendezvous::Receiver, - #[pin] - generator: Option, + generator: Option + Send + 'static>>>, + } +} + +impl fmt::Debug for FnStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let item_typename = std::any::type_name::(); + write!(f, "FnStream<{item_typename}>") } } -impl FnStream { +impl FnStream { /// Creates a new function based stream driven by `generator`. /// /// For examples, see the documentation for [`FnStream`] pub fn new(generator: T) -> Self where - T: FnOnce(rendezvous::Sender) -> F, + T: FnOnce(rendezvous::Sender) -> Pin + Send + 'static>>, { let (tx, rx) = rendezvous::channel::(); Self { rx, - generator: Some(generator(tx)), + generator: Some(Box::pin(generator(tx))), } } -} -impl Stream for FnStream -where - F: Future, -{ - type Item = Item; + /// Consumes and returns the next `Item` from this stream. + pub async fn next(&mut self) -> Option + where + Self: Unpin, + { + let mut me = Pin::new(self); + poll_fn(|cx| me.as_mut().poll_next(cx)).await + } - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + /// Attempts to pull out the next value of this stream, returning `None` if the stream is + /// exhausted. + pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); match me.rx.poll_recv(cx) { Poll::Ready(item) => Poll::Ready(item), Poll::Pending => { - if let Some(generator) = me.generator.as_mut().as_pin_mut() { - if generator.poll(cx).is_ready() { - // if the generator returned ready we MUST NOT poll it again—doing so - // will cause a panic. - me.generator.set(None); + if let Some(generator) = me.generator { + if generator.as_mut().poll(cx).is_ready() { + // `generator` keeps writing items to `tx` and will not be `Poll::Ready` + // until it is done writing to `tx`. Once it is done, it returns `()` + // as output and is `Poll::Ready`, at which point we MUST NOT poll it again + // since doing so will cause a panic. + *me.generator = None; } } Poll::Pending } } } + + /// Consumes this stream and gathers elements into a collection. + pub async fn collect>(mut self) -> T { + let mut collection = T::initialize(); + while let Some(item) = self.next().await { + if !T::extend(&mut collection, item) { + break; + } + } + T::finalize(collection) + } +} + +impl FnStream> { + /// Yields the next item in the stream or returns an error if an error is encountered. + pub async fn try_next(&mut self) -> Result, E> { + self.next().await.transpose() + } } /// Utility wrapper to flatten paginated results @@ -93,62 +160,50 @@ where /// is present in each item. This provides `items()` which can wrap an stream of `Result` /// and produce a stream of `Result`. #[derive(Debug)] -pub struct TryFlatMap(I); +pub struct TryFlatMap(PaginationStream>); -impl TryFlatMap { - /// Create a `TryFlatMap` that wraps the input - pub fn new(i: I) -> Self { - Self(i) +impl TryFlatMap { + /// Creates a `TryFlatMap` that wraps the input. + pub fn new(stream: PaginationStream>) -> Self { + Self(stream) } - /// Produce a new [`Stream`] by mapping this stream with `map` then flattening the result - pub fn flat_map(self, map: M) -> impl Stream> + /// Produces a new [`PaginationStream`] by mapping this stream with `map` then flattening the result. + pub fn flat_map(mut self, map: M) -> PaginationStream> where - I: Stream>, - M: Fn(Page) -> Iter, - Iter: IntoIterator, + Page: Send + 'static, + Err: Send + 'static, + M: Fn(Page) -> Iter + Send + 'static, + Item: Send + 'static, + Iter: IntoIterator + Send, + ::IntoIter: Send, { - self.0.flat_map(move |page| match page { - Ok(page) => OnceOrMany::Many { - many: tokio_stream::iter(map(page).into_iter().map(Ok)), - }, - Err(e) => OnceOrMany::Once { - once: tokio_stream::once(Err(e)), - }, - }) - } -} - -pin_project! { - /// Helper enum to to support returning `Once` and `Iter` from `Items::items` - #[project = OnceOrManyProj] - enum OnceOrMany { - Many { #[pin] many: Iter }, - Once { #[pin] once: Once }, - } -} - -impl Stream for OnceOrMany -where - Iter: Iterator, -{ - type Item = Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = self.project(); - match me { - OnceOrManyProj::Many { many } => many.poll_next(cx), - OnceOrManyProj::Once { once } => once.poll_next(cx), - } + PaginationStream::new(FnStream::new(|tx| { + Box::pin(async move { + while let Some(page) = self.0.next().await { + match page { + Ok(page) => { + let mapped = map(page); + for item in mapped.into_iter() { + let _ = tx.send(Ok(item)).await; + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + break; + } + } + } + }) as Pin + Send>> + })) } } #[cfg(test)] mod test { - use crate::future::fn_stream::{FnStream, TryFlatMap}; + use crate::future::pagination_stream::{FnStream, PaginationStream, TryFlatMap}; use std::sync::{Arc, Mutex}; use std::time::Duration; - use tokio_stream::StreamExt; /// basic test of FnStream functionality #[tokio::test] @@ -168,7 +223,24 @@ mod test { while let Some(value) = stream.next().await { out.push(value); } - assert_eq!(out, vec!["1", "2", "3"]); + assert_eq!(vec!["1", "2", "3"], out); + } + + #[tokio::test] + async fn fn_stream_try_next() { + tokio::time::pause(); + let mut stream = FnStream::new(|tx| { + Box::pin(async move { + tx.send(Ok(1)).await.unwrap(); + tx.send(Ok(2)).await.unwrap(); + tx.send(Err("err")).await.unwrap(); + }) + }); + let mut out = vec![]; + while let Ok(value) = stream.try_next().await { + out.push(value); + } + assert_eq!(vec![Some(1), Some(2)], out); } // smithy-rs#1902: there was a bug where we could continue to poll the generator after it @@ -183,10 +255,16 @@ mod test { Box::leak(Box::new(tx)); }) }); - assert_eq!(stream.next().await, Some("blah")); + assert_eq!(Some("blah"), stream.next().await); let mut test_stream = tokio_test::task::spawn(stream); - assert!(test_stream.poll_next().is_pending()); - assert!(test_stream.poll_next().is_pending()); + let _ = test_stream.enter(|ctx, pin| { + let polled = pin.poll_next(ctx); + assert!(polled.is_pending()); + }); + let _ = test_stream.enter(|ctx, pin| { + let polled = pin.poll_next(ctx); + assert!(polled.is_pending()); + }); } /// Tests that the generator will not advance until demand exists @@ -209,13 +287,13 @@ mod test { stream.next().await.expect("ready"); assert_eq!(*progress.lock().unwrap(), 1); - assert_eq!(stream.next().await.expect("ready"), "2"); - assert_eq!(*progress.lock().unwrap(), 2); + assert_eq!("2", stream.next().await.expect("ready")); + assert_eq!(2, *progress.lock().unwrap()); let _ = stream.next().await.expect("ready"); - assert_eq!(*progress.lock().unwrap(), 3); - assert_eq!(stream.next().await, None); - assert_eq!(*progress.lock().unwrap(), 4); + assert_eq!(3, *progress.lock().unwrap()); + assert_eq!(None, stream.next().await); + assert_eq!(4, *progress.lock().unwrap()); } #[tokio::test] @@ -238,7 +316,7 @@ mod test { while let Some(Ok(value)) = stream.next().await { out.push(value); } - assert_eq!(out, vec![0, 1]); + assert_eq!(vec![0, 1], out); } #[tokio::test] @@ -262,12 +340,12 @@ mod test { }) }); assert_eq!( - TryFlatMap(stream) + Ok(vec![1, 2, 3, 4, 5, 6]), + TryFlatMap::new(PaginationStream::new(stream)) .flat_map(|output| output.items.into_iter()) .collect::, &str>>() .await, - Ok(vec![1, 2, 3, 4, 5, 6]) - ) + ); } #[tokio::test] @@ -287,11 +365,11 @@ mod test { }) }); assert_eq!( - TryFlatMap(stream) + Err("bummer"), + TryFlatMap::new(PaginationStream::new(stream)) .flat_map(|output| output.items.into_iter()) .collect::, &str>>() - .await, - Err("bummer") + .await ) } } diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs new file mode 100644 index 0000000000..1a6fcfbf9a --- /dev/null +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs @@ -0,0 +1,75 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Module to extend the functionality of types in `patination_stream` module to allow for +//! collecting elements of the stream into collection. +//! +//! Majority of the code is borrowed from +//! + +pub(crate) mod sealed { + /// A trait that signifies that elements can be collected into `T`. + /// + /// Currently the trait may not be implemented by clients so we can make changes in the future + /// without breaking code depending on it. + #[doc(hidden)] + pub trait Collectable { + type Collection; + + fn initialize() -> Self::Collection; + + fn extend(collection: &mut Self::Collection, item: T) -> bool; + + fn finalize(collection: Self::Collection) -> Self; + } +} + +impl sealed::Collectable for Vec { + type Collection = Self; + + fn initialize() -> Self::Collection { + Vec::default() + } + + fn extend(collection: &mut Self::Collection, item: T) -> bool { + collection.push(item); + true + } + + fn finalize(collection: Self::Collection) -> Self { + collection + } +} + +impl sealed::Collectable> for Result +where + U: sealed::Collectable, +{ + type Collection = Result; + + fn initialize() -> Self::Collection { + Ok(U::initialize()) + } + + fn extend(collection: &mut Self::Collection, item: Result) -> bool { + match item { + Ok(item) => { + let collection = collection.as_mut().ok().expect("invalid state"); + U::extend(collection, item) + } + Err(e) => { + *collection = Err(e); + false + } + } + } + + fn finalize(collection: Self::Collection) -> Self { + match collection { + Ok(collection) => Ok(U::finalize(collection)), + err @ Err(_) => Err(err.map(drop).unwrap_err()), + } + } +} diff --git a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs index 16456f123e..52bb05ff66 100644 --- a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs +++ b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs @@ -10,8 +10,9 @@ //! and coordinate with the receiver. //! //! Rendezvous channels should be used with care—it's inherently easy to deadlock unless they're being -//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::fn_stream::FnStream`]) +//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::pagination_stream::FnStream`]) +use std::future::poll_fn; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::sync::Semaphore; @@ -104,7 +105,11 @@ pub struct Receiver { impl Receiver { /// Polls to receive an item from the channel - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + pub async fn recv(&mut self) -> Option { + poll_fn(|cx| self.poll_recv(cx)).await + } + + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { // This uses `needs_permit` to track whether this is the first poll since we last returned an item. // If it is, we will grant a permit to the semaphore. Otherwise, we'll just forward the response through. let resp = self.chan.poll_recv(cx); @@ -124,13 +129,8 @@ impl Receiver { #[cfg(test)] mod test { - use crate::future::rendezvous::{channel, Receiver}; + use crate::future::rendezvous::channel; use std::sync::{Arc, Mutex}; - use tokio::macros::support::poll_fn; - - async fn recv(rx: &mut Receiver) -> Option { - poll_fn(|cx| rx.poll_recv(cx)).await - } #[tokio::test] async fn send_blocks_caller() { @@ -145,11 +145,11 @@ mod test { *idone.lock().unwrap() = 3; }); assert_eq!(*done.lock().unwrap(), 0); - assert_eq!(recv(&mut rx).await, Some(0)); + assert_eq!(rx.recv().await, Some(0)); assert_eq!(*done.lock().unwrap(), 1); - assert_eq!(recv(&mut rx).await, Some(1)); + assert_eq!(rx.recv().await, Some(1)); assert_eq!(*done.lock().unwrap(), 2); - assert_eq!(recv(&mut rx).await, None); + assert_eq!(rx.recv().await, None); assert_eq!(*done.lock().unwrap(), 3); let _ = send.await; } From cbbaf7715bfd29aae3bc88d33c0f0935a41d5901 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 8 Sep 2023 20:17:05 -0500 Subject: [PATCH 02/12] Update codegen client to use `PaginationStream` --- .../smithy/generators/PaginatorGenerator.kt | 20 ++++++++++--------- .../client/FluentClientGenerator.kt | 2 +- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt index dedc50f31f..d0fef52931 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt @@ -90,7 +90,7 @@ class PaginatorGenerator private constructor( "HttpResponse" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("client::orchestrator::HttpResponse"), "SdkError" to RuntimeType.sdkError(runtimeConfig), "client" to RuntimeType.smithyClient(runtimeConfig), - "fn_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::fn_stream"), + "pagination_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::pagination_stream"), // External Types "Stream" to RuntimeType.TokioStream.resolve("Stream"), @@ -141,13 +141,14 @@ class PaginatorGenerator private constructor( /// Create the pagination stream /// - /// _Note:_ No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next)). - pub fn send(self) -> impl #{Stream} + #{Unpin} { + /// _Note:_ No requests will be dispatched until the stream is used + /// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method). + pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> { // Move individual fields out of self for the borrow checker let builder = self.builder; let handle = self.handle; #{runtime_plugin_init} - #{fn_stream}::FnStream::new(move |tx| #{Box}::pin(async move { + #{pagination_stream}::PaginationStream::new(#{pagination_stream}::FnStream::new(move |tx| #{Box}::pin(async move { // Build the input for the first time. If required fields are missing, this is where we'll produce an early error. let mut input = match builder.build().map_err(#{SdkError}::construction_failure) { #{Ok}(input) => input, @@ -177,7 +178,7 @@ class PaginatorGenerator private constructor( return } } - })) + }))) } } """, @@ -257,11 +258,12 @@ class PaginatorGenerator private constructor( impl ${paginatorName}Items { /// Create the pagination stream /// - /// _Note: No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next))._ + /// _Note_: No requests will be dispatched until the stream is used + /// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method). /// - /// To read the entirety of the paginator, use [`.collect::, _>()`](tokio_stream::StreamExt::collect). - pub fn send(self) -> impl #{Stream} + #{Unpin} { - #{fn_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter()) + /// To read the entirety of the paginator, use [`.collect::, _>()`](aws_smithy_async::future::pagination_stream::PaginationStream::collect). + pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> { + #{pagination_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter()) } } diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt index b2927dca8e..d5655c4879 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt @@ -433,7 +433,7 @@ class FluentClientGenerator( """ /// Create a paginator for this request /// - /// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a `Stream`. + /// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a [`PaginationStream`](aws_smithy_async::future::pagination_stream::PaginationStream). pub fn into_paginator(self) -> #{Paginator} { #{Paginator}::new(self.handle, self.inner) } From 8005c3de85d76856dee865a37f8b37adb8e20fc6 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 8 Sep 2023 20:17:17 -0500 Subject: [PATCH 03/12] Update canary to use `PaginationStream` --- .../src/latest/paginator_canary.rs | 1 - tools/ci-cdk/canary-lambda/src/main.rs | 10 +- ...se_2023_01_26.rs => release_2023_08_23.rs} | 0 .../paginator_canary.rs | 6 +- .../s3_canary.rs | 4 +- .../transcribe_canary.rs | 4 +- .../ci-cdk/canary-runner/src/build_bundle.rs | 100 +++++++++--------- 7 files changed, 62 insertions(+), 63 deletions(-) rename tools/ci-cdk/canary-lambda/src/{release_2023_01_26.rs => release_2023_08_23.rs} (100%) rename tools/ci-cdk/canary-lambda/src/{release_2023_01_26 => release_2023_08_23}/paginator_canary.rs (92%) rename tools/ci-cdk/canary-lambda/src/{release_2023_01_26 => release_2023_08_23}/s3_canary.rs (98%) rename tools/ci-cdk/canary-lambda/src/{release_2023_01_26 => release_2023_08_23}/transcribe_canary.rs (97%) diff --git a/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs b/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs index d50c4f2be8..11914660ad 100644 --- a/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs @@ -10,7 +10,6 @@ use aws_sdk_ec2 as ec2; use aws_sdk_ec2::types::InstanceType; use crate::CanaryEnv; -use tokio_stream::StreamExt; mk_canary!( "ec2_paginator", diff --git a/tools/ci-cdk/canary-lambda/src/main.rs b/tools/ci-cdk/canary-lambda/src/main.rs index 688462031d..8fc6f4b2e7 100644 --- a/tools/ci-cdk/canary-lambda/src/main.rs +++ b/tools/ci-cdk/canary-lambda/src/main.rs @@ -26,11 +26,11 @@ mod latest; #[cfg(feature = "latest")] pub(crate) use latest as current_canary; -// NOTE: This module can be deleted 3 releases after release-2023-01-26 -#[cfg(feature = "release-2023-01-26")] -mod release_2023_01_26; -#[cfg(feature = "release-2023-01-26")] -pub(crate) use release_2023_01_26 as current_canary; +// NOTE: This module can be deleted 3 releases after release-2023-08-23 +#[cfg(feature = "release-2023-08-23")] +mod release_2023_08_23; +#[cfg(feature = "release-2023-08-23")] +pub(crate) use release_2023_08_23 as current_canary; #[tokio::main] async fn main() -> Result<(), Error> { diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23.rs similarity index 100% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23.rs diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/paginator_canary.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/paginator_canary.rs similarity index 92% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26/paginator_canary.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23/paginator_canary.rs index 72c9b40ed0..66df5a03e4 100644 --- a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/paginator_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/paginator_canary.rs @@ -7,7 +7,7 @@ use crate::mk_canary; use anyhow::bail; use aws_sdk_ec2 as ec2; -use aws_sdk_ec2::model::InstanceType; +use aws_sdk_ec2::types::InstanceType; use crate::CanaryEnv; use tokio_stream::StreamExt; @@ -30,7 +30,7 @@ pub async fn paginator_canary(client: ec2::Client, page_size: usize) -> anyhow:: let mut num_pages = 0; while let Some(page) = history.try_next().await? { let items_in_page = page.spot_price_history.unwrap_or_default().len(); - if items_in_page > page_size as usize { + if items_in_page > page_size { bail!( "failed to retrieve results of correct page size (expected {}, got {})", page_size, @@ -60,7 +60,7 @@ pub async fn paginator_canary(client: ec2::Client, page_size: usize) -> anyhow:: #[cfg(test)] mod test { - use crate::paginator_canary::paginator_canary; + use crate::current_canary::paginator_canary::paginator_canary; #[tokio::test] async fn test_paginator() { diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/s3_canary.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/s3_canary.rs similarity index 98% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26/s3_canary.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23/s3_canary.rs index 70e3d18c55..fbcba976d8 100644 --- a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/s3_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/s3_canary.rs @@ -8,8 +8,8 @@ use crate::{mk_canary, CanaryEnv}; use anyhow::Context; use aws_config::SdkConfig; use aws_sdk_s3 as s3; -use aws_sdk_s3::presigning::config::PresigningConfig; -use s3::types::ByteStream; +use s3::presigning::PresigningConfig; +use s3::primitives::ByteStream; use std::time::Duration; use uuid::Uuid; diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/transcribe_canary.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/transcribe_canary.rs similarity index 97% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26/transcribe_canary.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23/transcribe_canary.rs index 554f4c3ddf..8f6420fc1b 100644 --- a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/transcribe_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/transcribe_canary.rs @@ -9,10 +9,10 @@ use async_stream::stream; use aws_config::SdkConfig; use aws_sdk_transcribestreaming as transcribe; use bytes::BufMut; -use transcribe::model::{ +use transcribe::primitives::Blob; +use transcribe::types::{ AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream, }; -use transcribe::types::Blob; const CHUNK_SIZE: usize = 8192; use crate::canary::CanaryEnv; diff --git a/tools/ci-cdk/canary-runner/src/build_bundle.rs b/tools/ci-cdk/canary-runner/src/build_bundle.rs index 464ee2e4ad..4ec7861460 100644 --- a/tools/ci-cdk/canary-runner/src/build_bundle.rs +++ b/tools/ci-cdk/canary-runner/src/build_bundle.rs @@ -63,9 +63,10 @@ const REQUIRED_SDK_CRATES: &[&str] = &[ "aws-sdk-transcribestreaming", ]; +// The elements in this `Vec` should be sorted in an ascending order by the release date. lazy_static! { static ref NOTABLE_SDK_RELEASE_TAGS: Vec = vec![ - ReleaseTag::from_str("release-2023-01-26").unwrap(), // last version before the crate reorg + ReleaseTag::from_str("release-2023-08-23").unwrap(), // last version before `Stream` trait removal ]; } @@ -112,38 +113,58 @@ enum CrateSource { }, } -fn enabled_features(crate_source: &CrateSource) -> Vec { - let mut enabled = Vec::new(); +fn enabled_feature(crate_source: &CrateSource) -> String { if let CrateSource::VersionsManifest { release_tag, .. } = crate_source { - // we want to select the newest module specified after this release + // we want to select the oldest module specified after this release for notable in NOTABLE_SDK_RELEASE_TAGS.iter() { tracing::debug!(release_tag = ?release_tag, notable = ?notable, "considering if release tag came before notable release"); if release_tag <= notable { tracing::debug!("selecting {} as chosen release", notable); - enabled.push(notable.as_str().into()); - break; + return notable.as_str().into(); } } } - if enabled.is_empty() { - enabled.push("latest".into()); - } - enabled + "latest".into() } fn generate_crate_manifest(crate_source: CrateSource) -> Result { let mut output = BASE_MANIFEST.to_string(); - for &sdk_crate in REQUIRED_SDK_CRATES { + write_dependencies(REQUIRED_SDK_CRATES, &mut output, &crate_source)?; + write!(output, "\n[features]\n").unwrap(); + writeln!(output, "latest = []").unwrap(); + for release_tag in NOTABLE_SDK_RELEASE_TAGS.iter() { + writeln!( + output, + "\"{release_tag}\" = []", + release_tag = release_tag.as_str() + ) + .unwrap(); + } + writeln!( + output, + "default = [\"{enabled}\"]", + enabled = enabled_feature(&crate_source) + ) + .unwrap(); + Ok(output) +} + +fn write_dependencies( + required_crates: &[&str], + output: &mut String, + crate_source: &CrateSource, +) -> Result<()> { + for &required_crate in required_crates { match &crate_source { CrateSource::Path(path) => { - let path_name = match sdk_crate.strip_prefix("aws-sdk-") { + let path_name = match required_crate.strip_prefix("aws-sdk-") { Some(path) => path, - None => sdk_crate, + None => required_crate, }; let crate_path = path.join(path_name); writeln!( output, - r#"{sdk_crate} = {{ path = "{path}" }}"#, + r#"{required_crate} = {{ path = "{path}" }}"#, path = crate_path.to_string_lossy() ) .unwrap() @@ -151,40 +172,20 @@ fn generate_crate_manifest(crate_source: CrateSource) -> Result { CrateSource::VersionsManifest { versions, release_tag, - } => match versions.crates.get(sdk_crate) { + } => match versions.crates.get(required_crate) { Some(version) => writeln!( output, - r#"{sdk_crate} = "{version}""#, + r#"{required_crate} = "{version}""#, version = version.version ) .unwrap(), None => { - bail!("Couldn't find `{sdk_crate}` in versions.toml for `{release_tag}`") + bail!("Couldn't find `{required_crate}` in versions.toml for `{release_tag}`") } }, } } - write!(output, "\n[features]\n").unwrap(); - writeln!(output, "latest = []").unwrap(); - for release_tag in NOTABLE_SDK_RELEASE_TAGS.iter() { - writeln!( - output, - "\"{release_tag}\" = []", - release_tag = release_tag.as_str() - ) - .unwrap(); - } - writeln!( - output, - "default = [{enabled}]", - enabled = enabled_features(&crate_source) - .into_iter() - .map(|f| format!("\"{f}\"")) - .collect::>() - .join(", ") - ) - .unwrap(); - Ok(output) + Ok(()) } fn sha1_file(path: &Path) -> Result { @@ -441,7 +442,7 @@ aws-sdk-transcribestreaming = { path = "some/sdk/path/transcribestreaming" } [features] latest = [] -"release-2023-01-26" = [] +"release-2023-08-23" = [] default = ["latest"] "#, generate_crate_manifest(CrateSource::Path("some/sdk/path".into())).expect("success") @@ -505,7 +506,7 @@ aws-sdk-transcribestreaming = "0.16.0" [features] latest = [] -"release-2023-01-26" = [] +"release-2023-08-23" = [] default = ["latest"] "#, generate_crate_manifest(CrateSource::VersionsManifest { @@ -523,7 +524,7 @@ default = ["latest"] .collect(), release: None, }, - release_tag: ReleaseTag::from_str("release-2023-05-26").unwrap(), + release_tag: ReleaseTag::from_str("release-2023-08-26").unwrap(), }) .expect("success") ); @@ -577,26 +578,25 @@ default = ["latest"] release: None, }; assert_eq!( - enabled_features(&CrateSource::VersionsManifest { + "latest".to_string(), + enabled_feature(&CrateSource::VersionsManifest { versions: versions.clone(), - release_tag: "release-2023-02-23".parse().unwrap(), + release_tag: "release-9999-12-31".parse().unwrap(), }), - vec!["latest".to_string()] ); - assert_eq!( - enabled_features(&CrateSource::VersionsManifest { + "release-2023-08-23".to_string(), + enabled_feature(&CrateSource::VersionsManifest { versions: versions.clone(), - release_tag: "release-2023-01-26".parse().unwrap(), + release_tag: "release-2023-08-23".parse().unwrap(), }), - vec!["release-2023-01-26".to_string()] ); assert_eq!( - enabled_features(&CrateSource::VersionsManifest { + "release-2023-08-23".to_string(), + enabled_feature(&CrateSource::VersionsManifest { versions, release_tag: "release-2023-01-13".parse().unwrap(), }), - vec!["release-2023-01-26".to_string()] ); } } From 75b96e351ac253fb24bab3e0867e8c13310e6bed Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Fri, 8 Sep 2023 20:17:29 -0500 Subject: [PATCH 04/12] Stop using `tokio_stream::StreamExt` in integration-tests --- aws/sdk/integration-tests/dynamodb/tests/paginators.rs | 2 -- aws/sdk/integration-tests/ec2/tests/paginators.rs | 2 -- 2 files changed, 4 deletions(-) diff --git a/aws/sdk/integration-tests/dynamodb/tests/paginators.rs b/aws/sdk/integration-tests/dynamodb/tests/paginators.rs index 807a11890d..a3d0c62473 100644 --- a/aws/sdk/integration-tests/dynamodb/tests/paginators.rs +++ b/aws/sdk/integration-tests/dynamodb/tests/paginators.rs @@ -6,8 +6,6 @@ use std::collections::HashMap; use std::iter::FromIterator; -use tokio_stream::StreamExt; - use aws_credential_types::Credentials; use aws_sdk_dynamodb::types::AttributeValue; use aws_sdk_dynamodb::{Client, Config}; diff --git a/aws/sdk/integration-tests/ec2/tests/paginators.rs b/aws/sdk/integration-tests/ec2/tests/paginators.rs index 83528f2075..d070971a4f 100644 --- a/aws/sdk/integration-tests/ec2/tests/paginators.rs +++ b/aws/sdk/integration-tests/ec2/tests/paginators.rs @@ -3,8 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -use tokio_stream::StreamExt; - use aws_sdk_ec2::{config::Credentials, config::Region, types::InstanceType, Client, Config}; use aws_smithy_client::http_connector::HttpConnector; use aws_smithy_client::test_connection::TestConnection; From 2b1fcb1a901f22ece2b5dbdb865c2f4ecaa5a3ed Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 11 Sep 2023 11:07:05 -0500 Subject: [PATCH 05/12] Add comment explaining why the test uses `.enter` This commit responses to https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1321632649 --- rust-runtime/aws-smithy-async/src/future/pagination_stream.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index a8386dcfc5..083f6190f8 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -257,6 +257,9 @@ mod test { }); assert_eq!(Some("blah"), stream.next().await); let mut test_stream = tokio_test::task::spawn(stream); + // `tokio_test::task::Spawn::poll_next` can only be invoked when the wrapped + // type implements the `Stream` trait. Here, `FnStream` does not implement it, + // so we work around it by using the `enter` method. let _ = test_stream.enter(|ctx, pin| { let polled = pin.poll_next(ctx); assert!(polled.is_pending()); From 73966d4d1130bc42e6e71c7245848b2b408f6611 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 11 Sep 2023 12:12:53 -0500 Subject: [PATCH 06/12] Make `FnStream::poll_next` module private --- rust-runtime/aws-smithy-async/src/future/pagination_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 083f6190f8..bf37ade49e 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -116,7 +116,7 @@ impl FnStream { /// Attempts to pull out the next value of this stream, returning `None` if the stream is /// exhausted. - pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); match me.rx.poll_recv(cx) { Poll::Ready(item) => Poll::Ready(item), From e25a916ae7a3672f17b51f856c1574ed2e4d454c Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 11 Sep 2023 14:42:46 -0500 Subject: [PATCH 07/12] Update CHANGELOG.next.toml --- CHANGELOG.next.toml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 73c59508ae..261b7a2544 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -150,3 +150,15 @@ message = "Fix regression with redacting sensitive HTTP response bodies." references = ["smithy-rs#2926", "smithy-rs#2972"] meta = { "breaking" = false, "tada" = false, "bug" = true, "target" = "client" } author = "ysaito1001" + +[[aws-sdk-rust]] +message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect` methods are supported on `PaginationStream`. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner." +references = ["smithy-rs#2978"] +meta = { "breaking" = true, "tada" = false, "bug" = false } +author = "ysaito1001" + +[[smithy-rs]] +message = "The `futures_core::stream::Stream` trait has been removed from public API. [`FnStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.FnStream.html) only supports `next`, `try_next`, and `collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. For stream operations previously made available through the trait or its extension traits, we should be able to add them as needed in a backward compatible manner." +references = ["smithy-rs#2978"] +meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } +author = "ysaito1001" From 8147d81846261245b7d94efbe5e723f9ac5364e2 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 19 Sep 2023 10:17:48 -0500 Subject: [PATCH 08/12] Remove unnecessary `Unpin` bound This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1327600053 --- .../aws-smithy-async/src/future/pagination_stream.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index bf37ade49e..4011576fc2 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -29,10 +29,7 @@ impl PaginationStream { } /// Consumes and returns the next `Item` from this stream. - pub async fn next(&mut self) -> Option - where - Self: Unpin, - { + pub async fn next(&mut self) -> Option { self.0.next().await } From 43318e7c36a98656a46b6a7199e29b9211ca5e80 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Wed, 20 Sep 2023 14:23:26 -0500 Subject: [PATCH 09/12] Add convenince method `try_collect` This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1327605578 --- .../src/future/pagination_stream.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 4011576fc2..19534476cf 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -44,6 +44,11 @@ impl PaginationStream> { pub async fn try_next(&mut self) -> Result, E> { self.next().await.transpose() } + + /// Convenience method for `.collect::, _>()`. + pub async fn try_collect(self) -> Result, E> { + self.collect::, E>>().await + } } pin_project! { /// Utility to drive a stream with an async function and a channel. @@ -149,6 +154,11 @@ impl FnStream> { pub async fn try_next(&mut self) -> Result, E> { self.next().await.transpose() } + + /// Convenience method for `.collect::, _>()`. + pub async fn try_collect(self) -> Result, E> { + self.collect::, E>>().await + } } /// Utility wrapper to flatten paginated results @@ -325,7 +335,7 @@ mod test { struct Output { items: Vec, } - let stream = FnStream::new(|tx| { + let stream: FnStream> = FnStream::new(|tx| { Box::pin(async move { tx.send(Ok(Output { items: vec![1, 2, 3], @@ -343,7 +353,7 @@ mod test { Ok(vec![1, 2, 3, 4, 5, 6]), TryFlatMap::new(PaginationStream::new(stream)) .flat_map(|output| output.items.into_iter()) - .collect::, &str>>() + .try_collect() .await, ); } @@ -368,7 +378,7 @@ mod test { Err("bummer"), TryFlatMap::new(PaginationStream::new(stream)) .flat_map(|output| output.items.into_iter()) - .collect::, &str>>() + .try_collect() .await ) } From 1add37ff44a8a5538ee379e2763f6c035f5710e1 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 26 Sep 2023 21:55:53 -0500 Subject: [PATCH 10/12] Move `fn_stream` to its own module This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1337313976 --- .../smithy/generators/PaginatorGenerator.kt | 2 +- .../src/future/pagination_stream.rs | 121 +---------------- .../src/future/pagination_stream/fn_stream.rs | 124 ++++++++++++++++++ .../aws-smithy-async/src/future/rendezvous.rs | 2 +- 4 files changed, 130 insertions(+), 119 deletions(-) create mode 100644 rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt index d0fef52931..6f9892b2cc 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt @@ -148,7 +148,7 @@ class PaginatorGenerator private constructor( let builder = self.builder; let handle = self.handle; #{runtime_plugin_init} - #{pagination_stream}::PaginationStream::new(#{pagination_stream}::FnStream::new(move |tx| #{Box}::pin(async move { + #{pagination_stream}::PaginationStream::new(#{pagination_stream}::fn_stream::FnStream::new(move |tx| #{Box}::pin(async move { // Build the input for the first time. If required fields are missing, this is where we'll produce an early error. let mut input = match builder.build().map_err(#{SdkError}::construction_failure) { #{Ok}(input) => input, diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 19534476cf..3ee302fce8 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -5,15 +5,12 @@ //! Types to support stream-like operations for paginators. -use crate::future::rendezvous; -use pin_project_lite::pin_project; -use std::fmt; -use std::future::poll_fn; +use crate::future::pagination_stream::collect::sealed::Collectable; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; - pub mod collect; +pub mod fn_stream; +use fn_stream::FnStream; /// A wrapper around [`FnStream`]. /// @@ -34,7 +31,7 @@ impl PaginationStream { } /// Consumes this stream and gathers elements into a collection. - pub async fn collect>(self) -> T { + pub async fn collect>(self) -> T { self.0.collect().await } } @@ -50,116 +47,6 @@ impl PaginationStream> { self.collect::, E>>().await } } -pin_project! { - /// Utility to drive a stream with an async function and a channel. - /// - /// The closure is passed a reference to a `Sender` which acts as a rendezvous channel. Messages - /// sent to the sender will be emitted to the stream. Because the stream is 1-bounded, the function - /// will not proceed until the stream is read. - /// - /// This utility is used by generated paginators to generate a stream of paginated results. - /// - /// If `tx.send` returns an error, the function MUST return immediately. - /// - /// Note `FnStream` is only `Send` but not `Sync` because `generator` is a boxed future that - /// is `Send` and returns `()` as output when it is done. - /// - /// # Examples - /// ```no_run - /// # async fn docs() { - /// use aws_smithy_async::future::pagination_stream::FnStream; - /// let mut stream = FnStream::new(|tx| Box::pin(async move { - /// if let Err(_) = tx.send("Hello!").await { - /// return; - /// } - /// if let Err(_) = tx.send("Goodbye!").await { - /// return; - /// } - /// })); - /// assert_eq!(stream.collect::>().await, vec!["Hello!", "Goodbye!"]); - /// # } - pub struct FnStream { - #[pin] - rx: rendezvous::Receiver, - generator: Option + Send + 'static>>>, - } -} - -impl fmt::Debug for FnStream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let item_typename = std::any::type_name::(); - write!(f, "FnStream<{item_typename}>") - } -} - -impl FnStream { - /// Creates a new function based stream driven by `generator`. - /// - /// For examples, see the documentation for [`FnStream`] - pub fn new(generator: T) -> Self - where - T: FnOnce(rendezvous::Sender) -> Pin + Send + 'static>>, - { - let (tx, rx) = rendezvous::channel::(); - Self { - rx, - generator: Some(Box::pin(generator(tx))), - } - } - - /// Consumes and returns the next `Item` from this stream. - pub async fn next(&mut self) -> Option - where - Self: Unpin, - { - let mut me = Pin::new(self); - poll_fn(|cx| me.as_mut().poll_next(cx)).await - } - - /// Attempts to pull out the next value of this stream, returning `None` if the stream is - /// exhausted. - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut me = self.project(); - match me.rx.poll_recv(cx) { - Poll::Ready(item) => Poll::Ready(item), - Poll::Pending => { - if let Some(generator) = me.generator { - if generator.as_mut().poll(cx).is_ready() { - // `generator` keeps writing items to `tx` and will not be `Poll::Ready` - // until it is done writing to `tx`. Once it is done, it returns `()` - // as output and is `Poll::Ready`, at which point we MUST NOT poll it again - // since doing so will cause a panic. - *me.generator = None; - } - } - Poll::Pending - } - } - } - - /// Consumes this stream and gathers elements into a collection. - pub async fn collect>(mut self) -> T { - let mut collection = T::initialize(); - while let Some(item) = self.next().await { - if !T::extend(&mut collection, item) { - break; - } - } - T::finalize(collection) - } -} - -impl FnStream> { - /// Yields the next item in the stream or returns an error if an error is encountered. - pub async fn try_next(&mut self) -> Result, E> { - self.next().await.transpose() - } - - /// Convenience method for `.collect::, _>()`. - pub async fn try_collect(self) -> Result, E> { - self.collect::, E>>().await - } -} /// Utility wrapper to flatten paginated results /// diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs new file mode 100644 index 0000000000..260bfb9e87 --- /dev/null +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs @@ -0,0 +1,124 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Module to define utility to drive a stream with an async function and a channel. + +use crate::future::pagination_stream::collect::sealed::Collectable; +use crate::future::rendezvous; +use pin_project_lite::pin_project; +use std::fmt; +use std::future::poll_fn; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// The closure is passed a reference to a `Sender` which acts as a rendezvous channel. Messages + /// sent to the sender will be emitted to the stream. Because the stream is 1-bounded, the function + /// will not proceed until the stream is read. + /// + /// This utility is used by generated paginators to generate a stream of paginated results. + /// + /// If `tx.send` returns an error, the function MUST return immediately. + /// + /// Note `FnStream` is only `Send` but not `Sync` because `generator` is a boxed future that + /// is `Send` and returns `()` as output when it is done. + /// + /// # Examples + /// ```no_run + /// # async fn docs() { + /// use aws_smithy_async::future::pagination_stream::fn_stream::FnStream; + /// let mut stream = FnStream::new(|tx| Box::pin(async move { + /// if let Err(_) = tx.send("Hello!").await { + /// return; + /// } + /// if let Err(_) = tx.send("Goodbye!").await { + /// return; + /// } + /// })); + /// assert_eq!(stream.collect::>().await, vec!["Hello!", "Goodbye!"]); + /// # } + pub struct FnStream { + #[pin] + rx: rendezvous::Receiver, + generator: Option + Send + 'static>>>, + } +} + +impl fmt::Debug for FnStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let item_typename = std::any::type_name::(); + write!(f, "FnStream<{item_typename}>") + } +} + +impl FnStream { + /// Creates a new function based stream driven by `generator`. + /// + /// For examples, see the documentation for [`FnStream`] + pub fn new(generator: T) -> Self + where + T: FnOnce(rendezvous::Sender) -> Pin + Send + 'static>>, + { + let (tx, rx) = rendezvous::channel::(); + Self { + rx, + generator: Some(Box::pin(generator(tx))), + } + } + + /// Consumes and returns the next `Item` from this stream. + pub async fn next(&mut self) -> Option + where + Self: Unpin, + { + let mut me = Pin::new(self); + poll_fn(|cx| me.as_mut().poll_next(cx)).await + } + + /// Attempts to pull out the next value of this stream, returning `None` if the stream is + /// exhausted. + pub(crate) fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); + match me.rx.poll_recv(cx) { + Poll::Ready(item) => Poll::Ready(item), + Poll::Pending => { + if let Some(generator) = me.generator { + if generator.as_mut().poll(cx).is_ready() { + // `generator` keeps writing items to `tx` and will not be `Poll::Ready` + // until it is done writing to `tx`. Once it is done, it returns `()` + // as output and is `Poll::Ready`, at which point we MUST NOT poll it again + // since doing so will cause a panic. + *me.generator = None; + } + } + Poll::Pending + } + } + } + + /// Consumes this stream and gathers elements into a collection. + pub async fn collect>(mut self) -> T { + let mut collection = T::initialize(); + while let Some(item) = self.next().await { + if !T::extend(&mut collection, item) { + break; + } + } + T::finalize(collection) + } +} + +impl FnStream> { + /// Yields the next item in the stream or returns an error if an error is encountered. + pub async fn try_next(&mut self) -> Result, E> { + self.next().await.transpose() + } + + /// Convenience method for `.collect::, _>()`. + pub async fn try_collect(self) -> Result, E> { + self.collect::, E>>().await + } +} diff --git a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs index 52bb05ff66..78762866e2 100644 --- a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs +++ b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs @@ -10,7 +10,7 @@ //! and coordinate with the receiver. //! //! Rendezvous channels should be used with care—it's inherently easy to deadlock unless they're being -//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::pagination_stream::FnStream`]) +//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::pagination_stream::fn_stream::FnStream`]) use std::future::poll_fn; use std::sync::Arc; From b8f37ea96b17220c604fbb3ce4d79402efec5b99 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 26 Sep 2023 22:04:10 -0500 Subject: [PATCH 11/12] Provide docs for `PaginationStream` for end users This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1337309986 --- .../src/future/pagination_stream.rs | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index 3ee302fce8..f263685b49 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -//! Types to support stream-like operations for paginators. +//! Provides types to support stream-like operations for paginators. use crate::future::pagination_stream::collect::sealed::Collectable; use std::future::Future; @@ -12,10 +12,40 @@ pub mod collect; pub mod fn_stream; use fn_stream::FnStream; -/// A wrapper around [`FnStream`]. +/// Stream specifically made to support paginators. /// -/// This type provides the same set of methods as [`FnStream`], but that is meant to be used -/// internally and not by external users. +/// `PaginationStream` provides two primary mechanisms for accessing stream of data. +/// 1. With [`.next()`](PaginationStream::next) (or [`try_next()`](PaginationStream::try_next)): +/// +/// ```no_run +/// # async fn docs() { +/// # use aws_smithy_async::future::pagination_stream::PaginationStream; +/// # fn operation_to_yield_paginator() -> PaginationStream { +/// # todo!() +/// # } +/// # struct Page; +/// let mut stream: PaginationStream = operation_to_yield_paginator(); +/// while let Some(page) = stream.next().await { +/// // process `page` +/// } +/// # } +/// ``` +/// 2. With [`.collect()`](PaginationStream::collect) (or [`try_collect()`](PaginationStream::try_collect)): +/// +/// ```no_run +/// # async fn docs() { +/// # use aws_smithy_async::future::pagination_stream::PaginationStream; +/// # fn operation_to_yield_paginator() -> PaginationStream { +/// # todo!() +/// # } +/// # struct Page; +/// let mut stream: PaginationStream = operation_to_yield_paginator(); +/// let result = stream.collect::>().await; +/// # } +/// ``` +/// +/// [`PaginationStream`] is implemented in terms of [`FnStream`], but the latter is meant to be +/// used internally and not by external users. #[derive(Debug)] pub struct PaginationStream(FnStream); From 7571c8ce6691355147c324c9fba429f3861af589 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 26 Sep 2023 22:23:35 -0500 Subject: [PATCH 12/12] Update CHANGELOG.next.toml This commit addresses https://github.com/awslabs/smithy-rs/pull/2978#discussion_r1337308673 --- CHANGELOG.next.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 0d8cf2a58e..bffedc818e 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -185,13 +185,13 @@ meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" author = "rcoh" [[aws-sdk-rust]] -message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect` methods are supported on `PaginationStream`. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner." +message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect`, and `try_collect` methods are supported on `PaginationStream`. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner. Finally, `fn_stream` has been moved to be a child module of `pagination_stream`." references = ["smithy-rs#2978"] meta = { "breaking" = true, "tada" = false, "bug" = false } author = "ysaito1001" [[smithy-rs]] -message = "The `futures_core::stream::Stream` trait has been removed from public API. [`FnStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.FnStream.html) only supports `next`, `try_next`, and `collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. For stream operations previously made available through the trait or its extension traits, we should be able to add them as needed in a backward compatible manner." +message = "The `futures_core::stream::Stream` trait has been removed from public API. `FnStream` only supports `next`, `try_next`, `collect`, and `try_collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner. Finally, `fn_stream` has been moved to be a child module of `pagination_stream`." references = ["smithy-rs#2978"] meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } author = "ysaito1001"