Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul stalled stream protection and add upload support #3485

Merged
merged 24 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
892a1bc
Take initial stab at upload minimum throughput
jdisanti Mar 5, 2024
38aa76e
Add comprehensive upload stream protection integration tests
jdisanti Mar 8, 2024
186c51b
Add comprehensive download stream protection integration tests
jdisanti Mar 8, 2024
dd8d351
Commonize code between upload/download tests
jdisanti Mar 8, 2024
d57f7df
Rework upload/download stalled stream protection
jdisanti Mar 11, 2024
38cd2a3
CI fixes
jdisanti Mar 13, 2024
6fd19ec
Rename "cell" to "bin"
jdisanti Mar 14, 2024
8ef35d6
Fix empty cell check
jdisanti Mar 14, 2024
eb58c53
Merge remote-tracking branch 'origin/main' into jdisanti-upload-min-t…
jdisanti Mar 14, 2024
ac15324
Update changelog
jdisanti Mar 14, 2024
510bedb
Fix aws-smithy-runtime no-default-features build
jdisanti Mar 15, 2024
7363504
Fix size hints
jdisanti Mar 25, 2024
0f1e009
Merge remote-tracking branch 'origin/main' into jdisanti-upload-min-t…
jdisanti Mar 25, 2024
79b6010
Disable upload protection in current BMV
jdisanti Mar 25, 2024
77255af
Merge remote-tracking branch 'origin/main' into jdisanti-upload-min-t…
jdisanti Mar 25, 2024
96198d5
Merge remote-tracking branch 'origin/main' into jdisanti-upload-min-t…
jdisanti Mar 25, 2024
4c67020
Version bump runtime-api
jdisanti Mar 25, 2024
e25e049
Rename upload check futures
jdisanti Mar 25, 2024
3020312
Simplify bin merging logic
jdisanti Mar 25, 2024
adcb87f
Create a new `show_test_logs` fn
jdisanti Mar 25, 2024
d960f50
Improve the changelog entries
jdisanti Mar 25, 2024
a21d9df
Merge remote-tracking branch 'origin/main' into jdisanti-upload-min-t…
jdisanti Mar 27, 2024
3ca765a
Incorporate feedback
jdisanti Mar 27, 2024
e7d7f44
Fix feature issue
jdisanti Mar 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aws/sdk/integration-tests/s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ tracing-subscriber = { version = "0.3.15", features = ["env-filter", "json"] }
# If you're writing a test with this, take heed! `no-env-filter` means you'll be capturing
# logs from everything that speaks, so be specific with your asserts.
tracing-test = { version = "0.2.4", features = ["no-env-filter"] }

[dependencies]
pin-project-lite = "0.2.13"
124 changes: 86 additions & 38 deletions aws/sdk/integration-tests/s3/tests/stalled-stream-protection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,58 +4,118 @@
*/

use aws_credential_types::Credentials;
use aws_sdk_s3::config::{Region, StalledStreamProtectionConfig};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::{
config::{Region, StalledStreamProtectionConfig},
error::BoxError,
};
use aws_sdk_s3::{error::DisplayErrorContext, primitives::ByteStream};
use aws_sdk_s3::{Client, Config};
use bytes::BytesMut;
use aws_smithy_runtime::{assert_str_contains, test_util::capture_test_logs::capture_test_logs};
use aws_smithy_types::body::SdkBody;
use bytes::{Bytes, BytesMut};
use http_body::Body;
use std::error::Error;
use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;
use std::{future::Future, task::Poll};
use std::{net::SocketAddr, pin::Pin, task::Context};
use tokio::{
net::{TcpListener, TcpStream},
time::sleep,
};
use tracing::debug;

// This test doesn't work because we can't count on `hyper` to poll the body,
// regardless of whether we schedule a wake. To make this functionality work,
// we'd have to integrate more closely with the orchestrator.
//
// I'll leave this test here because we do eventually want to support stalled
// stream protection for uploads.
#[ignore]
enum SlowBodyState {
Wait(Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>>),
Send,
Taken,
}

struct SlowBody {
state: SlowBodyState,
}

impl SlowBody {
fn new() -> Self {
Self {
state: SlowBodyState::Send,
}
}
}

