Skip to content

Commit

Permalink
Avoid exposing aws_smithy_http::event_stream::receiver::Receiver in…
Browse files Browse the repository at this point in the history
… SDK's public API (#3114)

## Motivation and Context
Implements #3100 

## Description
Currently, we expose `aws_smithy_http::event_stream::Receiver` in
generated SDKs, as shown in the following S3's example (see[ a generated
diff](https://d2luzm2xt3nokh.cloudfront.net/codegen-diff/cc303ab1a07693ab02d5ec4f06101b628d1dbabe/1aa6a8da7d2b7669ba3ab7179a1fd72aadc03162/aws-sdk-ignore-whitespace/index.html)
for
`tmp-codegen-diff/aws-sdk/sdk/s3/src/operation/select_object_content/_select_object_content_output.rs`):
```
pub struct SelectObjectContentOutput {
    <p>The array of results.</p>
    pub payload: ::aws_smithy_http::event_stream::Receiver<
        crate::types::SelectObjectContentEventStream,
        crate::types::error::SelectObjectContentEventStreamError,
    >,
...
```

This PR wraps `Receiver` in a new-type, called `EventReceiver`, which
then supports `pub async fn recv` method whose signature is the same as
`aws_smithy_http::event_stream::Receiver::recv`.

## Testing
Relied on existing tests (e.g. `s3` and `transcribestreaming`
integration tests cover uses cases affected by this change).

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
ysaito1001 authored Nov 7, 2023
1 parent c296e8e commit f9c0526
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 26 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,19 @@ message = """
references = ["smithy-rs#3139"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all"}
author = "ysaito1001"

[[smithy-rs]]
message = """
An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. The new-type supports the `.recv()` method whose signature is the same as [`aws_smithy_http::event_stream::receiver::Receiver::recv`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv).
"""
references = ["smithy-rs#3100", "smithy-rs#3114"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "ysaito1001"

[[aws-sdk-rust]]
message = """
An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. The new-type supports the `.recv()` method whose signature is the same as [`aws_smithy_http::event_stream::receiver::Receiver::recv`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv).
"""
references = ["smithy-rs#3100", "smithy-rs#3114"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "ysaito1001"
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class EventStreamSymbolProviderTest {
listOf(someStream, someStreamError),
)
outputType shouldBe RustType.Application(
RuntimeType.eventStreamReceiver(TestRuntimeConfig).toSymbol().rustType(),
RuntimeType.eventReceiver(TestRuntimeConfig).toSymbol().rustType(),
listOf(someStream, someStreamError),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ class InlineDependency(
private fun forInlineableRustFile(name: String, vararg additionalDependencies: RustDependency) =
forRustFile(RustModule.private(name), "/inlineable/src/$name.rs", *additionalDependencies)

fun eventReceiver(runtimeConfig: RuntimeConfig) =
forInlineableRustFile(
"event_receiver",
CargoDependency.smithyHttp(runtimeConfig),
CargoDependency.smithyRuntimeApi(runtimeConfig),
CargoDependency.smithyTypes(runtimeConfig),
)

fun defaultAuthPlugin(runtimeConfig: RuntimeConfig) = forInlineableRustFile("auth_plugin", CargoDependency.smithyRuntimeApi(runtimeConfig))

fun jsonErrors(runtimeConfig: RuntimeConfig) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ class EventStreamSymbolProvider(
(shape.isOutputEventStream(model) && target == CodegenTarget.SERVER)
val outer = when (isSender) {
true -> RuntimeType.eventStreamSender(runtimeConfig).toSymbol().rustType()
else -> RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType()
else -> {
if (target == CodegenTarget.SERVER) {
RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType()
} else {
RuntimeType.eventReceiver(runtimeConfig).toSymbol().rustType()
}
}
}
val rustType = RustType.Application(outer, listOf(innerT, errorT))
return initial.toBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null)
fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::Receiver")

fun eventReceiver(runtimeConfig: RuntimeConfig) =
forInlineDependency(InlineDependency.eventReceiver(runtimeConfig)).resolve("EventReceiver")

fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import software.amazon.smithy.rust.codegen.core.rustlang.rustBlockTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.stripOuter
import software.amazon.smithy.rust.codegen.core.rustlang.withBlock
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.CodegenContext
import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
Expand Down Expand Up @@ -254,15 +255,25 @@ class HttpBindingGenerator(
operationShape,
targetShape,
).render()
val receiver = outputT.rustType().qualifiedName()
rustTemplate(
"""
let unmarshaller = #{unmarshallerConstructorFn}();
let body = std::mem::replace(body, #{SdkBody}::taken());
Ok($receiver::new(unmarshaller, body))
Ok(#{receiver:W})
""",
"SdkBody" to RuntimeType.sdkBody(runtimeConfig),
"unmarshallerConstructorFn" to unmarshallerConstructorFn,
"receiver" to writable {
if (codegenTarget == CodegenTarget.SERVER) {
rust("${outputT.rustType().qualifiedName()}::new(unmarshaller, body)")
} else {
rustTemplate(
"#{EventReceiver}::new(#{Receiver}::new(unmarshaller, body))",
"EventReceiver" to RuntimeType.eventReceiver(runtimeConfig),
"Receiver" to RuntimeType.eventStreamReceiver(runtimeConfig),
)
}
},
)
}

Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-http/src/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ pub type BoxError = Box<dyn StdError + Send + Sync + 'static>;
pub use sender::{EventStreamSender, MessageStreamAdapter, MessageStreamError};

#[doc(inline)]
pub use receiver::{RawMessage, Receiver, ReceiverError};
pub use receiver::{Receiver, ReceiverError};
22 changes: 3 additions & 19 deletions rust-runtime/aws-smithy-http/src/event_stream/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use aws_smithy_eventstream::frame::{
};
use aws_smithy_runtime_api::client::result::{ConnectorError, SdkError};
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::event_stream::Message;
use aws_smithy_types::event_stream::{Message, RawMessage};
use bytes::Buf;
use bytes::Bytes;
use bytes_utils::SegmentedBuf;
Expand Down Expand Up @@ -87,23 +87,6 @@ impl RecvBuf {
}
}

/// Raw message from a [`Receiver`] when a [`SdkError::ResponseError`] is returned.
#[derive(Debug)]
#[non_exhaustive]
pub enum RawMessage {
/// Message was decoded into a valid frame, but failed to unmarshall into a modeled type.
Decoded(Message),
/// Message failed to be decoded into a valid frame. The raw bytes may not be available in the
/// case where decoding consumed the buffer.
Invalid(Option<Bytes>),
}

impl RawMessage {
pub(crate) fn invalid(buf: &mut SegmentedBuf<Bytes>) -> Self {
Self::Invalid(Some(buf.copy_to_bytes(buf.remaining())))
}
}

#[derive(Debug)]
enum ReceiverErrorKind {
/// The stream ended before a complete message frame was received.
Expand Down Expand Up @@ -210,11 +193,12 @@ impl<T, E> Receiver<T, E> {
}
if self.buffer.has_data() {
trace!(remaining_data = ?self.buffer, "data left over in the event stream response stream");
let buf = self.buffer.buffered();
return Err(SdkError::response_error(
ReceiverError {
kind: ReceiverErrorKind::UnexpectedEndOfStream,
},
RawMessage::invalid(self.buffer.buffered()),
RawMessage::invalid(Some(buf.copy_to_bytes(buf.remaining()))),
));
}
Ok(None)
Expand Down
18 changes: 18 additions & 0 deletions rust-runtime/aws-smithy-types/src/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,21 @@ impl Message {
&self.payload
}
}

/// Raw message from an event stream receiver when a response error is encountered.
#[derive(Debug)]
#[non_exhaustive]
pub enum RawMessage {
/// Message was decoded into a valid frame, but failed to unmarshall into a modeled type.
Decoded(Message),
/// Message failed to be decoded into a valid frame. The raw bytes may not be available in the
/// case where decoding consumed the buffer.
Invalid(Option<Bytes>),
}

impl RawMessage {
/// Creates a `RawMessage` for failure to decode a message into a valid frame.
pub fn invalid(bytes: Option<Bytes>) -> Self {
Self::Invalid(bytes)
}
}
2 changes: 1 addition & 1 deletion rust-runtime/inlineable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ default = ["gated-tests"]

[dependencies]
async-trait = "0.1"
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-http = { path = "../aws-smithy-http", features = ["event-stream"] }
aws-smithy-http-server = { path = "../aws-smithy-http-server" }
aws-smithy-json = { path = "../aws-smithy-json" }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["client"] }
Expand Down
28 changes: 28 additions & 0 deletions rust-runtime/inlineable/src/event_receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use aws_smithy_http::event_stream::Receiver;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_smithy_types::event_stream::RawMessage;

#[derive(Debug)]
/// Receives unmarshalled events at a time out of an Event Stream.
pub struct EventReceiver<T, E> {
inner: Receiver<T, E>,
}

impl<T, E> EventReceiver<T, E> {
pub(crate) fn new(inner: Receiver<T, E>) -> Self {
Self { inner }
}

/// Asynchronously tries to receive an event from the stream. If the stream has ended, it
/// returns an `Ok(None)`. If there is a transport layer error, it will return
/// `Err(SdkError::DispatchFailure)`. Service-modeled errors will be a part of the returned
/// messages.
pub async fn recv(&mut self) -> Result<Option<T>, SdkError<E, RawMessage>> {
self.inner.recv().await
}
}
2 changes: 2 additions & 0 deletions rust-runtime/inlineable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ mod client_idempotency_token;
mod constrained;
#[allow(dead_code)]
mod ec2_query_errors;
#[allow(unused)]
mod event_receiver;
#[allow(dead_code)]
mod idempotency_token;
#[allow(dead_code)]
Expand Down
3 changes: 2 additions & 1 deletion tools/ci-cdk/canary-runner/src/build_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ const REQUIRED_SDK_CRATES: &[&str] = &[
// The elements in this `Vec` should be sorted in an ascending order by the release date.
lazy_static! {
static ref NOTABLE_SDK_RELEASE_TAGS: Vec<ReleaseTag> = vec![
ReleaseTag::from_str("release-2023-10-26").unwrap(), // last version before addition of Sigv4a MRAP test
// last version before addition of Sigv4a MRAP test
ReleaseTag::from_str("release-2023-10-26").unwrap(),
];
}

Expand Down

0 comments on commit f9c0526

Please sign in to comment.