From 86e554dc6d00f0fbae524100c6c85aaceb525712 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 30 Oct 2023 15:06:54 -0500 Subject: [PATCH 01/11] Add inlined wrapper for event-stream receiver --- rust-runtime/inlineable/Cargo.toml | 2 +- .../inlineable/src/event_stream_receiver.rs | 30 +++++++++++++++++++ rust-runtime/inlineable/src/lib.rs | 2 ++ 3 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 rust-runtime/inlineable/src/event_stream_receiver.rs diff --git a/rust-runtime/inlineable/Cargo.toml b/rust-runtime/inlineable/Cargo.toml index 6d0c099429..fd460dc08f 100644 --- a/rust-runtime/inlineable/Cargo.toml +++ b/rust-runtime/inlineable/Cargo.toml @@ -19,7 +19,7 @@ default = ["gated-tests"] [dependencies] async-trait = "0.1" -aws-smithy-http = { path = "../aws-smithy-http" } +aws-smithy-http = { path = "../aws-smithy-http", features = ["event-stream"] } aws-smithy-http-server = { path = "../aws-smithy-http-server" } aws-smithy-json = { path = "../aws-smithy-json" } aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["client"] } diff --git a/rust-runtime/inlineable/src/event_stream_receiver.rs b/rust-runtime/inlineable/src/event_stream_receiver.rs new file mode 100644 index 0000000000..055d1caf7e --- /dev/null +++ b/rust-runtime/inlineable/src/event_stream_receiver.rs @@ -0,0 +1,30 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_http::event_stream::Receiver; +use aws_smithy_runtime_api::box_error::BoxError; + +#[derive(Debug)] +#[non_exhaustive] +/// Receives messages out of an Event Stream. +pub struct EventStreamReceiver { + inner: Receiver, +} + +impl EventStreamReceiver { + pub(crate) fn new(inner: Receiver) -> Self { + Self { inner } + } + + /// Asynchronously tries to receive a message from the stream. If the stream has ended, + /// it returns an `Ok(None)`. If there is an error, such as failing to unmarshall a message in + /// the stream, it returns an [`BoxError`]. + pub async fn recv(&mut self) -> Result, BoxError> + where + E: std::error::Error + Send + Sync + 'static, + { + self.inner.recv().await.map_err(Into::into) + } +} diff --git a/rust-runtime/inlineable/src/lib.rs b/rust-runtime/inlineable/src/lib.rs index 01490bcee5..8e441aa525 100644 --- a/rust-runtime/inlineable/src/lib.rs +++ b/rust-runtime/inlineable/src/lib.rs @@ -13,6 +13,8 @@ mod client_idempotency_token; mod constrained; #[allow(dead_code)] mod ec2_query_errors; +#[allow(unused)] +mod event_stream_receiver; #[allow(dead_code)] mod idempotency_token; #[allow(dead_code)] From 44f96399ba2976d85a54b7ea52f3560911580613 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 30 Oct 2023 15:54:11 -0500 Subject: [PATCH 02/11] Update client codegen to use event-stream receiver wrapper --- .../smithy/EventStreamSymbolProviderTest.kt | 2 +- .../rust/codegen/core/rustlang/CargoDependency.kt | 7 +++++++ .../core/smithy/EventStreamSymbolProvider.kt | 2 +- .../rust/codegen/core/smithy/RuntimeType.kt | 3 +++ .../generators/http/HttpBindingGenerator.kt | 15 +++++++++++++-- 5 files changed, 25 insertions(+), 4 deletions(-) diff --git a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt index e9e26724d9..84d7b61982 100644 --- a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt +++ b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt @@ -69,7 +69,7 @@ class EventStreamSymbolProviderTest { listOf(someStream, someStreamError), ) outputType shouldBe RustType.Application( - RuntimeType.eventStreamReceiver(TestRuntimeConfig).toSymbol().rustType(), + RuntimeType.eventStreamReceiverWrapper(TestRuntimeConfig).toSymbol().rustType(), listOf(someStream, someStreamError), ) } diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt index 56c9003b83..7a93f542e3 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt @@ -89,6 +89,13 @@ class InlineDependency( private fun forInlineableRustFile(name: String, vararg additionalDependencies: RustDependency) = forRustFile(RustModule.private(name), "/inlineable/src/$name.rs", *additionalDependencies) + fun eventStreamReceiver(runtimeConfig: RuntimeConfig) = + forInlineableRustFile( + "event_stream_receiver", + CargoDependency.smithyHttp(runtimeConfig), + CargoDependency.smithyRuntimeApi(runtimeConfig), + ) + fun defaultAuthPlugin(runtimeConfig: RuntimeConfig) = forInlineableRustFile("auth_plugin", CargoDependency.smithyRuntimeApi(runtimeConfig)) fun jsonErrors(runtimeConfig: RuntimeConfig) = diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt index fdb6e35a83..4ade25f166 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt @@ -53,7 +53,7 @@ class EventStreamSymbolProvider( (shape.isOutputEventStream(model) && target == CodegenTarget.SERVER) val outer = when (isSender) { true -> RuntimeType.eventStreamSender(runtimeConfig).toSymbol().rustType() - else -> RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType() + else -> RuntimeType.eventStreamReceiverWrapper(runtimeConfig).toSymbol().rustType() } val rustType = RustType.Application(outer, listOf(innerT, errorT)) return initial.toBuilder() diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt index f728db76c5..4d83073e4d 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt @@ -402,6 +402,9 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null) fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::Receiver") + fun eventStreamReceiverWrapper(runtimeConfig: RuntimeConfig) = + RuntimeType.forInlineDependency(InlineDependency.eventStreamReceiver(runtimeConfig)).resolve("EventStreamReceiver") + fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender") diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt index 7268dfd12a..62aac7bb00 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt @@ -38,6 +38,7 @@ import software.amazon.smithy.rust.codegen.core.rustlang.rustBlockTemplate import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.rustlang.stripOuter import software.amazon.smithy.rust.codegen.core.rustlang.withBlock +import software.amazon.smithy.rust.codegen.core.rustlang.writable import software.amazon.smithy.rust.codegen.core.smithy.CodegenContext import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType @@ -254,15 +255,25 @@ class HttpBindingGenerator( operationShape, targetShape, ).render() - val receiver = outputT.rustType().qualifiedName() rustTemplate( """ let unmarshaller = #{unmarshallerConstructorFn}(); let body = std::mem::replace(body, #{SdkBody}::taken()); - Ok($receiver::new(unmarshaller, body)) + Ok(#{receiver:W}) """, "SdkBody" to RuntimeType.sdkBody(runtimeConfig), "unmarshallerConstructorFn" to unmarshallerConstructorFn, + "receiver" to writable { + if (codegenTarget == CodegenTarget.SERVER) { + rust("${outputT.rustType().qualifiedName()}::new(unmarshaller, body)") + } else { + rustTemplate( + "#{Wrapper}::new(#{Receiver}::new(unmarshaller, body))", + "Wrapper" to RuntimeType.eventStreamReceiverWrapper(runtimeConfig), + "Receiver" to RuntimeType.eventStreamReceiver(runtimeConfig), + ) + } + }, ) } From 58f466a8b1758f72a45866edf80499f267fcb480 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 30 Oct 2023 20:30:52 -0500 Subject: [PATCH 03/11] Bifurcate receiver type based on codegen target --- .../rust/codegen/core/smithy/EventStreamSymbolProvider.kt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt index 4ade25f166..f2320e63ba 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt @@ -53,7 +53,13 @@ class EventStreamSymbolProvider( (shape.isOutputEventStream(model) && target == CodegenTarget.SERVER) val outer = when (isSender) { true -> RuntimeType.eventStreamSender(runtimeConfig).toSymbol().rustType() - else -> RuntimeType.eventStreamReceiverWrapper(runtimeConfig).toSymbol().rustType() + else -> { + if (target == CodegenTarget.SERVER) { + RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType() + } else { + RuntimeType.eventStreamReceiverWrapper(runtimeConfig).toSymbol().rustType() + } + } } val rustType = RustType.Application(outer, listOf(innerT, errorT)) return initial.toBuilder() From b3f4b86f5d6dd70c3bdb44a75f6fd8c34a9430a4 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 30 Oct 2023 21:32:49 -0500 Subject: [PATCH 04/11] Rename `EventStreamReceiver` to `EventReceiver` This commit addresses https://github.com/awslabs/smithy-rs/pull/3114#discussion_r1376912959 --- .../client/smithy/EventStreamSymbolProviderTest.kt | 2 +- .../rust/codegen/core/rustlang/CargoDependency.kt | 4 ++-- .../codegen/core/smithy/EventStreamSymbolProvider.kt | 2 +- .../smithy/rust/codegen/core/smithy/RuntimeType.kt | 4 ++-- .../smithy/generators/http/HttpBindingGenerator.kt | 4 ++-- .../{event_stream_receiver.rs => event_receiver.rs} | 10 +++++----- rust-runtime/inlineable/src/lib.rs | 2 +- 7 files changed, 14 insertions(+), 14 deletions(-) rename rust-runtime/inlineable/src/{event_stream_receiver.rs => event_receiver.rs} (65%) diff --git a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt index 84d7b61982..6b8c28826c 100644 --- a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt +++ b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/EventStreamSymbolProviderTest.kt @@ -69,7 +69,7 @@ class EventStreamSymbolProviderTest { listOf(someStream, someStreamError), ) outputType shouldBe RustType.Application( - RuntimeType.eventStreamReceiverWrapper(TestRuntimeConfig).toSymbol().rustType(), + RuntimeType.eventReceiver(TestRuntimeConfig).toSymbol().rustType(), listOf(someStream, someStreamError), ) } diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt index 7a93f542e3..60b2795d33 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt @@ -89,9 +89,9 @@ class InlineDependency( private fun forInlineableRustFile(name: String, vararg additionalDependencies: RustDependency) = forRustFile(RustModule.private(name), "/inlineable/src/$name.rs", *additionalDependencies) - fun eventStreamReceiver(runtimeConfig: RuntimeConfig) = + fun eventReceiver(runtimeConfig: RuntimeConfig) = forInlineableRustFile( - "event_stream_receiver", + "event_receiver", CargoDependency.smithyHttp(runtimeConfig), CargoDependency.smithyRuntimeApi(runtimeConfig), ) diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt index f2320e63ba..97ef843fe7 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/EventStreamSymbolProvider.kt @@ -57,7 +57,7 @@ class EventStreamSymbolProvider( if (target == CodegenTarget.SERVER) { RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType() } else { - RuntimeType.eventStreamReceiverWrapper(runtimeConfig).toSymbol().rustType() + RuntimeType.eventReceiver(runtimeConfig).toSymbol().rustType() } } } diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt index 0ffff75221..ba6bcb618c 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt @@ -405,8 +405,8 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null) fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::Receiver") - fun eventStreamReceiverWrapper(runtimeConfig: RuntimeConfig) = - RuntimeType.forInlineDependency(InlineDependency.eventStreamReceiver(runtimeConfig)).resolve("EventStreamReceiver") + fun eventReceiver(runtimeConfig: RuntimeConfig) = + forInlineDependency(InlineDependency.eventReceiver(runtimeConfig)).resolve("EventReceiver") fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender") diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt index 62aac7bb00..2491d07c0e 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt @@ -268,8 +268,8 @@ class HttpBindingGenerator( rust("${outputT.rustType().qualifiedName()}::new(unmarshaller, body)") } else { rustTemplate( - "#{Wrapper}::new(#{Receiver}::new(unmarshaller, body))", - "Wrapper" to RuntimeType.eventStreamReceiverWrapper(runtimeConfig), + "#{EventReceiver}::new(#{Receiver}::new(unmarshaller, body))", + "EventReceiver" to RuntimeType.eventReceiver(runtimeConfig), "Receiver" to RuntimeType.eventStreamReceiver(runtimeConfig), ) } diff --git a/rust-runtime/inlineable/src/event_stream_receiver.rs b/rust-runtime/inlineable/src/event_receiver.rs similarity index 65% rename from rust-runtime/inlineable/src/event_stream_receiver.rs rename to rust-runtime/inlineable/src/event_receiver.rs index 055d1caf7e..0dc40c134e 100644 --- a/rust-runtime/inlineable/src/event_stream_receiver.rs +++ b/rust-runtime/inlineable/src/event_receiver.rs @@ -8,18 +8,18 @@ use aws_smithy_runtime_api::box_error::BoxError; #[derive(Debug)] #[non_exhaustive] -/// Receives messages out of an Event Stream. -pub struct EventStreamReceiver { +/// Receives unmarshalled events at a time out of an Event Stream. +pub struct EventReceiver { inner: Receiver, } -impl EventStreamReceiver { +impl EventReceiver { pub(crate) fn new(inner: Receiver) -> Self { Self { inner } } - /// Asynchronously tries to receive a message from the stream. If the stream has ended, - /// it returns an `Ok(None)`. If there is an error, such as failing to unmarshall a message in + /// Asynchronously tries to receive an event from the stream. If the stream has ended, it + /// returns an `Ok(None)`. If there is an error, such as failing to unmarshall a message in /// the stream, it returns an [`BoxError`]. pub async fn recv(&mut self) -> Result, BoxError> where diff --git a/rust-runtime/inlineable/src/lib.rs b/rust-runtime/inlineable/src/lib.rs index 8e441aa525..10f9ed7c2e 100644 --- a/rust-runtime/inlineable/src/lib.rs +++ b/rust-runtime/inlineable/src/lib.rs @@ -14,7 +14,7 @@ mod constrained; #[allow(dead_code)] mod ec2_query_errors; #[allow(unused)] -mod event_stream_receiver; +mod event_receiver; #[allow(dead_code)] mod idempotency_token; #[allow(dead_code)] From ad8bdb82c065f32f56415fac1ea0a68471e2fbaa Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 30 Oct 2023 21:35:06 -0500 Subject: [PATCH 05/11] Remove unnecessary #[non_exhaustive] --- rust-runtime/inlineable/src/event_receiver.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust-runtime/inlineable/src/event_receiver.rs b/rust-runtime/inlineable/src/event_receiver.rs index 0dc40c134e..d5ced56171 100644 --- a/rust-runtime/inlineable/src/event_receiver.rs +++ b/rust-runtime/inlineable/src/event_receiver.rs @@ -7,7 +7,6 @@ use aws_smithy_http::event_stream::Receiver; use aws_smithy_runtime_api::box_error::BoxError; #[derive(Debug)] -#[non_exhaustive] /// Receives unmarshalled events at a time out of an Event Stream. pub struct EventReceiver { inner: Receiver, From db20c623933956fd21064a70faeeccbfa0af307e Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 31 Oct 2023 10:33:16 -0500 Subject: [PATCH 06/11] Update transcribestreaming test to use `BoxError` --- .../transcribestreaming/tests/test.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/aws/sdk/integration-tests/transcribestreaming/tests/test.rs b/aws/sdk/integration-tests/transcribestreaming/tests/test.rs index fce762d82b..520fd78f68 100644 --- a/aws/sdk/integration-tests/transcribestreaming/tests/test.rs +++ b/aws/sdk/integration-tests/transcribestreaming/tests/test.rs @@ -5,10 +5,9 @@ use async_stream::stream; use aws_sdk_transcribestreaming::config::{Credentials, Region}; -use aws_sdk_transcribestreaming::error::SdkError; use aws_sdk_transcribestreaming::operation::start_stream_transcription::StartStreamTranscriptionOutput; use aws_sdk_transcribestreaming::primitives::Blob; -use aws_sdk_transcribestreaming::types::error::{AudioStreamError, TranscriptResultStreamError}; +use aws_sdk_transcribestreaming::types::error::AudioStreamError; use aws_sdk_transcribestreaming::types::{ AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream, }; @@ -75,15 +74,10 @@ async fn test_error() { start_request("us-east-1", include_str!("error.json"), input_stream).await; match output.transcript_result_stream.recv().await { - Err(SdkError::ServiceError(context)) => match context.err() { - TranscriptResultStreamError::BadRequestException(err) => { - assert_eq!( - Some("A complete signal was sent without the preceding empty frame."), - err.message() - ); - } - otherwise => panic!("Expected BadRequestException, got: {:?}", otherwise), - }, + Err(err) => { + assert!(format!("{err:?}") + .contains("A complete signal was sent without the preceding empty frame."),) + } otherwise => panic!("Expected BadRequestException, got: {:?}", otherwise), } From 1aa6a8da7d2b7669ba3ab7179a1fd72aadc03162 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 31 Oct 2023 10:37:15 -0500 Subject: [PATCH 07/11] Update transcribe canary to use `BoxError` --- tools/ci-cdk/canary-lambda/src/latest/transcribe_canary.rs | 7 ++++++- tools/ci-cdk/canary-runner/src/build_bundle.rs | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tools/ci-cdk/canary-lambda/src/latest/transcribe_canary.rs b/tools/ci-cdk/canary-lambda/src/latest/transcribe_canary.rs index 8f6420fc1b..376cf23494 100644 --- a/tools/ci-cdk/canary-lambda/src/latest/transcribe_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/latest/transcribe_canary.rs @@ -48,7 +48,12 @@ pub async fn transcribe_canary( .await?; let mut full_message = String::new(); - while let Some(event) = output.transcript_result_stream.recv().await? { + while let Some(event) = output + .transcript_result_stream + .recv() + .await + .map_err(|e| anyhow::Error::msg(format!("failed to receive event: {e:#?}")))? + { match event { TranscriptResultStream::TranscriptEvent(transcript_event) => { let transcript = transcript_event.transcript.unwrap(); diff --git a/tools/ci-cdk/canary-runner/src/build_bundle.rs b/tools/ci-cdk/canary-runner/src/build_bundle.rs index 00b628f968..52c2a63950 100644 --- a/tools/ci-cdk/canary-runner/src/build_bundle.rs +++ b/tools/ci-cdk/canary-runner/src/build_bundle.rs @@ -66,7 +66,8 @@ const REQUIRED_SDK_CRATES: &[&str] = &[ // The elements in this `Vec` should be sorted in an ascending order by the release date. lazy_static! { static ref NOTABLE_SDK_RELEASE_TAGS: Vec = vec![ - ReleaseTag::from_str("release-2023-10-26").unwrap(), // last version before addition of Sigv4a MRAP test + // last version before addition of Sigv4a MRAP test and a breaking change for event receiver error type + ReleaseTag::from_str("release-2023-10-26").unwrap(), ]; } From 7c08c59764455127f16fdd195c14e51ac7b066b4 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Tue, 31 Oct 2023 12:51:46 -0500 Subject: [PATCH 08/11] Update CHANGELOG.next.toml --- CHANGELOG.next.toml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 267d530fd5..6f915dda4d 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -512,3 +512,19 @@ message = "Service builder initialization now takes in a `${serviceName}Config` references = ["smithy-rs#3095", "smithy-rs#3096"] meta = { "breaking" = true, "tada" = false, "bug" = true, "target" = "server" } author = "david-perez" + +[[smithy-rs]] +message = """ +An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. You can continue using `.recv()` method on it, but the error type returned by that method is now opaque as opposed to [`SdkError`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv); It is only an informative error instead of an actionable one you can match on. +""" +references = ["smithy-rs#3100", "smithy-rs#3114"] +meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } +author = "ysaito1001" + +[[aws-sdk-rust]] +message = """ +An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. You can continue using `.recv()` method on it, but the error type returned by that method is now opaque as opposed to [`SdkError`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv); It is only an informative error instead of an actionable one you can match on. +""" +references = ["smithy-rs#3100", "smithy-rs#3114"] +meta = { "breaking" = true, "tada" = false, "bug" = false } +author = "ysaito1" From 1a1f226bfcd8b276b3b48974ada2087f77755ac7 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 6 Nov 2023 16:16:11 -0600 Subject: [PATCH 09/11] Make `.recv` return `SdkError` instead of `BoxError` --- CHANGELOG.next.toml | 4 ++-- .../transcribestreaming/tests/test.rs | 16 +++++++++----- .../codegen/core/rustlang/CargoDependency.kt | 1 + .../aws-smithy-http/src/event_stream.rs | 2 +- .../src/event_stream/receiver.rs | 19 +---------------- .../aws-smithy-types/src/event_stream.rs | 21 ++++++++++++++++++- rust-runtime/inlineable/src/event_receiver.rs | 15 +++++++------ .../src/latest/transcribe_canary.rs | 7 +------ .../ci-cdk/canary-runner/src/build_bundle.rs | 2 +- 9 files changed, 45 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index e8ea139197..540d5cac80 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -33,7 +33,7 @@ author = "ysaito1001" [[smithy-rs]] message = """ -An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. You can continue using `.recv()` method on it, but the error type returned by that method is now opaque as opposed to [`SdkError`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv); It is only an informative error instead of an actionable one you can match on. +An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. The new-type supports the `.recv()` method whose signature is the same as [`aws_smithy_http::event_stream::receiver::Receiver::recv`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv). """ references = ["smithy-rs#3100", "smithy-rs#3114"] meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } @@ -41,7 +41,7 @@ author = "ysaito1001" [[aws-sdk-rust]] message = """ -An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. You can continue using `.recv()` method on it, but the error type returned by that method is now opaque as opposed to [`SdkError`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv); It is only an informative error instead of an actionable one you can match on. +An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. The new-type supports the `.recv()` method whose signature is the same as [`aws_smithy_http::event_stream::receiver::Receiver::recv`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv). """ references = ["smithy-rs#3100", "smithy-rs#3114"] meta = { "breaking" = true, "tada" = false, "bug" = false } diff --git a/aws/sdk/integration-tests/transcribestreaming/tests/test.rs b/aws/sdk/integration-tests/transcribestreaming/tests/test.rs index 244e93c71d..5178bd2ae9 100644 --- a/aws/sdk/integration-tests/transcribestreaming/tests/test.rs +++ b/aws/sdk/integration-tests/transcribestreaming/tests/test.rs @@ -5,10 +5,11 @@ use async_stream::stream; use aws_sdk_transcribestreaming::config::{Credentials, Region}; +use aws_sdk_transcribestreaming::error::SdkError; use aws_sdk_transcribestreaming::operation::start_stream_transcription::StartStreamTranscriptionOutput; use aws_sdk_transcribestreaming::primitives::event_stream::{HeaderValue, Message}; use aws_sdk_transcribestreaming::primitives::Blob; -use aws_sdk_transcribestreaming::types::error::AudioStreamError; +use aws_sdk_transcribestreaming::types::error::{AudioStreamError, TranscriptResultStreamError}; use aws_sdk_transcribestreaming::types::{ AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream, }; @@ -75,10 +76,15 @@ async fn test_error() { start_request("us-east-1", include_str!("error.json"), input_stream).await; match output.transcript_result_stream.recv().await { - Err(err) => { - assert!(format!("{err:?}") - .contains("A complete signal was sent without the preceding empty frame."),) - } + Err(SdkError::ServiceError(context)) => match context.err() { + TranscriptResultStreamError::BadRequestException(err) => { + assert_eq!( + Some("A complete signal was sent without the preceding empty frame."), + err.message() + ); + } + otherwise => panic!("Expected BadRequestException, got: {:?}", otherwise), + }, otherwise => panic!("Expected BadRequestException, got: {:?}", otherwise), } diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt index b519617ebf..79c215009a 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt @@ -94,6 +94,7 @@ class InlineDependency( "event_receiver", CargoDependency.smithyHttp(runtimeConfig), CargoDependency.smithyRuntimeApi(runtimeConfig), + CargoDependency.smithyTypes(runtimeConfig), ) fun defaultAuthPlugin(runtimeConfig: RuntimeConfig) = forInlineableRustFile("auth_plugin", CargoDependency.smithyRuntimeApi(runtimeConfig)) diff --git a/rust-runtime/aws-smithy-http/src/event_stream.rs b/rust-runtime/aws-smithy-http/src/event_stream.rs index 0b95bb2d1e..3d4bab78ff 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream.rs @@ -17,4 +17,4 @@ pub type BoxError = Box; pub use sender::{EventStreamSender, MessageStreamAdapter, MessageStreamError}; #[doc(inline)] -pub use receiver::{RawMessage, Receiver, ReceiverError}; +pub use receiver::{Receiver, ReceiverError}; diff --git a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs index 69e1c4381b..49aebdb4ac 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs @@ -8,7 +8,7 @@ use aws_smithy_eventstream::frame::{ }; use aws_smithy_runtime_api::client::result::{ConnectorError, SdkError}; use aws_smithy_types::body::SdkBody; -use aws_smithy_types::event_stream::Message; +use aws_smithy_types::event_stream::{Message, RawMessage}; use bytes::Buf; use bytes::Bytes; use bytes_utils::SegmentedBuf; @@ -87,23 +87,6 @@ impl RecvBuf { } } -/// Raw message from a [`Receiver`] when a [`SdkError::ResponseError`] is returned. -#[derive(Debug)] -#[non_exhaustive] -pub enum RawMessage { - /// Message was decoded into a valid frame, but failed to unmarshall into a modeled type. - Decoded(Message), - /// Message failed to be decoded into a valid frame. The raw bytes may not be available in the - /// case where decoding consumed the buffer. - Invalid(Option), -} - -impl RawMessage { - pub(crate) fn invalid(buf: &mut SegmentedBuf) -> Self { - Self::Invalid(Some(buf.copy_to_bytes(buf.remaining()))) - } -} - #[derive(Debug)] enum ReceiverErrorKind { /// The stream ended before a complete message frame was received. diff --git a/rust-runtime/aws-smithy-types/src/event_stream.rs b/rust-runtime/aws-smithy-types/src/event_stream.rs index a63c44e8ed..126d7f96dc 100644 --- a/rust-runtime/aws-smithy-types/src/event_stream.rs +++ b/rust-runtime/aws-smithy-types/src/event_stream.rs @@ -6,7 +6,8 @@ //! Types relevant to event stream serialization/deserialization use crate::str_bytes::StrBytes; -use bytes::Bytes; +use bytes::{Buf, Bytes}; +use bytes_utils::SegmentedBuf; mod value { use crate::str_bytes::StrBytes; @@ -183,3 +184,21 @@ impl Message { &self.payload } } + +/// Raw message from an event stream receiver when a [`SdkError::ResponseError`] is returned. +#[derive(Debug)] +#[non_exhaustive] +pub enum RawMessage { + /// Message was decoded into a valid frame, but failed to unmarshall into a modeled type. + Decoded(Message), + /// Message failed to be decoded into a valid frame. The raw bytes may not be available in the + /// case where decoding consumed the buffer. + Invalid(Option), +} + +impl RawMessage { + /// Creates a `RawMessage` for failure to decode a message into a valid frame. + pub fn invalid(buf: &mut SegmentedBuf) -> Self { + Self::Invalid(Some(buf.copy_to_bytes(buf.remaining()))) + } +} diff --git a/rust-runtime/inlineable/src/event_receiver.rs b/rust-runtime/inlineable/src/event_receiver.rs index d5ced56171..50b78f7aa3 100644 --- a/rust-runtime/inlineable/src/event_receiver.rs +++ b/rust-runtime/inlineable/src/event_receiver.rs @@ -4,7 +4,8 @@ */ use aws_smithy_http::event_stream::Receiver; -use aws_smithy_runtime_api::box_error::BoxError; +use aws_smithy_runtime_api::client::result::SdkError; +use aws_smithy_types::event_stream::RawMessage; #[derive(Debug)] /// Receives unmarshalled events at a time out of an Event Stream. @@ -18,12 +19,10 @@ impl EventReceiver { } /// Asynchronously tries to receive an event from the stream. If the stream has ended, it - /// returns an `Ok(None)`. If there is an error, such as failing to unmarshall a message in - /// the stream, it returns an [`BoxError`]. - pub async fn recv(&mut self) -> Result, BoxError> - where - E: std::error::Error + Send + Sync + 'static, - { - self.inner.recv().await.map_err(Into::into) + /// returns an `Ok(None)`. If there is a transport layer error, it will return + /// `Err(SdkError::DispatchFailure)`. Service-modeled errors will be a part of the returned + /// messages. + pub async fn recv(&mut self) -> Result, SdkError> { + self.inner.recv().await } } diff --git a/tools/ci-cdk/canary-lambda/src/latest/transcribe_canary.rs b/tools/ci-cdk/canary-lambda/src/latest/transcribe_canary.rs index 376cf23494..8f6420fc1b 100644 --- a/tools/ci-cdk/canary-lambda/src/latest/transcribe_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/latest/transcribe_canary.rs @@ -48,12 +48,7 @@ pub async fn transcribe_canary( .await?; let mut full_message = String::new(); - while let Some(event) = output - .transcript_result_stream - .recv() - .await - .map_err(|e| anyhow::Error::msg(format!("failed to receive event: {e:#?}")))? - { + while let Some(event) = output.transcript_result_stream.recv().await? { match event { TranscriptResultStream::TranscriptEvent(transcript_event) => { let transcript = transcript_event.transcript.unwrap(); diff --git a/tools/ci-cdk/canary-runner/src/build_bundle.rs b/tools/ci-cdk/canary-runner/src/build_bundle.rs index 52c2a63950..4da1e0b4c9 100644 --- a/tools/ci-cdk/canary-runner/src/build_bundle.rs +++ b/tools/ci-cdk/canary-runner/src/build_bundle.rs @@ -66,7 +66,7 @@ const REQUIRED_SDK_CRATES: &[&str] = &[ // The elements in this `Vec` should be sorted in an ascending order by the release date. lazy_static! { static ref NOTABLE_SDK_RELEASE_TAGS: Vec = vec![ - // last version before addition of Sigv4a MRAP test and a breaking change for event receiver error type + // last version before addition of Sigv4a MRAP test ReleaseTag::from_str("release-2023-10-26").unwrap(), ]; } From 85163efb43dec45507665f79b46a98e8bac29db7 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 6 Nov 2023 16:30:12 -0600 Subject: [PATCH 10/11] Do not mention a type from `aws-smithy-runtime-api` --- rust-runtime/aws-smithy-types/src/event_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-runtime/aws-smithy-types/src/event_stream.rs b/rust-runtime/aws-smithy-types/src/event_stream.rs index 126d7f96dc..1491b43a5b 100644 --- a/rust-runtime/aws-smithy-types/src/event_stream.rs +++ b/rust-runtime/aws-smithy-types/src/event_stream.rs @@ -185,7 +185,7 @@ impl Message { } } -/// Raw message from an event stream receiver when a [`SdkError::ResponseError`] is returned. +/// Raw message from an event stream receiver when a response error is encountered. #[derive(Debug)] #[non_exhaustive] pub enum RawMessage { From b87ddbd3b0f128d81b0356d0e5131c6ffbecd3e9 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Mon, 6 Nov 2023 16:48:44 -0600 Subject: [PATCH 11/11] Avoid exposing unstable `SegmentedBuf` in `smithy-types --- rust-runtime/aws-smithy-http/src/event_stream/receiver.rs | 3 ++- rust-runtime/aws-smithy-types/src/event_stream.rs | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs index 49aebdb4ac..6d94194511 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs @@ -193,11 +193,12 @@ impl Receiver { } if self.buffer.has_data() { trace!(remaining_data = ?self.buffer, "data left over in the event stream response stream"); + let buf = self.buffer.buffered(); return Err(SdkError::response_error( ReceiverError { kind: ReceiverErrorKind::UnexpectedEndOfStream, }, - RawMessage::invalid(self.buffer.buffered()), + RawMessage::invalid(Some(buf.copy_to_bytes(buf.remaining()))), )); } Ok(None) diff --git a/rust-runtime/aws-smithy-types/src/event_stream.rs b/rust-runtime/aws-smithy-types/src/event_stream.rs index 1491b43a5b..0d98dabd99 100644 --- a/rust-runtime/aws-smithy-types/src/event_stream.rs +++ b/rust-runtime/aws-smithy-types/src/event_stream.rs @@ -6,8 +6,7 @@ //! Types relevant to event stream serialization/deserialization use crate::str_bytes::StrBytes; -use bytes::{Buf, Bytes}; -use bytes_utils::SegmentedBuf; +use bytes::Bytes; mod value { use crate::str_bytes::StrBytes; @@ -198,7 +197,7 @@ pub enum RawMessage { impl RawMessage { /// Creates a `RawMessage` for failure to decode a message into a valid frame. - pub fn invalid(buf: &mut SegmentedBuf) -> Self { - Self::Invalid(Some(buf.copy_to_bytes(buf.remaining()))) + pub fn invalid(bytes: Option) -> Self { + Self::Invalid(bytes) } }