impl Body for SlowBody {
type Data = Bytes;
type Error = BoxError;

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
loop {
let mut state = SlowBodyState::Taken;
std::mem::swap(&mut state, &mut self.state);
match state {
SlowBodyState::Wait(mut fut) => match fut.as_mut().poll(cx) {
Comment on lines +49 to +57
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recommend using http_body 1.0 for bodies going forward

Poll::Ready(_) => self.state = SlowBodyState::Send,
Poll::Pending => {
self.state = SlowBodyState::Wait(fut);
return Poll::Pending;
}
},
SlowBodyState::Send => {
self.state = SlowBodyState::Wait(Box::pin(sleep(Duration::from_micros(100))));
return Poll::Ready(Some(Ok(Bytes::from_static(
b"data_data_data_data_data_data_data_data_data_data_data_data_\
data_data_data_data_data_data_data_data_data_data_data_data_\
data_data_data_data_data_data_data_data_data_data_data_data_\
data_data_data_data_data_data_data_data_data_data_data_data_",
))));
}
SlowBodyState::Taken => unreachable!(),
}
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}

#[tokio::test]
async fn test_stalled_stream_protection_defaults_for_upload() {
// We spawn a faulty server that will close the connection after
// writing half of the response body.
let _logs = capture_test_logs();

// We spawn a faulty server that will stop all request processing after reading half of the request body.
let (server, server_addr) = start_faulty_upload_server().await;
let _ = tokio::spawn(server);

let conf = Config::builder()
.credentials_provider(Credentials::for_tests())
.region(Region::new("us-east-1"))
.endpoint_url(format!("http://{server_addr}"))
// .stalled_stream_protection(StalledStreamProtectionConfig::enabled().build())
.stalled_stream_protection(StalledStreamProtectionConfig::enabled().build())
jdisanti marked this conversation as resolved.
Show resolved Hide resolved
.build();
let client = Client::from_conf(conf);

let err = client
.put_object()
.bucket("a-test-bucket")
.key("stalled-stream-test.txt")
.body(ByteStream::from_static(b"Hello"))
.body(ByteStream::new(SdkBody::from_body_0_4(SlowBody::new())))
.send()
.await
.expect_err("upload stream stalled out");

let err = err.source().expect("inner error exists");
assert_eq!(
err.to_string(),
let err_msg = DisplayErrorContext(&err).to_string();
assert_str_contains!(
err_msg,
"minimum throughput was specified at 1 B/s, but throughput of 0 B/s was observed"
);
}

async fn start_faulty_upload_server() -> (impl Future<Output = ()>, SocketAddr) {
use tokio::net::{TcpListener, TcpStream};
use tokio::time::sleep;

let listener = TcpListener::bind("0.0.0.0:0")
.await
.expect("socket is free");
Expand All @@ -65,12 +125,7 @@ async fn start_faulty_upload_server() -> (impl Future<Output = ()>, SocketAddr)
let mut buf = BytesMut::new();
let mut time_to_stall = false;

loop {
if time_to_stall {
debug!("faulty server has read partial request, now getting stuck");
break;
}

while !time_to_stall {
match socket.try_read_buf(&mut buf) {
Ok(0) => {
unreachable!(
Expand All @@ -79,12 +134,7 @@ async fn start_faulty_upload_server() -> (impl Future<Output = ()>, SocketAddr)
}
Ok(n) => {
debug!("read {n} bytes from the socket");

// Check to see if we've received some headers
if buf.len() >= 128 {
let s = String::from_utf8_lossy(&buf);
debug!("{s}");

time_to_stall = true;
}
}
Expand All @@ -98,6 +148,7 @@ async fn start_faulty_upload_server() -> (impl Future<Output = ()>, SocketAddr)
}
}

debug!("faulty server has read partial request, now getting stuck");
loop {
tokio::task::yield_now().await
}
Expand Down Expand Up @@ -229,14 +280,11 @@ async fn test_stalled_stream_protection_for_downloads_is_enabled_by_default() {
err.to_string(),
"minimum throughput was specified at 1 B/s, but throughput of 0 B/s was observed"
);
// 1s check interval + 5s grace period
assert_eq!(start.elapsed().as_secs(), 6);
// the 1s check interval is included in the 5s grace period
assert_eq!(start.elapsed().as_secs(), 5);
}

async fn start_faulty_download_server() -> (impl Future<Output = ()>, SocketAddr) {
use tokio::net::{TcpListener, TcpStream};
use tokio::time::sleep;

let listener = TcpListener::bind("0.0.0.0:0")
.await
.expect("socket is free");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class StalledStreamProtectionOperationCustomization(
// we can't count on hyper to poll a request body on wake.
rustTemplate(
"""
#{StalledStreamProtectionInterceptor}::new(#{Kind}::ResponseBody)
jdisanti marked this conversation as resolved.
Show resolved Hide resolved
#{StalledStreamProtectionInterceptor}::new(#{Kind}::RequestAndResponseBody)
""",
*preludeScope,
"StalledStreamProtectionInterceptor" to stalledStreamProtectionModule.resolve("StalledStreamProtectionInterceptor"),
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aws-smithy-runtime"
version = "1.1.8"
version = "1.2.0"
authors = ["AWS Rust SDK Team <[email protected]>", "Zelda Hessler <[email protected]>"]
description = "The new smithy runtime crate"
edition = "2021"
Expand Down
Loading
Loading