Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry on 5xx, error on 4xx #465

Merged
merged 4 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 2 additions & 36 deletions sdk/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,8 @@ pub enum StreamError {
pub enum HttpError {
#[error("Failed to serialize request body as json: {0}")]
BodySerializationError(serde_json::Error),
#[error(
"unexpected HTTP result (expected: {:?}, received: {:?}, body: {:?})",
expected,
received,
body
)]
UnexpectedStatusCode {
expected: Vec<StatusCode>,
received: StatusCode,
body: String,
},
#[error("HTTP error status (status: {:?}, body: {:?})", status, body)]
ErrorStatusCode { status: StatusCode, body: String },
#[error("UTF8 conversion error: {0}")]
Utf8Error(#[from] std::str::Utf8Error),
#[error("from UTF8 conversion error: {0}")]
Expand All @@ -157,31 +148,6 @@ pub enum HttpError {
StreamResetError(StreamError),
}

impl HttpError {
pub fn new_unexpected_status_code(
expected: StatusCode,
received: StatusCode,
body: &str,
) -> HttpError {
HttpError::UnexpectedStatusCode {
expected: vec![expected],
received,
body: body.to_owned(),
}
}

pub fn new_multiple_unexpected_status_code(
allowed: Vec<StatusCode>,
received: StatusCode,
body: &str,
) -> HttpError {
HttpError::UnexpectedStatusCode {
expected: allowed,
received,
body: body.to_owned(),
}
}
}
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum Not512ByteAlignedError {
#[error("start range not 512-byte aligned: {0}")]
Expand Down
40 changes: 5 additions & 35 deletions sdk/core/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,45 +41,15 @@ pub trait HttpClient: Send + Sync + std::fmt::Debug {
async fn execute_request_check_status(
&self,
request: Request<Bytes>,
expected_status: StatusCode,
_expected_status: StatusCode,
) -> Result<Response<Bytes>, HttpError> {
let response = self.execute_request(request).await?;
if expected_status != response.status() {
Err(HttpError::new_unexpected_status_code(
expected_status,
response.status(),
std::str::from_utf8(response.body())?,
))
} else {
let status = response.status();
if (200..400).contains(&status.as_u16()) {
Ok(response)
}
}

async fn execute_request_check_statuses(
&self,
request: Request<Bytes>,
expected_statuses: &[StatusCode],
) -> Result<Response<Bytes>, HttpError> {
let response = self.execute_request(request).await?;
if !expected_statuses
.iter()
.any(|expected_status| *expected_status == response.status())
{
if expected_statuses.len() == 1 {
Err(HttpError::new_unexpected_status_code(
expected_statuses[0],
response.status(),
std::str::from_utf8(response.body())?,
))
} else {
Err(HttpError::new_multiple_unexpected_status_code(
expected_statuses.to_vec(),
response.status(),
std::str::from_utf8(response.body())?,
))
}
} else {
Ok(response)
let body = std::str::from_utf8(response.body())?.to_owned();
Err(crate::HttpError::ErrorStatusCode { status, body })
}
}
}
Expand Down
73 changes: 63 additions & 10 deletions sdk/core/src/policies/retry_policies/retry_policy.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
use crate::policies::{Policy, PolicyResult, Request, Response};
use crate::sleep::sleep;
use crate::PipelineContext;
use crate::{HttpError, PipelineContext};
use chrono::{DateTime, Local};
use http::StatusCode;
use std::sync::Arc;
use std::time::Duration;

/// A retry policy.
///
/// All retry policies follow a similar pattern only differing in how
/// they determine if the retry has expired and for how long they should
/// sleep between retries.
pub trait RetryPolicy {
/// Determine if no more retries should be performed.
///
/// Must return true if no more retries should be attempted.
fn is_expired(&self, first_retry_time: &mut Option<DateTime<Local>>, retry_count: u32) -> bool;
/// Determine how long before the next retry should be attempted.
fn sleep_duration(&self, retry_count: u32) -> Duration;
}

/// The status codes where a retry should be attempted.
///
/// On all other 4xx and 5xx status codes no retry is attempted.
const RETRY_STATUSES: &[StatusCode] = &[
StatusCode::REQUEST_TIMEOUT,
StatusCode::TOO_MANY_REQUESTS,
StatusCode::INTERNAL_SERVER_ERROR,
StatusCode::BAD_GATEWAY,
StatusCode::SERVICE_UNAVAILABLE,
StatusCode::GATEWAY_TIMEOUT,
];

#[async_trait::async_trait]
impl<T, C> Policy<C> for T
where
Expand All @@ -26,19 +48,50 @@ where
let mut retry_count = 0;

loop {
match next[0].send(ctx, request, &next[1..]).await {
Ok(response) => return Ok(response),
Err(error) => {
log::error!("Error occurred when making request: {}", error);
if self.is_expired(&mut first_retry_time, retry_count) {
let error = match next[0].send(ctx, request, &next[1..]).await {
Ok(response) if (200..400).contains(&response.status().as_u16()) => {
log::trace!(
"Succesful response. Request={:?} response={:?}",
request,
response
);
// Successful status code
return Ok(response);
}
Ok(response) => {
// Error status code
let status = response.status();
let body = response.into_body_string().await;
let error = Box::new(HttpError::ErrorStatusCode { status, body });
if !RETRY_STATUSES.contains(&status) {
log::error!(
"server returned error status which will not be retried: {}",
status
);
// Server didn't return a status we retry on so return early
return Err(error);
} else {
retry_count += 1;

sleep(self.sleep_duration(retry_count)).await;
}
log::debug!(
"server returned error status which requires retry: {}",
status
);
error
}
Err(error) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The C# SDK retries IOExceptions so we should retry here too (assuming we can discriminate the error to errors raised by the http policy).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you saying we should only retry IO related errors? Right now any error is retried.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sorry, that was what I meant... 😔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I can tackle that in another PR. This might require that we change the pipeline to not work with Box<dyn Error> but with azure_core::Error instead.

log::debug!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 67 uses log::error for the same message. Should these be consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be retried (and hopefully succeed on retry) where as line 67 will be returned to the user as an error. Therefore, line 67 represents an error that the user will definitely handle while hopefully this line is just a transient state.

"error occurred when making request which will be retried: {}",
error
);
error
}
};

if self.is_expired(&mut first_retry_time, retry_count) {
return Err(error);
}
retry_count += 1;

sleep(self.sleep_duration(retry_count)).await;
}
}
}
41 changes: 27 additions & 14 deletions sdk/core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl ResponseBuilder {
}
}

// An HTTP Response
pub struct Response {
status: StatusCode,
headers: HeaderMap,
Expand Down Expand Up @@ -63,20 +64,18 @@ impl Response {
(self.status, self.headers, self.body)
}

pub async fn validate(self, expected_status: StatusCode) -> Result<Self, crate::HttpError> {
let status = self.status();
if expected_status != status {
let body = collect_pinned_stream(self.body)
.await
.unwrap_or_else(|_| Bytes::from_static("<INVALID BODY>".as_bytes()));
Err(crate::HttpError::new_unexpected_status_code(
expected_status,
status,
std::str::from_utf8(&body as &[u8]).unwrap_or("<NON-UTF8 BODY>"),
))
} else {
Ok(self)
}
pub async fn into_body_string(self) -> String {
pinned_stream_into_utf8_string(self.body).await
}
}

impl std::fmt::Debug for Response {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Response")
.field("status", &self.status)
.field("headers", &self.headers)
.field("body", &"<BODY>")
.finish()
}
}

Expand All @@ -92,6 +91,20 @@ pub async fn collect_pinned_stream(mut pinned_stream: PinnedStream) -> Result<By
Ok(final_result.into())
}

