Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Handle GCS Response retriable errors #2588

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/lib/default_da_clients/src/object_store/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl DataAvailabilityClient for ObjectStoreDAClient {
.await
{
return Err(DAError {
is_transient: err.is_transient(),
is_transient: err.is_retriable(),
error: anyhow::Error::from(err),
});
}
Expand All @@ -66,7 +66,7 @@ impl DataAvailabilityClient for ObjectStoreDAClient {
}

return Err(DAError {
is_transient: err.is_transient(),
is_transient: err.is_retriable(),
error: anyhow::Error::from(err),
});
}
Expand Down
2 changes: 1 addition & 1 deletion core/lib/object_store/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ impl From<io::Error> for ObjectStoreError {
match err.kind() {
io::ErrorKind::NotFound => ObjectStoreError::KeyNotFound(err.into()),
kind => ObjectStoreError::Other {
is_transient: matches!(kind, io::ErrorKind::Interrupted | io::ErrorKind::TimedOut),
is_retriable: matches!(kind, io::ErrorKind::Interrupted | io::ErrorKind::TimedOut),
source: err.into(),
},
}
Expand Down
25 changes: 13 additions & 12 deletions core/lib/object_store/src/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,22 @@ impl GoogleCloudStore {

impl From<AuthError> for ObjectStoreError {
fn from(err: AuthError) -> Self {
let is_transient = matches!(
let is_retriable = matches!(
&err,
AuthError::HttpError(err) if is_transient_http_error(err)
AuthError::HttpError(err) if is_retriable_http_error(err)
);
Self::Initialization {
source: err.into(),
is_transient,
is_retriable,
}
}
}

fn is_transient_http_error(err: &reqwest::Error) -> bool {
fn is_retriable_http_error(err: &reqwest::Error) -> bool {
EmilLuta marked this conversation as resolved.
Show resolved Hide resolved
err.is_timeout()
|| err.is_connect()
// Not all request errors are logically transient, but a significant part of them are (e.g.,
// `hyper` protocol-level errors), and it's safer to consider an error transient.
// `hyper` protocol-level errors), and it's safer to consider an error retriable.
|| err.is_request()
|| has_transient_io_source(err)
|| err.status() == Some(StatusCode::BAD_GATEWAY)
Expand All @@ -117,8 +117,8 @@ fn get_source<'a, T: StdError + 'static>(mut err: &'a (dyn StdError + 'static))
}

fn has_transient_io_source(err: &(dyn StdError + 'static)) -> bool {
// We treat any I/O errors as transient. This isn't always true, but frequently occurring I/O errors
// (e.g., "connection reset by peer") *are* transient, and treating an error as transient is a safer option,
// We treat any I/O errors as retriable. This isn't always true, but frequently occurring I/O errors
// (e.g., "connection reset by peer") *are* transient, and treating an error as retriable is a safer option,
// even if it can lead to unnecessary retries.
get_source::<io::Error>(err).is_some()
}
Expand All @@ -136,19 +136,20 @@ impl From<HttpError> for ObjectStoreError {
if is_not_found {
ObjectStoreError::KeyNotFound(err.into())
} else {
let is_transient = match &err {
HttpError::HttpClient(err) => is_transient_http_error(err),
let is_retriable = match &err {
HttpError::HttpClient(err) => is_retriable_http_error(err),
HttpError::TokenSource(err) => {
// Token sources are mostly based on the `reqwest` HTTP client, so transient error detection
// Token sources are mostly based on the `reqwest` HTTP client, so retriable error detection
// can reuse the same logic.
let err = err.as_ref();
has_transient_io_source(err)
|| get_source::<reqwest::Error>(err).is_some_and(is_transient_http_error)
|| get_source::<reqwest::Error>(err).is_some_and(is_retriable_http_error)
}
HttpError::Response(err) => err.is_retriable(),
EmilLuta marked this conversation as resolved.
Show resolved Hide resolved
_ => false,
};
ObjectStoreError::Other {
is_transient,
is_retriable,
source: err.into(),
}
}
Expand Down
20 changes: 10 additions & 10 deletions core/lib/object_store/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub enum ObjectStoreError {
/// Object store initialization failed.
Initialization {
source: BoxedError,
is_transient: bool,
is_retriable: bool,
},
/// An object with the specified key is not found.
KeyNotFound(BoxedError),
Expand All @@ -68,16 +68,16 @@ pub enum ObjectStoreError {
/// Other error has occurred when accessing the store (e.g., a network error).
Other {
source: BoxedError,
is_transient: bool,
is_retriable: bool,
},
}

impl ObjectStoreError {
/// Gives a best-effort estimate whether this error is transient.
pub fn is_transient(&self) -> bool {
/// Gives a best-effort estimate whether this error is retriable.
pub fn is_retriable(&self) -> bool {
match self {
Self::Initialization { is_transient, .. } | Self::Other { is_transient, .. } => {
*is_transient
Self::Initialization { is_retriable, .. } | Self::Other { is_retriable, .. } => {
*is_retriable
}
Self::KeyNotFound(_) | Self::Serialization(_) => false,
}
Expand All @@ -89,9 +89,9 @@ impl fmt::Display for ObjectStoreError {
match self {
Self::Initialization {
source,
is_transient,
is_retriable,
} => {
let kind = if *is_transient { "transient" } else { "fatal" };
let kind = if *is_retriable { "retriable" } else { "fatal" };
write!(
formatter,
"{kind} error initializing object store: {source}"
Expand All @@ -101,9 +101,9 @@ impl fmt::Display for ObjectStoreError {
Self::Serialization(err) => write!(formatter, "serialization error: {err}"),
Self::Other {
source,
is_transient,
is_retriable,
} => {
let kind = if *is_transient { "transient" } else { "fatal" };
let kind = if *is_retriable { "retriable" } else { "fatal" };
write!(formatter, "{kind} error accessing object store: {source}")
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/lib/object_store/src/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Request<'_> {
let result = loop {
match f().await {
Ok(result) => break Ok(result),
Err(err) if err.is_transient() => {
Err(err) if err.is_retriable() => {
if retries > max_retries {
tracing::warn!(%err, "Exhausted {max_retries} retries performing request; returning last error");
break Err(err);
Expand Down Expand Up @@ -142,7 +142,7 @@ mod test {

fn transient_error() -> ObjectStoreError {
ObjectStoreError::Other {
is_transient: true,
is_retriable: true,
source: "oops".into(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/lib/snapshots_applier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ enum SnapshotsApplierError {

impl SnapshotsApplierError {
fn object_store(err: ObjectStoreError, context: String) -> Self {
if err.is_transient() {
if err.is_retriable() {
Self::Retryable(anyhow::Error::from(err).context(context))
} else {
Self::Fatal(anyhow::Error::from(err).context(context))
Expand Down
4 changes: 2 additions & 2 deletions core/lib/snapshots_applier/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn snapshots_creator_can_successfully_recover_db(
Ok(()) // "recover" after 3 retries
} else {
Err(ObjectStoreError::Other {
is_transient: true,
is_retriable: true,
source: "transient error".into(),
})
}
Expand Down Expand Up @@ -550,7 +550,7 @@ async fn applier_returns_error_after_too_many_object_store_retries() {
let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
let object_store = ObjectStoreWithErrors::new(object_store, |_| {
Err(ObjectStoreError::Other {
is_transient: true,
is_retriable: true,
source: "service not available".into(),
})
});
Expand Down
2 changes: 1 addition & 1 deletion core/node/block_reverter/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl ObjectStore for ErroneousStore {
.unwrap()
.remove(&(bucket, key.to_owned()));
Err(ObjectStoreError::Other {
is_transient: false,
is_retriable: false,
source: "fatal error".into(),
})
}
Expand Down
Loading