Skip to content

Commit

Permalink
Retry on 5xx, error on 4xx (#465)
Browse files Browse the repository at this point in the history
* Retry on 5xx, error on 4xx

* Only retry on certain error statuses

* Only validate responses in the retry policy

* Fix inverted logic
  • Loading branch information
rylev authored Nov 1, 2021
1 parent cab39cd commit c6f94bc
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 145 deletions.
38 changes: 2 additions & 36 deletions sdk/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,8 @@ pub enum StreamError {
pub enum HttpError {
#[error("Failed to serialize request body as json: {0}")]
BodySerializationError(serde_json::Error),
#[error(
"unexpected HTTP result (expected: {:?}, received: {:?}, body: {:?})",
expected,
received,
body
)]
UnexpectedStatusCode {
expected: Vec<StatusCode>,
received: StatusCode,
body: String,
},
#[error("HTTP error status (status: {:?}, body: {:?})", status, body)]
ErrorStatusCode { status: StatusCode, body: String },
#[error("UTF8 conversion error: {0}")]
Utf8Error(#[from] std::str::Utf8Error),
#[error("from UTF8 conversion error: {0}")]
Expand All @@ -157,31 +148,6 @@ pub enum HttpError {
StreamResetError(StreamError),
}

impl HttpError {
pub fn new_unexpected_status_code(
expected: StatusCode,
received: StatusCode,
body: &str,
) -> HttpError {
HttpError::UnexpectedStatusCode {
expected: vec![expected],
received,
body: body.to_owned(),
}
}

pub fn new_multiple_unexpected_status_code(
allowed: Vec<StatusCode>,
received: StatusCode,
body: &str,
) -> HttpError {
HttpError::UnexpectedStatusCode {
expected: allowed,
received,
body: body.to_owned(),
}
}
}
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum Not512ByteAlignedError {
#[error("start range not 512-byte aligned: {0}")]
Expand Down
40 changes: 5 additions & 35 deletions sdk/core/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,45 +41,15 @@ pub trait HttpClient: Send + Sync + std::fmt::Debug {
async fn execute_request_check_status(
&self,
request: Request<Bytes>,
expected_status: StatusCode,
_expected_status: StatusCode,
) -> Result<Response<Bytes>, HttpError> {
let response = self.execute_request(request).await?;
if expected_status != response.status() {
Err(HttpError::new_unexpected_status_code(
expected_status,
response.status(),
std::str::from_utf8(response.body())?,
))
} else {
let status = response.status();
if (200..400).contains(&status.as_u16()) {
Ok(response)
}
}

async fn execute_request_check_statuses(
&self,
request: Request<Bytes>,
expected_statuses: &[StatusCode],
) -> Result<Response<Bytes>, HttpError> {
let response = self.execute_request(request).await?;
if !expected_statuses
.iter()
.any(|expected_status| *expected_status == response.status())
{
if expected_statuses.len() == 1 {
Err(HttpError::new_unexpected_status_code(
expected_statuses[0],
response.status(),
std::str::from_utf8(response.body())?,
))
} else {
Err(HttpError::new_multiple_unexpected_status_code(
expected_statuses.to_vec(),
response.status(),
std::str::from_utf8(response.body())?,
))
}
} else {
Ok(response)
let body = std::str::from_utf8(response.body())?.to_owned();
Err(crate::HttpError::ErrorStatusCode { status, body })
}
}
}
Expand Down
73 changes: 63 additions & 10 deletions sdk/core/src/policies/retry_policies/retry_policy.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
use crate::policies::{Policy, PolicyResult, Request, Response};
use crate::sleep::sleep;
use crate::PipelineContext;
use crate::{HttpError, PipelineContext};
use chrono::{DateTime, Local};
use http::StatusCode;
use std::sync::Arc;
use std::time::Duration;

/// A retry policy.
///
/// All retry policies follow the same pattern and differ only in how they
/// decide that retrying has expired and in how long they sleep between
/// retries.
pub trait RetryPolicy {
/// Determine if no more retries should be performed.
///
/// Must return `true` if no more retries should be attempted; the retry
/// loop then returns the most recent error to the caller.
///
/// `retry_count` is the number of retries already performed (zero on the
/// first failure). `first_retry_time` is passed mutably so an implementation
/// can update it - presumably to record when retrying began; confirm
/// against the implementors.
fn is_expired(&self, first_retry_time: &mut Option<DateTime<Local>>, retry_count: u32) -> bool;
/// Determine how long before the next retry should be attempted.
///
/// The retry loop increments `retry_count` before calling this, so the
/// value is 1 when sleeping ahead of the first retry.
fn sleep_duration(&self, retry_count: u32) -> Duration;
}

/// The status codes where a retry should be attempted.
///
/// On all other 4xx and 5xx status codes no retry is attempted: the retry
/// loop checks the response status against this list and returns an
/// `HttpError::ErrorStatusCode` immediately when it is not present.
const RETRY_STATUSES: &[StatusCode] = &[
StatusCode::REQUEST_TIMEOUT, // 408
StatusCode::TOO_MANY_REQUESTS, // 429
StatusCode::INTERNAL_SERVER_ERROR, // 500
StatusCode::BAD_GATEWAY, // 502
StatusCode::SERVICE_UNAVAILABLE, // 503
StatusCode::GATEWAY_TIMEOUT, // 504
];

#[async_trait::async_trait]
impl<T, C> Policy<C> for T
where
Expand All @@ -26,19 +48,50 @@ where
let mut retry_count = 0;

loop {
match next[0].send(ctx, request, &next[1..]).await {
Ok(response) => return Ok(response),
Err(error) => {
log::error!("Error occurred when making request: {}", error);
if self.is_expired(&mut first_retry_time, retry_count) {
let error = match next[0].send(ctx, request, &next[1..]).await {
Ok(response) if (200..400).contains(&response.status().as_u16()) => {
log::trace!(
"Succesful response. Request={:?} response={:?}",
request,
response
);
// Successful status code
return Ok(response);
}
Ok(response) => {
// Error status code
let status = response.status();
let body = response.into_body_string().await;
let error = Box::new(HttpError::ErrorStatusCode { status, body });
if !RETRY_STATUSES.contains(&status) {
log::error!(
"server returned error status which will not be retried: {}",
status
);
// Server didn't return a status we retry on so return early
return Err(error);
} else {
retry_count += 1;

sleep(self.sleep_duration(retry_count)).await;
}
log::debug!(
"server returned error status which requires retry: {}",
status
);
error
}
Err(error) => {
log::debug!(
"error occurred when making request which will be retried: {}",
error
);
error
}
};

if self.is_expired(&mut first_retry_time, retry_count) {
return Err(error);
}
retry_count += 1;

sleep(self.sleep_duration(retry_count)).await;
}
}
}
41 changes: 27 additions & 14 deletions sdk/core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl ResponseBuilder {
}
}

/// An HTTP response.
pub struct Response {
status: StatusCode,
headers: HeaderMap,
Expand Down Expand Up @@ -63,20 +64,18 @@ impl Response {
(self.status, self.headers, self.body)
}

pub async fn validate(self, expected_status: StatusCode) -> Result<Self, crate::HttpError> {
let status = self.status();
if expected_status != status {
let body = collect_pinned_stream(self.body)
.await
.unwrap_or_else(|_| Bytes::from_static("<INVALID BODY>".as_bytes()));
Err(crate::HttpError::new_unexpected_status_code(
expected_status,
status,
std::str::from_utf8(&body as &[u8]).unwrap_or("<NON-UTF8 BODY>"),
))
} else {
Ok(self)
}
/// Consumes the response and returns its body as a `String`.
///
/// The status and headers are discarded. Collection and UTF-8 decoding are
/// delegated to `pinned_stream_into_utf8_string`, so this never fails.
pub async fn into_body_string(self) -> String {
    let stream = self.body;
    pinned_stream_into_utf8_string(stream).await
}
}

/// Debug formatting for `Response`.
///
/// Prints the status and headers; the body is represented by a fixed
/// placeholder instead of its contents.
impl std::fmt::Debug for Response {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Response");
        out.field("status", &self.status);
        out.field("headers", &self.headers);
        // The body is not read here; only a marker is emitted for it.
        out.field("body", &"<BODY>");
        out.finish()
    }
}

Expand All @@ -92,6 +91,20 @@ pub async fn collect_pinned_stream(mut pinned_stream: PinnedStream) -> Result<By
Ok(final_result.into())
}

/// Collects a `PinnedStream` into a UTF-8 `String`.
///
/// This function never fails; problems are reported through placeholder
/// text instead:
///
/// * if the stream cannot be collected, the result is `"<INVALID BODY>"`;
/// * if the collected bytes are not valid UTF-8, the result is
///   `"<NON-UTF8 BODY>"`.
pub async fn pinned_stream_into_utf8_string(stream: PinnedStream) -> String {
    let body = collect_pinned_stream(stream)
        .await
        .unwrap_or_else(|_| Bytes::from_static(b"<INVALID BODY>"));
    // Return the conversion directly rather than binding it to a temporary
    // that is immediately returned.
    std::str::from_utf8(&body)
        .unwrap_or("<NON-UTF8 BODY>")
        .to_owned()
}

impl From<BytesResponse> for Response {
fn from(bytes_response: BytesResponse) -> Self {
let (status, headers, body) = bytes_response.deconstruct();
Expand Down
8 changes: 0 additions & 8 deletions sdk/cosmos/src/clients/collection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ impl CollectionClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(GetCollectionResponse::try_from(response).await?)
Expand All @@ -80,8 +78,6 @@ impl CollectionClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::NO_CONTENT)
.await?;

Ok(DeleteCollectionResponse::try_from(response).await?)
Expand All @@ -102,8 +98,6 @@ impl CollectionClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(ReplaceCollectionResponse::try_from(response).await?)
Expand All @@ -128,8 +122,6 @@ impl CollectionClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::CREATED)
.await?;

Ok(CreateDocumentResponse::try_from(response).await?)
Expand Down
4 changes: 0 additions & 4 deletions sdk/cosmos/src/clients/cosmos_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ impl CosmosClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::CREATED)
.await?;

Ok(CreateDatabaseResponse::try_from(response).await?)
Expand Down Expand Up @@ -239,7 +237,6 @@ impl CosmosClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);

ListDatabasesResponse::try_from(response).await
}
Expand All @@ -256,7 +253,6 @@ impl CosmosClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListDatabasesResponse::try_from(response).await
}
State::Done => return None,
Expand Down
10 changes: 0 additions & 10 deletions sdk/cosmos/src/clients/database_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ impl DatabaseClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(GetDatabaseResponse::try_from(response).await?)
Expand All @@ -94,8 +92,6 @@ impl DatabaseClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(DeleteDatabaseResponse::try_from(response).await?)
Expand Down Expand Up @@ -127,7 +123,6 @@ impl DatabaseClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListCollectionsResponse::try_from(response).await
}
State::Continuation(continuation_token) => {
Expand All @@ -146,7 +141,6 @@ impl DatabaseClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListCollectionsResponse::try_from(response).await
}
State::Done => return None,
Expand Down Expand Up @@ -182,8 +176,6 @@ impl DatabaseClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::CREATED)
.await?;

Ok(CreateCollectionResponse::try_from(response).await?)
Expand Down Expand Up @@ -215,7 +207,6 @@ impl DatabaseClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListUsersResponse::try_from(response).await
}
State::Continuation(continuation_token) => {
Expand All @@ -234,7 +225,6 @@ impl DatabaseClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListUsersResponse::try_from(response).await
}
State::Done => return None,
Expand Down
2 changes: 0 additions & 2 deletions sdk/cosmos/src/clients/document_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ impl DocumentClient {
.cosmos_client()
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

GetDocumentResponse::try_from(response).await
Expand Down
Loading

0 comments on commit c6f94bc

Please sign in to comment.