/// Collects a `PinnedStream` into a utf8 String
///
/// If the stream cannot be collected or is not utf8, a placeholder string
/// will be returned.
pub async fn pinned_stream_into_utf8_string(stream: PinnedStream) -> String {
let body = collect_pinned_stream(stream)
.await
.unwrap_or_else(|_| Bytes::from_static("<INVALID BODY>".as_bytes()));
let body = std::str::from_utf8(&body)
.unwrap_or("<NON-UTF8 BODY>")
.to_owned();
body
}

impl From<BytesResponse> for Response {
fn from(bytes_response: BytesResponse) -> Self {
let (status, headers, body) = bytes_response.deconstruct();
Expand Down
8 changes: 0 additions & 8 deletions sdk/cosmos/src/clients/collection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ impl CollectionClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(GetCollectionResponse::try_from(response).await?)
Expand All @@ -80,8 +78,6 @@ impl CollectionClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::NO_CONTENT)
.await?;

Ok(DeleteCollectionResponse::try_from(response).await?)
Expand All @@ -102,8 +98,6 @@ impl CollectionClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(ReplaceCollectionResponse::try_from(response).await?)
Expand All @@ -128,8 +122,6 @@ impl CollectionClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::CREATED)
.await?;

Ok(CreateDocumentResponse::try_from(response).await?)
Expand Down
4 changes: 0 additions & 4 deletions sdk/cosmos/src/clients/cosmos_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ impl CosmosClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::CREATED)
.await?;

Ok(CreateDatabaseResponse::try_from(response).await?)
Expand Down Expand Up @@ -239,7 +237,6 @@ impl CosmosClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);

ListDatabasesResponse::try_from(response).await
}
Expand All @@ -256,7 +253,6 @@ impl CosmosClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListDatabasesResponse::try_from(response).await
}
State::Done => return None,
Expand Down
10 changes: 0 additions & 10 deletions sdk/cosmos/src/clients/database_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ impl DatabaseClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(GetDatabaseResponse::try_from(response).await?)
Expand All @@ -94,8 +92,6 @@ impl DatabaseClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(DeleteDatabaseResponse::try_from(response).await?)
Expand Down Expand Up @@ -127,7 +123,6 @@ impl DatabaseClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListCollectionsResponse::try_from(response).await
}
State::Continuation(continuation_token) => {
Expand All @@ -146,7 +141,6 @@ impl DatabaseClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListCollectionsResponse::try_from(response).await
}
State::Done => return None,
Expand Down Expand Up @@ -182,8 +176,6 @@ impl DatabaseClient {
let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::CREATED)
.await?;

Ok(CreateCollectionResponse::try_from(response).await?)
Expand Down Expand Up @@ -215,7 +207,6 @@ impl DatabaseClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListUsersResponse::try_from(response).await
}
State::Continuation(continuation_token) => {
Expand All @@ -234,7 +225,6 @@ impl DatabaseClient {
.send(&mut pipeline_context, &mut request)
.await
);
let response = r#try!(response.validate(http::StatusCode::OK).await);
ListUsersResponse::try_from(response).await
}
State::Done => return None,
Expand Down
2 changes: 0 additions & 2 deletions sdk/cosmos/src/clients/document_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ impl DocumentClient {
.cosmos_client()
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

GetDocumentResponse::try_from(response).await
Expand Down
Loading