Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid exposing aws_smithy_http::event_stream::receiver::Receiver in SDK's public API #3114

Merged
merged 15 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,19 @@ message = """
references = ["smithy-rs#3139"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all"}
author = "ysaito1001"

[[smithy-rs]]
message = """
An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. The new-type supports the `.recv()` method whose signature is the same as [`aws_smithy_http::event_stream::receiver::Receiver::recv`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv).
"""
references = ["smithy-rs#3100", "smithy-rs#3114"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "ysaito1001"

[[aws-sdk-rust]]
message = """
An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. The new-type supports the `.recv()` method whose signature is the same as [`aws_smithy_http::event_stream::receiver::Receiver::recv`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv).
"""
references = ["smithy-rs#3100", "smithy-rs#3114"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "ysaito1001"
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class EventStreamSymbolProviderTest {
listOf(someStream, someStreamError),
)
outputType shouldBe RustType.Application(
RuntimeType.eventStreamReceiver(TestRuntimeConfig).toSymbol().rustType(),
RuntimeType.eventReceiver(TestRuntimeConfig).toSymbol().rustType(),
listOf(someStream, someStreamError),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ class InlineDependency(
private fun forInlineableRustFile(name: String, vararg additionalDependencies: RustDependency) =
forRustFile(RustModule.private(name), "/inlineable/src/$name.rs", *additionalDependencies)

fun eventReceiver(runtimeConfig: RuntimeConfig) =
forInlineableRustFile(
"event_receiver",
CargoDependency.smithyHttp(runtimeConfig),
CargoDependency.smithyRuntimeApi(runtimeConfig),
CargoDependency.smithyTypes(runtimeConfig),
)

fun defaultAuthPlugin(runtimeConfig: RuntimeConfig) = forInlineableRustFile("auth_plugin", CargoDependency.smithyRuntimeApi(runtimeConfig))

fun jsonErrors(runtimeConfig: RuntimeConfig) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ class EventStreamSymbolProvider(
(shape.isOutputEventStream(model) && target == CodegenTarget.SERVER)
val outer = when (isSender) {
true -> RuntimeType.eventStreamSender(runtimeConfig).toSymbol().rustType()
else -> RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType()
else -> {
if (target == CodegenTarget.SERVER) {
RuntimeType.eventStreamReceiver(runtimeConfig).toSymbol().rustType()
} else {
RuntimeType.eventReceiver(runtimeConfig).toSymbol().rustType()
}
}
}
val rustType = RustType.Application(outer, listOf(innerT, errorT))
return initial.toBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null)
fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::Receiver")

fun eventReceiver(runtimeConfig: RuntimeConfig) =
forInlineDependency(InlineDependency.eventReceiver(runtimeConfig)).resolve("EventReceiver")

fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import software.amazon.smithy.rust.codegen.core.rustlang.rustBlockTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.stripOuter
import software.amazon.smithy.rust.codegen.core.rustlang.withBlock
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.CodegenContext
import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
Expand Down Expand Up @@ -254,15 +255,25 @@ class HttpBindingGenerator(
operationShape,
targetShape,
).render()
val receiver = outputT.rustType().qualifiedName()
rustTemplate(
"""
let unmarshaller = #{unmarshallerConstructorFn}();
let body = std::mem::replace(body, #{SdkBody}::taken());
Ok($receiver::new(unmarshaller, body))
Ok(#{receiver:W})
""",
"SdkBody" to RuntimeType.sdkBody(runtimeConfig),
"unmarshallerConstructorFn" to unmarshallerConstructorFn,
"receiver" to writable {
if (codegenTarget == CodegenTarget.SERVER) {
rust("${outputT.rustType().qualifiedName()}::new(unmarshaller, body)")
} else {
rustTemplate(
"#{EventReceiver}::new(#{Receiver}::new(unmarshaller, body))",
"EventReceiver" to RuntimeType.eventReceiver(runtimeConfig),
"Receiver" to RuntimeType.eventStreamReceiver(runtimeConfig),
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
)
}
},
)
}

Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-http/src/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ pub type BoxError = Box<dyn StdError + Send + Sync + 'static>;
pub use sender::{EventStreamSender, MessageStreamAdapter, MessageStreamError};

#[doc(inline)]
pub use receiver::{RawMessage, Receiver, ReceiverError};
pub use receiver::{Receiver, ReceiverError};
22 changes: 3 additions & 19 deletions rust-runtime/aws-smithy-http/src/event_stream/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use aws_smithy_eventstream::frame::{
};
use aws_smithy_runtime_api::client::result::{ConnectorError, SdkError};
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::event_stream::Message;
use aws_smithy_types::event_stream::{Message, RawMessage};
use bytes::Buf;
use bytes::Bytes;
use bytes_utils::SegmentedBuf;
Expand Down Expand Up @@ -87,23 +87,6 @@ impl RecvBuf {
}
}

/// Raw message from a [`Receiver`] when a [`SdkError::ResponseError`] is returned.
#[derive(Debug)]
#[non_exhaustive]
pub enum RawMessage {
/// Message was decoded into a valid frame, but failed to unmarshall into a modeled type.
Decoded(Message),
/// Message failed to be decoded into a valid frame. The raw bytes may not be available in the
/// case where decoding consumed the buffer.
Invalid(Option<Bytes>),
}

impl RawMessage {
pub(crate) fn invalid(buf: &mut SegmentedBuf<Bytes>) -> Self {
Self::Invalid(Some(buf.copy_to_bytes(buf.remaining())))
}
}

#[derive(Debug)]
enum ReceiverErrorKind {
/// The stream ended before a complete message frame was received.
Expand Down Expand Up @@ -210,11 +193,12 @@ impl<T, E> Receiver<T, E> {
}
if self.buffer.has_data() {
trace!(remaining_data = ?self.buffer, "data left over in the event stream response stream");
let buf = self.buffer.buffered();
return Err(SdkError::response_error(
ReceiverError {
kind: ReceiverErrorKind::UnexpectedEndOfStream,
},
RawMessage::invalid(self.buffer.buffered()),
RawMessage::invalid(Some(buf.copy_to_bytes(buf.remaining()))),
));
}
Ok(None)
Expand Down
18 changes: 18 additions & 0 deletions rust-runtime/aws-smithy-types/src/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,21 @@ impl Message {
&self.payload
}
}

/// Raw message from an event stream receiver when a response error is encountered.
#[derive(Debug)]
#[non_exhaustive]
pub enum RawMessage {
/// Message was decoded into a valid frame, but failed to unmarshall into a modeled type.
Decoded(Message),
/// Message failed to be decoded into a valid frame. The raw bytes may not be available in the
/// case where decoding consumed the buffer.
Invalid(Option<Bytes>),
}

impl RawMessage {
/// Creates a `RawMessage` for failure to decode a message into a valid frame.
pub fn invalid(bytes: Option<Bytes>) -> Self {
Self::Invalid(bytes)
}
}
2 changes: 1 addition & 1 deletion rust-runtime/inlineable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ default = ["gated-tests"]

[dependencies]
async-trait = "0.1"
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-http = { path = "../aws-smithy-http", features = ["event-stream"] }
aws-smithy-http-server = { path = "../aws-smithy-http-server" }
aws-smithy-json = { path = "../aws-smithy-json" }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["client"] }
Expand Down
28 changes: 28 additions & 0 deletions rust-runtime/inlineable/src/event_receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use aws_smithy_http::event_stream::Receiver;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_smithy_types::event_stream::RawMessage;

#[derive(Debug)]
/// Receives unmarshalled events at a time out of an Event Stream.
pub struct EventReceiver<T, E> {
inner: Receiver<T, E>,
}

impl<T, E> EventReceiver<T, E> {
pub(crate) fn new(inner: Receiver<T, E>) -> Self {
Self { inner }
}

/// Asynchronously tries to receive an event from the stream. If the stream has ended, it
/// returns an `Ok(None)`. If there is a transport layer error, it will return
/// `Err(SdkError::DispatchFailure)`. Service-modeled errors will be a part of the returned
/// messages.
pub async fn recv(&mut self) -> Result<Option<T>, SdkError<E, RawMessage>> {
self.inner.recv().await
}
}
2 changes: 2 additions & 0 deletions rust-runtime/inlineable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ mod client_idempotency_token;
mod constrained;
#[allow(dead_code)]
mod ec2_query_errors;
#[allow(unused)]
mod event_receiver;
#[allow(dead_code)]
mod idempotency_token;
#[allow(dead_code)]
Expand Down
3 changes: 2 additions & 1 deletion tools/ci-cdk/canary-runner/src/build_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ const REQUIRED_SDK_CRATES: &[&str] = &[
// The elements in this `Vec` should be sorted in an ascending order by the release date.
lazy_static! {
static ref NOTABLE_SDK_RELEASE_TAGS: Vec<ReleaseTag> = vec![
ReleaseTag::from_str("release-2023-10-26").unwrap(), // last version before addition of Sigv4a MRAP test
// last version before addition of Sigv4a MRAP test
ReleaseTag::from_str("release-2023-10-26").unwrap(),
];
}

Expand Down