Skip to content

Commit

Permalink
add additional tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aajtodd committed May 16, 2024
1 parent bc4b1db commit 22a0e30
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::client::http::body::minimum_throughput::{
};
use aws_smithy_async::rt::sleep::AsyncSleep;
use http_body_0_4::Body;
use std::borrow::BorrowMut;
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
Expand Down Expand Up @@ -182,9 +181,11 @@ where
// (e.g. when content-length amount of data has been consumed) which means
// we may never get to `Poll:Ready(None)`. Check for same condition and
// attempt to stop checking throughput violations _now_ as we may never
// get polled again.
// get polled again. The caveat here is that it depends on `Body` implementations
// implementing `is_end_stream()` correctly. Users can also disable SSP as an
// alternative for such fringe use cases.
if self.is_end_stream() {
tracing::trace!("stream reported end of stream before Poll::Ready(None) reached marking stream complete");
tracing::trace!("stream reported end of stream before Poll::Ready(None) reached; marking stream complete");
self.throughput.complete();
}
Poll::Ready(Some(Ok(bytes)))
Expand Down
83 changes: 81 additions & 2 deletions rust-runtime/aws-smithy-runtime/tests/stalled_stream_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ async fn empty_request_body_delayed_response() {
assert_eq!(200, result.await.unwrap().expect("success").as_u16());
}

// TODO(fix-stalled-stream-upload) - expand coverage to include hitting the grace period and then completing the upload

/// Scenario: All the request data is either uploaded to the server or buffered in the
/// HTTP client, but the response doesn't start coming through within the grace period.
/// Expected: MUST NOT timeout, upload throughput should only apply up until the request body has
Expand Down Expand Up @@ -160,6 +158,36 @@ async fn complete_upload_delayed_response() {
assert_eq!(200, result.await.unwrap().expect("success").as_u16());
}

/// Scenario: Upload all request data and never poll again once content-length has
/// been reached. Hyper will stop polling once it detects end of stream so we can't rely
/// on reaching `Poll:Ready(None)` to detect end of stream.
///
/// ref: https://github.com/hyperium/hyper/issues/1545
/// ref: https://github.com/hyperium/hyper/issues/1521
///
/// Expected: MUST NOT timeout, upload throughput should only apply up until the request body has
/// been read completely. Once no more data is expected we should stop checking for throughput
/// violations.
#[tokio::test]
async fn complete_upload_stop_polling() {
let _logs = show_test_logs();

let (server, time, sleep) = limited_read_server(NEAT_DATA.len(), Some(Duration::from_secs(7)));
let op = operation(server, time.clone(), sleep.clone());

let body = SdkBody::from(NEAT_DATA);
let result = tokio::spawn(async move { op.invoke(body).await });

tokio::spawn(async move {
// advance past the grace period
tick!(time, Duration::from_secs(6));
// unblock server
tick!(time, Duration::from_secs(2));
});

assert_eq!(200, result.await.unwrap().expect("success").as_u16());
}

// Scenario: The server stops asking for data, the client maxes out its send buffer,
// and the request stream stops being polled. However, before the grace period
// is over, the server recovers and starts asking for data again.
Expand Down Expand Up @@ -383,6 +411,57 @@ mod upload_test_tools {
)
}

/// Fake server/connector that polls data only up to the content-length. Optionally delays
/// sending the response by the given duration.
pub fn limited_read_server(
content_len: usize,
respond_after: Option<Duration>,
) -> (SharedHttpConnector, TickAdvanceTime, TickAdvanceSleep) {
async fn fake_server(
mut body: Pin<&mut SdkBody>,
_time: TickAdvanceTime,
sleep: TickAdvanceSleep,
params: (usize, Option<Duration>),
) -> HttpResponse {
let mut remaining = params.0;
loop {
match poll_fn(|cx| body.as_mut().poll_data(cx)).await {
Some(res) => {
let rc = res.unwrap().len();
remaining -= rc;
tracing::info!("read {rc} bytes; remaining: {remaining}");
if remaining == 0 {
tracing::info!("read reported content-length data, stopping polling");
break;
};
}
None => {
tracing::info!(
"read until poll_data() returned None, no data left, stopping polling"
);
break;
}
}
}

let respond_after = params.1;
if let Some(delay) = respond_after {
tracing::info!("stalling for {} seconds", delay.as_secs());
sleep.sleep(delay).await;
tracing::info!("returning delayed response");
}

successful_response()
}

fake_server!(
FakeServerConnector,
fake_server,
(usize, Option<Duration>),
(content_len, respond_after)
)
}

pub fn expect_timeout(result: Result<StatusCode, SdkError<Infallible, Response<SdkBody>>>) {
let err = result.expect_err("should have timed out");
assert_str_contains!(
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aws-smithy-types"
version = "1.1.9"
version = "1.1.10"
authors = [
"AWS Rust SDK Team <[email protected]>",
"Russell Cohen <[email protected]>",
Expand Down
2 changes: 2 additions & 0 deletions rust-runtime/aws-smithy-types/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,11 @@ mod test {
let mut body = SdkBody::from("hello!");
let mut body = Pin::new(&mut body);
let data = body.next().await;
assert!(!body.is_end_stream());
assert!(data.is_some());
let data = body.next().await;
assert!(data.is_none());
assert!(body.is_end_stream());
}

#[tokio::test]
Expand Down

0 comments on commit 22a0e30

Please sign in to comment.