disable stalled stream protection on empty bodies and after read complete #3644

Merged
merged 16 commits into from
May 17, 2024
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
@@ -36,6 +36,18 @@ references = ["aws-sdk-rust#1079"]
meta = { "breaking" = false, "bug" = true, "tada" = false }
author = "rcoh"

[[aws-sdk-rust]]
message = "Fixes stalled upload stream protection to not apply to empty request bodies and to stop checking for violations once the request body has been read."
references = ["aws-sdk-rust#1141", "aws-sdk-rust#1146", "aws-sdk-rust#1148"]
Contributor

Will aws-sdk-rust#1141 (for CopyObject) and aws-sdk-rust#1146 (for MultipartUpload) be addressed by this PR?

Contributor

CopyObject is going to be resolved in a separate PR that I linked in an earlier post. MPU should be fixed by this PR.

Contributor

Gotcha, maybe we should remove aws-sdk-rust#1141 from references then?

meta = { "breaking" = false, "tada" = false, "bug" = true }
authors = ["aajtodd", "Velfi"]

[[smithy-rs]]
message = "Fixes stalled upload stream protection to not apply to empty request bodies and to stop checking for violations once the request body has been read."
references = ["aws-sdk-rust#1141", "aws-sdk-rust#1146", "aws-sdk-rust#1148"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
authors = ["aajtodd", "Velfi"]

[[aws-sdk-rust]]
message = "Updating the documentation for the `app_name` method on `ConfigLoader` to indicate the order of precedence for the sources of the `AppName`."
references = ["smithy-rs#3645"]
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-runtime-api/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "aws-smithy-runtime-api"
version = "1.6.0"
version = "1.6.1"
authors = ["AWS Rust SDK Team <[email protected]>", "Zelda Hessler <[email protected]>"]
description = "Smithy runtime types."
edition = "2021"
@@ -13,7 +13,11 @@
use aws_smithy_types::config_bag::{Storable, StoreReplace};
use std::time::Duration;

const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(5);
/// The default grace period for stalled stream protection.
///
/// When a stream stalls for longer than this grace period, the stream will
/// return an error.
pub const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(20);
Contributor Author

Why is this pub?

Contributor

Because there's an SSP (stalled stream protection) options struct in aws-smithy-runtime that also has a default, and I wanted a single source of truth.
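
The "single source of truth" point can be illustrated with a minimal sketch. The two modules below are hypothetical stand-ins for the two crates, and the struct names `ProtectionConfig` and `BodyOptions` are assumptions made for illustration, not the real types. Only the constant name and its 20 second value come from the diff above.

```rust
// Hypothetical stand-in for the API crate, which owns the shared default.
pub mod api {
    use std::time::Duration;

    /// Default grace period, exported so that other modules can reuse it.
    pub const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(20);

    /// Illustrative config type (not the real `StalledStreamProtectionConfig`).
    #[derive(Debug, Clone, PartialEq)]
    pub struct ProtectionConfig {
        pub grace_period: Duration,
    }

    impl Default for ProtectionConfig {
        fn default() -> Self {
            Self { grace_period: DEFAULT_GRACE_PERIOD }
        }
    }
}

// Hypothetical stand-in for the runtime crate, which has its own options struct.
pub mod runtime {
    use super::api::DEFAULT_GRACE_PERIOD;
    use std::time::Duration;

    /// Illustrative options type (not the real `MinimumThroughputBodyOptions`).
    #[derive(Debug, Clone, PartialEq)]
    pub struct BodyOptions {
        pub grace_period: Duration,
    }

    impl Default for BodyOptions {
        fn default() -> Self {
            // Reuses the exported constant instead of declaring a second one,
            // so the two defaults cannot drift apart.
            Self { grace_period: DEFAULT_GRACE_PERIOD }
        }
    }
}
```

With a private constant in each module, the two defaults could silently diverge, which appears to be what the removed `Duration::from_secs(0)` constant further down in this diff had allowed.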


/// Configuration for stalled stream protection.
///
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-runtime/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "aws-smithy-runtime"
version = "1.5.2"
version = "1.5.3"
authors = ["AWS Rust SDK Team <[email protected]>", "Zelda Hessler <[email protected]>"]
description = "The new smithy runtime crate"
edition = "2021"
@@ -136,6 +136,10 @@ impl UploadThroughput {
self.logs.lock().unwrap().push_bytes_transferred(now, bytes);
}

pub(crate) fn mark_complete(&self) -> bool {
self.logs.lock().unwrap().mark_complete()
}

pub(crate) fn report(&self, now: SystemTime) -> ThroughputReport {
self.logs.lock().unwrap().report(now)
}
@@ -177,6 +181,8 @@ trait UploadReport {
impl UploadReport for ThroughputReport {
fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput) {
let throughput = match self {
// stream has been exhausted, stop tracking violations
ThroughputReport::Complete => return (false, ZERO_THROUGHPUT),
// If the report is incomplete, then we don't have enough data yet to
// decide if minimum throughput was violated.
ThroughputReport::Incomplete => {
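
As a rough sketch of the early return above: once a report says the stream is complete, the violation check answers "no violation" without comparing any throughput. The `Report` enum and `Throughput` struct here are simplified, hypothetical versions of the crate's types, and the function returns only the boolean rather than the `(bool, Throughput)` pair used in the diff.

```rust
use std::time::Duration;

/// Simplified throughput value: `bytes` transferred per `per` elapsed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Throughput {
    pub bytes: u64,
    pub per: Duration,
}

impl Throughput {
    /// Bytes per second as a float. Assumes `per` is non-zero.
    pub fn bytes_per_second(&self) -> f64 {
        self.bytes as f64 / self.per.as_secs_f64()
    }
}

/// Simplified report with only the variants needed for this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Report {
    Incomplete,
    Transferred(Throughput),
    Complete,
}

/// Returns true when the report shows throughput below the minimum.
pub fn minimum_throughput_violated(report: Report, minimum: Throughput) -> bool {
    match report {
        // Stream has been exhausted: nothing left to measure, never a violation.
        Report::Complete => false,
        // Not enough data yet to decide either way.
        Report::Incomplete => false,
        Report::Transferred(actual) => {
            actual.bytes_per_second() < minimum.bytes_per_second()
        }
    }
}
```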
@@ -22,6 +22,7 @@ trait DownloadReport {
impl DownloadReport for ThroughputReport {
fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput) {
let throughput = match self {
ThroughputReport::Complete => return (false, ZERO_THROUGHPUT),
// If the report is incomplete, then we don't have enough data yet to
// decide if minimum throughput was violated.
ThroughputReport::Incomplete => {
@@ -175,6 +176,18 @@ where
tracing::trace!("received data: {}", bytes.len());
this.throughput
.push_bytes_transferred(now, bytes.len() as u64);

// hyper will optimistically stop polling when end of stream is reported
// (e.g. when content-length amount of data has been consumed) which means
// we may never get to `Poll::Ready(None)`. Check for same condition and
// attempt to stop checking throughput violations _now_ as we may never
// get polled again. The caveat here is that it depends on `Body` implementations
// implementing `is_end_stream()` correctly. Users can also disable SSP as an
// alternative for such fringe use cases.
if self.is_end_stream() {
tracing::trace!("stream reported end of stream before Poll::Ready(None) reached; marking stream complete");
self.throughput.mark_complete();
}
Poll::Ready(Some(Ok(bytes)))
}
Poll::Pending => {
@@ -183,7 +196,12 @@ where
Poll::Pending
}
// If we've read all the data or an error occurred, then return that result.
res => res,
res => {
if this.throughput.mark_complete() {
tracing::trace!("stream completed: {:?}", res);
}
res
}
}
}
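
The end-of-stream check above can be sketched without any HTTP machinery. The `ChunkSource` trait, `FixedBody`, and `Tracked` below are hypothetical, synchronous stand-ins for `Body` and the throughput-tracking wrapper. The point is only that the wrapper marks itself complete as soon as the inner source reports end of stream, because the caller may never ask for the final `None`.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified stand-in for an HTTP body: it yields chunks and
/// can report that no more data will follow.
pub trait ChunkSource {
    fn next_chunk(&mut self) -> Option<Vec<u8>>;
    fn is_end_stream(&self) -> bool;
}

/// A body backed by a fixed queue of chunks.
pub struct FixedBody {
    chunks: VecDeque<Vec<u8>>,
}

impl FixedBody {
    pub fn new(chunks: Vec<Vec<u8>>) -> Self {
        Self { chunks: chunks.into() }
    }
}

impl ChunkSource for FixedBody {
    fn next_chunk(&mut self) -> Option<Vec<u8>> {
        self.chunks.pop_front()
    }

    fn is_end_stream(&self) -> bool {
        self.chunks.is_empty()
    }
}

/// Wrapper that counts bytes and remembers whether the stream has finished.
pub struct Tracked<B> {
    inner: B,
    bytes_seen: u64,
    complete: bool,
}

impl<B: ChunkSource> Tracked<B> {
    pub fn new(inner: B) -> Self {
        Self { inner, bytes_seen: 0, complete: false }
    }

    pub fn next_chunk(&mut self) -> Option<Vec<u8>> {
        match self.inner.next_chunk() {
            Some(bytes) => {
                self.bytes_seen += bytes.len() as u64;
                // The caller may stop asking once the last chunk is handed out,
                // so check for end of stream now instead of waiting for `None`.
                if self.inner.is_end_stream() {
                    self.complete = true;
                }
                Some(bytes)
            }
            None => {
                self.complete = true;
                None
            }
        }
    }

    pub fn is_complete(&self) -> bool {
        self.complete
    }

    pub fn bytes_seen(&self) -> u64 {
        self.bytes_seen
    }
}
```

As the code comment in the diff notes, this only works when the inner body implements `is_end_stream()` correctly. A source that always returns `false` falls back to the `None` path.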

@@ -4,10 +4,12 @@
*/

use super::Throughput;
use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
use aws_smithy_runtime_api::client::stalled_stream_protection::{
StalledStreamProtectionConfig, DEFAULT_GRACE_PERIOD,
};
use std::time::Duration;

/// A collection of options for configuring a [`MinimumThroughputBody`](super::MinimumThroughputBody).
/// A collection of options for configuring a [`MinimumThroughputBody`](super::MinimumThroughputDownloadBody).
Contributor Author

I think this was correct as it was: it applies to MinimumThroughputBody, not just MinimumThroughputDownloadBody.

Contributor

I did it to avoid the deprecation message:

/// Use [`MinimumThroughputDownloadBody`] instead.
#[deprecated(note = "Renamed to MinimumThroughputDownloadBody since it doesn't work for uploads")]
pub type MinimumThroughputBody<B> = MinimumThroughputDownloadBody<B>;
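
For context, a small sketch of how a deprecated type alias behaves in code. The struct body here is a placeholder and `wrap_with_old_name` is a hypothetical helper. Only the two type names and the deprecation note are taken from the snippet above.

```rust
/// Placeholder for the renamed type; the real struct has different fields.
pub struct MinimumThroughputDownloadBody<B> {
    inner: B,
}

impl<B> MinimumThroughputDownloadBody<B> {
    pub fn new(inner: B) -> Self {
        Self { inner }
    }

    pub fn into_inner(self) -> B {
        self.inner
    }
}

/// Use [`MinimumThroughputDownloadBody`] instead.
#[deprecated(note = "Renamed to MinimumThroughputDownloadBody since it doesn't work for uploads")]
pub type MinimumThroughputBody<B> = MinimumThroughputDownloadBody<B>;

/// Hypothetical helper that still names the old alias. Without the `allow`
/// attribute, the compiler emits a deprecation warning for this signature.
#[allow(deprecated)]
pub fn wrap_with_old_name(inner: u8) -> MinimumThroughputBody<u8> {
    MinimumThroughputDownloadBody::new(inner)
}
```

Because the alias and the new name refer to the same type, values built under either name are interchangeable. Only the spelling at the use site decides whether the warning fires.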

#[derive(Debug, Clone)]
pub struct MinimumThroughputBodyOptions {
/// The minimum throughput that is acceptable.
@@ -69,6 +71,13 @@ impl MinimumThroughputBodyOptions {
}
}

const DEFAULT_MINIMUM_THROUGHPUT: Throughput = Throughput {
bytes_read: 1,
per_time_elapsed: Duration::from_secs(1),
};

const DEFAULT_CHECK_WINDOW: Duration = Duration::from_secs(1);

impl Default for MinimumThroughputBodyOptions {
fn default() -> Self {
Self {
@@ -87,14 +96,6 @@ pub struct MinimumThroughputBodyOptionsBuilder {
grace_period: Option<Duration>,
}

const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(0);
const DEFAULT_MINIMUM_THROUGHPUT: Throughput = Throughput {
bytes_read: 1,
per_time_elapsed: Duration::from_secs(1),
};

const DEFAULT_CHECK_WINDOW: Duration = Duration::from_secs(1);

impl MinimumThroughputBodyOptionsBuilder {
/// Create a new `MinimumThroughputBodyOptionsBuilder`.
pub fn new() -> Self {
@@ -260,6 +260,8 @@ pub(crate) enum ThroughputReport {
Pending,
/// The stream transferred this amount of throughput during the time window.
Transferred(Throughput),
/// The stream has completed, no more data is expected.
Complete,
}

const BIN_COUNT: usize = 10;
@@ -285,6 +287,7 @@ pub(super) struct ThroughputLogs {
resolution: Duration,
current_tail: SystemTime,
buffer: LogBuffer<BIN_COUNT>,
stream_complete: bool,
}

impl ThroughputLogs {
@@ -302,6 +305,7 @@ impl ThroughputLogs {
resolution,
current_tail: now,
buffer: LogBuffer::new(),
stream_complete: false,
}
}

@@ -343,8 +347,24 @@ impl ThroughputLogs {
assert!(self.current_tail >= now);
}

/// Mark the stream complete indicating no more data is expected. This is an
/// idempotent operation -- subsequent invocations of this function have no effect
/// and return false.
///
/// After marking a stream complete [report](#method.report) will forever more return
/// [ThroughputReport::Complete]
pub(super) fn mark_complete(&mut self) -> bool {
let prev = self.stream_complete;
self.stream_complete = true;
!prev
}

/// Generates an overall report of the time window.
pub(super) fn report(&mut self, now: SystemTime) -> ThroughputReport {
if self.stream_complete {
return ThroughputReport::Complete;
}

self.catch_up(now);
self.buffer.fill_gaps();
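
The idempotent `mark_complete` and the short-circuit in `report` can be shown in isolation. `Logs` and `Report` below are hypothetical, stripped-down versions of `ThroughputLogs` and `ThroughputReport` that keep only a byte counter and the completion flag.

```rust
/// Simplified report: either the stream is done or we show the bytes seen.
#[derive(Debug, PartialEq)]
pub enum Report {
    Complete,
    BytesSeen(u64),
}

/// Simplified log state: a running byte count plus the completion flag.
#[derive(Default)]
pub struct Logs {
    bytes: u64,
    stream_complete: bool,
}

impl Logs {
    pub fn push_bytes(&mut self, n: u64) {
        self.bytes += n;
    }

    /// Marks the stream complete. Returns true only for the call that
    /// actually flipped the flag; later calls have no effect and return false.
    pub fn mark_complete(&mut self) -> bool {
        let prev = self.stream_complete;
        self.stream_complete = true;
        !prev
    }

    /// Once complete, always reports `Complete`, regardless of byte counts.
    pub fn report(&self) -> Report {
        if self.stream_complete {
            return Report::Complete;
        }
        Report::BytesSeen(self.bytes)
    }
}
```

Returning whether the flag changed lets a caller log the transition exactly once, which is how the download body in this PR uses it before emitting its "stream completed" trace.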

@@ -65,6 +65,12 @@ impl Intercept for StalledStreamProtectionInterceptor {
) -> Result<(), BoxError> {
if let Some(sspcfg) = cfg.load::<StalledStreamProtectionConfig>().cloned() {
if sspcfg.upload_enabled() {
if let Some(0) = context.request().body().content_length() {
tracing::trace!(
"skipping stalled stream protection for zero length request body"
);
return Ok(());
}
let (_async_sleep, time_source) = get_runtime_component_deps(runtime_components)?;
let now = time_source.now();
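
The guard above boils down to a small decision, sketched here as a free function. `should_track_upload` is a hypothetical name, and the interceptor in the diff performs this check inline rather than through a helper.

```rust
/// Decides whether upload throughput tracking should wrap a request body.
///
/// `content_length` mirrors a body's optional known length: `None` means the
/// length is unknown (for example, a streaming body).
pub fn should_track_upload(upload_enabled: bool, content_length: Option<u64>) -> bool {
    if !upload_enabled {
        return false;
    }
    // A body that declares zero bytes will never transfer data, so any
    // minimum-throughput check on it would eventually report a stall.
    !matches!(content_length, Some(0))
}
```

Bodies of unknown length are still tracked in this sketch, which matches the `if let Some(0)` pattern in the diff: only an explicit zero length skips protection.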

@@ -105,17 +105,24 @@ async fn download_stalls() {
let (time, sleep) = tick_advance_time_and_sleep();
let (server, response_sender) = channel_server();
let op = operation(server, time.clone(), sleep);
let barrier = Arc::new(Barrier::new(2));

let c = barrier.clone();
let server = tokio::spawn(async move {
for _ in 1..10 {
c.wait().await;
for i in 1..10 {
tracing::debug!("send {i}");
response_sender.send(NEAT_DATA).await.unwrap();
tick!(time, Duration::from_secs(1));
}
tick!(time, Duration::from_secs(10));
});

let response_body = op.invoke(()).await.expect("initial success");
let result = tokio::spawn(eagerly_consume(response_body));
let result = tokio::spawn(async move {
barrier.wait().await;
eagerly_consume(response_body).await
});
server.await.unwrap();

let err = result
@@ -188,6 +195,7 @@ async fn user_downloads_data_too_slowly() {
}

use download_test_tools::*;
use tokio::sync::Barrier;
mod download_test_tools {
use crate::stalled_stream_common::*;
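
The barrier added to `download_stalls` makes the fake server wait until the consumer task is ready before it starts sending. The test itself uses `tokio::sync::Barrier`. The sketch below shows the same coordination idea with `std::sync::Barrier` and plain threads, so it is an analogy under that assumption, not the test's actual code. `run_synchronized` is a hypothetical name.

```rust
use std::sync::{mpsc, Arc, Barrier};
use std::thread;

/// Sends `count` numbered messages from a producer thread to a consumer
/// thread, with both sides meeting at a barrier before any data flows.
pub fn run_synchronized(count: u32) -> Vec<u32> {
    let barrier = Arc::new(Barrier::new(2));
    let (tx, rx) = mpsc::channel();

    let producer_barrier = Arc::clone(&barrier);
    let producer = thread::spawn(move || {
        // Block until the consumer has also reached the barrier.
        producer_barrier.wait();
        for i in 1..=count {
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, which closes the channel.
    });

    let consumer = thread::spawn(move || {
        // Signal readiness, then drain the channel until it closes.
        barrier.wait();
        rx.iter().collect::<Vec<u32>>()
    });

    producer.join().unwrap();
    consumer.join().unwrap()
}
```

In the real test the ordering matters because simulated time advances on the server side. Without the barrier, the server could plausibly tick the clock before the body was being consumed.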
