fix(core): Handle GCS Response retriable errors #2588

Merged · 2 commits · Aug 7, 2024

Changes from all commits
12 changes: 6 additions & 6 deletions core/bin/zksync_tee_prover/src/error.rs
@@ -11,28 +11,28 @@ pub(crate) enum TeeProverError {
}

impl TeeProverError {
pub fn is_transient(&self) -> bool {
pub fn is_retriable(&self) -> bool {
match self {
Self::Request(err) => is_transient_http_error(err),
Self::Request(err) => is_retriable_http_error(err),
_ => false,
}
}
}

fn is_transient_http_error(err: &reqwest::Error) -> bool {
fn is_retriable_http_error(err: &reqwest::Error) -> bool {
err.is_timeout()
|| err.is_connect()
// Not all request errors are logically transient, but a significant part of them are (e.g.,
// `hyper` protocol-level errors), and it's safer to consider an error transient.
// `hyper` protocol-level errors), and it's safer to consider an error retriable.
|| err.is_request()
|| has_transient_io_source(err)
|| err.status() == Some(StatusCode::BAD_GATEWAY)
|| err.status() == Some(StatusCode::SERVICE_UNAVAILABLE)
}

fn has_transient_io_source(err: &(dyn StdError + 'static)) -> bool {
// We treat any I/O errors as transient. This isn't always true, but frequently occurring I/O errors
// (e.g., "connection reset by peer") *are* transient, and treating an error as transient is a safer option,
// We treat any I/O errors as retriable. This isn't always true, but frequently occurring I/O errors
// (e.g., "connection reset by peer") *are* transient, and treating an error as retriable is a safer option,
// even if it can lead to unnecessary retries.
get_source::<io::Error>(err).is_some()
}
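`has_transient_io_source` above relies on a `get_source` helper that walks an error's source chain (its signature appears later in the `gcs.rs` diff). A minimal self-contained sketch of that pattern, not the crate's exact code:

```rust
use std::{error::Error as StdError, io};

// Sketch: walk the `source()` chain of an error and return the first source
// that downcasts to `T` (e.g. `io::Error`). Mirrors the `get_source` helper
// referenced by `has_transient_io_source`; illustration only.
fn get_source<'a, T: StdError + 'static>(
    mut err: &'a (dyn StdError + 'static),
) -> Option<&'a T> {
    loop {
        if let Some(typed) = err.downcast_ref::<T>() {
            return Some(typed);
        }
        err = err.source()?;
    }
}

fn main() {
    let io_err = io::Error::new(io::ErrorKind::ConnectionReset, "connection reset by peer");
    // An I/O error anywhere in the chain is enough to classify the error as retriable.
    assert!(get_source::<io::Error>(&io_err).is_some());
    println!("I/O source detected -> treated as retriable");
}
```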
6 changes: 3 additions & 3 deletions core/bin/zksync_tee_prover/src/tee_prover.rs
@@ -137,10 +137,10 @@ impl TeeProver {
/// TEE prover configuration options.
#[derive(Debug, Clone)]
pub struct TeeProverConfig {
/// Number of retries for transient errors before giving up on recovery (i.e., returning an error
/// Number of retries for retriable errors before giving up on recovery (i.e., returning an error
/// from [`Self::run()`]).
pub max_retries: usize,
/// Initial back-off interval when retrying recovery on a transient error. Each subsequent retry interval
/// Initial back-off interval when retrying recovery on a retriable error. Each subsequent retry interval
/// will be multiplied by [`Self.retry_backoff_multiplier`].
pub initial_retry_backoff: Duration,
pub retry_backoff_multiplier: f32,
@@ -198,7 +198,7 @@ impl Task for TeeProver {
}
Err(err) => {
METRICS.network_errors_counter.inc_by(1);
if !err.is_transient() || retries > self.config.max_retries {
if !err.is_retriable() || retries > self.config.max_retries {
return Err(err.into());
}
retries += 1;
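For context, `TeeProverConfig` describes exponential back-off: the interval starts at `initial_retry_backoff` and is multiplied by `retry_backoff_multiplier` after each retriable failure. A hypothetical illustration of that schedule, with made-up parameter values:

```rust
use std::time::Duration;

fn main() {
    // Illustrative values; the crate's actual defaults may differ.
    let initial_retry_backoff = Duration::from_secs(1);
    let retry_backoff_multiplier: f32 = 2.0;
    let max_retries = 5;

    let mut backoff = initial_retry_backoff;
    for retry in 1..=max_retries {
        // On a retriable error the prover sleeps for `backoff`, then widens it.
        println!("retry #{retry}: back off for {backoff:?}");
        backoff = backoff.mul_f32(retry_backoff_multiplier);
    }
}
```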
10 changes: 5 additions & 5 deletions core/lib/da_client/src/types.rs
@@ -6,19 +6,19 @@ use serde::Serialize;
#[derive(Debug)]
pub struct DAError {
pub error: anyhow::Error,
pub is_transient: bool,
pub is_retriable: bool,
}

impl DAError {
pub fn is_transient(&self) -> bool {
self.is_transient
pub fn is_retriable(&self) -> bool {
self.is_retriable
}
}

impl Display for DAError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let kind = if self.is_transient {
"transient"
let kind = if self.is_retriable {
"retriable"
} else {
"fatal"
};
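A short usage sketch of the renamed flag. It assumes the `DAError` struct and `Display` impl shown above are in scope (the exact format string is cut off in this hunk); the `da_error` helper is illustrative, not part of the crate:

```rust
// Hypothetical helper: wrap an arbitrary error into a `DAError`,
// tagging whether it is worth retrying.
fn da_error(error: anyhow::Error, is_retriable: bool) -> DAError {
    DAError { error, is_retriable }
}

fn main() {
    let err = da_error(anyhow::anyhow!("connection reset by peer"), true);
    assert!(err.is_retriable());
    // `Display` prefixes the message with the "retriable"/"fatal" kind selected above.
    println!("{err}");
}
```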
6 changes: 3 additions & 3 deletions core/lib/default_da_clients/src/object_store/client.rs
@@ -40,7 +40,7 @@ impl DataAvailabilityClient for ObjectStoreDAClient {
.await
{
return Err(DAError {
is_transient: err.is_transient(),
is_retriable: err.is_retriable(),
error: anyhow::Error::from(err),
});
}
@@ -53,7 +53,7 @@ impl DataAvailabilityClient for ObjectStoreDAClient {
async fn get_inclusion_data(&self, key: &str) -> Result<Option<InclusionData>, DAError> {
let key_u32 = key.parse::<u32>().map_err(|err| DAError {
error: anyhow::Error::from(err).context(format!("Failed to parse blob key: {}", key)),
is_transient: false,
is_retriable: false,
})?;

if let Err(err) = self
@@ -66,7 +66,7 @@ impl DataAvailabilityClient for ObjectStoreDAClient {
}

return Err(DAError {
is_transient: err.is_transient(),
is_retriable: err.is_retriable(),
error: anyhow::Error::from(err),
});
}
2 changes: 1 addition & 1 deletion core/lib/object_store/src/file.rs
@@ -10,7 +10,7 @@ impl From<io::Error> for ObjectStoreError {
match err.kind() {
io::ErrorKind::NotFound => ObjectStoreError::KeyNotFound(err.into()),
kind => ObjectStoreError::Other {
is_transient: matches!(kind, io::ErrorKind::Interrupted | io::ErrorKind::TimedOut),
is_retriable: matches!(kind, io::ErrorKind::Interrupted | io::ErrorKind::TimedOut),
source: err.into(),
},
}
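The conversion above treats only `Interrupted` and `TimedOut` I/O errors as retriable, while `NotFound` maps to a missing key. A standalone restatement of that rule, for illustration only (the function name is not from the crate):

```rust
use std::io;

// Mirrors the `matches!` check in the `From<io::Error>` impl above.
fn is_retriable_io_kind(kind: io::ErrorKind) -> bool {
    matches!(kind, io::ErrorKind::Interrupted | io::ErrorKind::TimedOut)
}

fn main() {
    for kind in [
        io::ErrorKind::TimedOut,
        io::ErrorKind::Interrupted,
        io::ErrorKind::ConnectionReset,
        io::ErrorKind::PermissionDenied,
    ] {
        println!("{kind:?}: retriable = {}", is_retriable_io_kind(kind));
    }
}
```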
25 changes: 13 additions & 12 deletions core/lib/object_store/src/gcs.rs
@@ -85,22 +85,22 @@ impl GoogleCloudStore {

impl From<AuthError> for ObjectStoreError {
fn from(err: AuthError) -> Self {
let is_transient = matches!(
let is_retriable = matches!(
&err,
AuthError::HttpError(err) if is_transient_http_error(err)
AuthError::HttpError(err) if is_retriable_http_error(err)
);
Self::Initialization {
source: err.into(),
is_transient,
is_retriable,
}
}
}

fn is_transient_http_error(err: &reqwest::Error) -> bool {
fn is_retriable_http_error(err: &reqwest::Error) -> bool {
err.is_timeout()
|| err.is_connect()
// Not all request errors are logically transient, but a significant part of them are (e.g.,
// `hyper` protocol-level errors), and it's safer to consider an error transient.
// `hyper` protocol-level errors), and it's safer to consider an error retriable.
|| err.is_request()
|| has_transient_io_source(err)
|| err.status() == Some(StatusCode::BAD_GATEWAY)
@@ -117,8 +117,8 @@ fn get_source<'a, T: StdError + 'static>(mut err: &'a (dyn StdError + 'static))
}

fn has_transient_io_source(err: &(dyn StdError + 'static)) -> bool {
// We treat any I/O errors as transient. This isn't always true, but frequently occurring I/O errors
// (e.g., "connection reset by peer") *are* transient, and treating an error as transient is a safer option,
// We treat any I/O errors as retriable. This isn't always true, but frequently occurring I/O errors
// (e.g., "connection reset by peer") *are* transient, and treating an error as retriable is a safer option,
// even if it can lead to unnecessary retries.
get_source::<io::Error>(err).is_some()
}
@@ -136,19 +136,20 @@ impl From<HttpError> for ObjectStoreError {
if is_not_found {
ObjectStoreError::KeyNotFound(err.into())
} else {
let is_transient = match &err {
HttpError::HttpClient(err) => is_transient_http_error(err),
let is_retriable = match &err {
HttpError::HttpClient(err) => is_retriable_http_error(err),
HttpError::TokenSource(err) => {
// Token sources are mostly based on the `reqwest` HTTP client, so transient error detection
// Token sources are mostly based on the `reqwest` HTTP client, so retriable error detection
// can reuse the same logic.
let err = err.as_ref();
has_transient_io_source(err)
|| get_source::<reqwest::Error>(err).is_some_and(is_transient_http_error)
|| get_source::<reqwest::Error>(err).is_some_and(is_retriable_http_error)
}
HttpError::Response(err) => err.is_retriable(),
_ => false,
};
ObjectStoreError::Other {
is_transient,
is_retriable,
source: err.into(),
}
}
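The core change of this PR is the new `HttpError::Response(err) => err.is_retriable()` arm above, which defers to the GCS client library's own classification of error responses. As a rough approximation of what such classification usually covers (per Google's retry guidance: HTTP 429 and 5xx), assumed here rather than taken from the library:

```rust
use reqwest::StatusCode;

// Approximation only: treat rate limiting and server-side failures as
// retriable. The real decision is made by the GCS client's `is_retriable()`.
fn looks_retriable(status: StatusCode) -> bool {
    status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()
}

fn main() {
    for code in [429u16, 500, 502, 503, 404] {
        let status = StatusCode::from_u16(code).unwrap();
        println!("{code}: retriable = {}", looks_retriable(status));
    }
}
```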
20 changes: 10 additions & 10 deletions core/lib/object_store/src/raw.rs
@@ -59,7 +59,7 @@ pub enum ObjectStoreError {
/// Object store initialization failed.
Initialization {
source: BoxedError,
is_transient: bool,
is_retriable: bool,
},
/// An object with the specified key is not found.
KeyNotFound(BoxedError),
@@ -68,16 +68,16 @@
/// Other error has occurred when accessing the store (e.g., a network error).
Other {
source: BoxedError,
is_transient: bool,
is_retriable: bool,
},
}

impl ObjectStoreError {
/// Gives a best-effort estimate whether this error is transient.
pub fn is_transient(&self) -> bool {
/// Gives a best-effort estimate whether this error is retriable.
pub fn is_retriable(&self) -> bool {
match self {
Self::Initialization { is_transient, .. } | Self::Other { is_transient, .. } => {
*is_transient
Self::Initialization { is_retriable, .. } | Self::Other { is_retriable, .. } => {
*is_retriable
}
Self::KeyNotFound(_) | Self::Serialization(_) => false,
}
@@ -89,9 +89,9 @@ impl fmt::Display for ObjectStoreError {
match self {
Self::Initialization {
source,
is_transient,
is_retriable,
} => {
let kind = if *is_transient { "transient" } else { "fatal" };
let kind = if *is_retriable { "retriable" } else { "fatal" };
write!(
formatter,
"{kind} error initializing object store: {source}"
@@ -101,9 +101,9 @@
Self::Serialization(err) => write!(formatter, "serialization error: {err}"),
Self::Other {
source,
is_transient,
is_retriable,
} => {
let kind = if *is_transient { "transient" } else { "fatal" };
let kind = if *is_retriable { "retriable" } else { "fatal" };
write!(formatter, "{kind} error accessing object store: {source}")
}
}
10 changes: 5 additions & 5 deletions core/lib/object_store/src/retries.rs
@@ -39,7 +39,7 @@ impl Request<'_> {
let result = loop {
match f().await {
Ok(result) => break Ok(result),
Err(err) if err.is_transient() => {
Err(err) if err.is_retriable() => {
if retries > max_retries {
tracing::warn!(%err, "Exhausted {max_retries} retries performing request; returning last error");
break Err(err);
@@ -140,9 +140,9 @@ mod test {

use super::*;

fn transient_error() -> ObjectStoreError {
fn retriable_error() -> ObjectStoreError {
ObjectStoreError::Other {
is_transient: true,
is_retriable: true,
source: "oops".into(),
}
}
@@ -159,7 +159,7 @@
#[tokio::test]
async fn test_retry_failure_exhausted() {
let err = Request::New
.retry(&"store", 2, || async { Err::<i32, _>(transient_error()) })
.retry(&"store", 2, || async { Err::<i32, _>(retriable_error()) })
.await
.unwrap_err();
assert_matches!(err, ObjectStoreError::Other { .. });
@@ -173,7 +173,7 @@
if retries + 1 == n {
Ok(42)
} else {
Err(transient_error())
Err(retriable_error())
}
})
.await
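`Request::retry` above is the consumer of all these predicates: it retries an operation only while the error reports itself as retriable and the retry budget is not exhausted. A self-contained sketch of that shape, with an illustrative doubling back-off (the crate's actual back-off policy is not visible in this hunk):

```rust
use std::{future::Future, time::Duration};

// Generic retry-on-retriable-error loop; `is_retriable` is any predicate
// supplied by the caller, mirroring `ObjectStoreError::is_retriable()`.
async fn retry_with<T, E, Fut>(
    max_retries: usize,
    is_retriable: impl Fn(&E) -> bool,
    mut op: impl FnMut() -> Fut,
) -> Result<T, E>
where
    Fut: Future<Output = Result<T, E>>,
{
    let mut backoff = Duration::from_millis(100);
    let mut retries = 0;
    loop {
        match op().await {
            Ok(value) => return Ok(value),
            Err(err) if is_retriable(&err) && retries < max_retries => {
                retries += 1;
                tokio::time::sleep(backoff).await;
                backoff *= 2; // illustrative back-off, not the crate's policy
            }
            Err(err) => return Err(err),
        }
    }
}

#[tokio::main]
async fn main() {
    let mut attempts = 0;
    let result = retry_with(3, |_e: &&str| true, || {
        attempts += 1;
        let attempt = attempts;
        async move {
            // Fail twice with a "retriable" error, then succeed.
            if attempt < 3 { Err("transient error") } else { Ok(42) }
        }
    })
    .await;
    println!("succeeded after {attempts} attempts: {result:?}");
}
```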
6 changes: 3 additions & 3 deletions core/lib/snapshots_applier/src/lib.rs
@@ -82,7 +82,7 @@ enum SnapshotsApplierError {

impl SnapshotsApplierError {
fn object_store(err: ObjectStoreError, context: String) -> Self {
if err.is_transient() {
if err.is_retriable() {
Self::Retryable(anyhow::Error::from(err).context(context))
} else {
Self::Fatal(anyhow::Error::from(err).context(context))
@@ -209,10 +209,10 @@ pub enum RecoveryCompletionStatus {
/// Snapshot applier configuration options.
#[derive(Debug, Clone)]
pub struct SnapshotsApplierConfig {
/// Number of retries for transient errors before giving up on recovery (i.e., returning an error
/// Number of retries for retriable errors before giving up on recovery (i.e., returning an error
/// from [`Self::run()`]).
pub retry_count: usize,
/// Initial back-off interval when retrying recovery on a transient error. Each subsequent retry interval
/// Initial back-off interval when retrying recovery on a retriable error. Each subsequent retry interval
/// will be multiplied by [`Self.retry_backoff_multiplier`].
pub initial_retry_backoff: Duration,
pub retry_backoff_multiplier: f32,
4 changes: 2 additions & 2 deletions core/lib/snapshots_applier/src/tests/mod.rs
@@ -61,7 +61,7 @@ async fn snapshots_creator_can_successfully_recover_db(
Ok(()) // "recover" after 3 retries
} else {
Err(ObjectStoreError::Other {
is_transient: true,
is_retriable: true,
source: "transient error".into(),
})
}
@@ -550,7 +550,7 @@ async fn applier_returns_error_after_too_many_object_store_retries() {
let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
let object_store = ObjectStoreWithErrors::new(object_store, |_| {
Err(ObjectStoreError::Other {
is_transient: true,
is_retriable: true,
source: "service not available".into(),
})
});
4 changes: 2 additions & 2 deletions core/lib/web3_decl/src/error.rs
@@ -85,8 +85,8 @@ impl EnrichedClientError {
self
}

/// Whether the error should be considered transient.
pub fn is_transient(&self) -> bool {
/// Whether the error should be considered retriable.
pub fn is_retriable(&self) -> bool {
match self.as_ref() {
ClientError::Transport(_) | ClientError::RequestTimeout => true,
ClientError::Call(err) => {
2 changes: 1 addition & 1 deletion core/node/block_reverter/src/tests.rs
@@ -382,7 +382,7 @@ impl ObjectStore for ErroneousStore {
.unwrap()
.remove(&(bucket, key.to_owned()));
Err(ObjectStoreError::Other {
is_transient: false,
is_retriable: false,
source: "fatal error".into(),
})
}
2 changes: 1 addition & 1 deletion core/node/commitment_generator/src/validation_task.rs
@@ -73,7 +73,7 @@ impl L1BatchCommitmentModeValidationTask {
return Ok(());
}

Err(ContractCallError::EthereumGateway(err)) if err.is_transient() => {
Err(ContractCallError::EthereumGateway(err)) if err.is_retriable() => {
tracing::warn!(
"Transient error validating commitment mode, will retry after {:?}: {err}",
self.retry_interval
2 changes: 1 addition & 1 deletion core/node/consensus/src/en.rs
@@ -192,7 +192,7 @@ impl EN {
match res {
Ok(Some(block)) => return Ok(block.try_into()?),
Ok(None) => {}
Err(err) if err.is_transient() => {}
Err(err) if err.is_retriable() => {}
Err(err) => {
return Err(anyhow::format_err!("client.fetch_l2_block({}): {err}", n).into());
}
2 changes: 1 addition & 1 deletion core/node/consensus/src/testonly.rs
@@ -364,7 +364,7 @@ impl StateKeeper {
let res = ctx.wait(client.fetch_l2_block_number()).await?;
match res {
Ok(_) => return Ok(client),
Err(err) if err.is_transient() => {
Err(err) if err.is_retriable() => {
ctx.sleep(time::Duration::seconds(5)).await?;
}
Err(err) => {
8 changes: 4 additions & 4 deletions core/node/consistency_checker/src/lib.rs
@@ -41,10 +41,10 @@ enum CheckError {
}

impl CheckError {
fn is_transient(&self) -> bool {
fn is_retriable(&self) -> bool {
match self {
Self::Web3(err) | Self::ContractCall(ContractCallError::EthereumGateway(err)) => {
err.is_transient()
err.is_retriable()
}
_ => false,
}
@@ -535,7 +535,7 @@ impl ConsistencyChecker {
self.event_handler.initialize();

while let Err(err) = self.sanity_check_diamond_proxy_addr().await {
if err.is_transient() {
if err.is_retriable() {
tracing::warn!(
"Transient error checking diamond proxy contract; will retry after a delay: {:#}",
anyhow::Error::from(err)
@@ -635,7 +635,7 @@ impl ConsistencyChecker {
}
}
}
Err(err) if err.is_transient() => {
Err(err) if err.is_retriable() => {
tracing::warn!(
"Transient error while verifying L1 batch #{batch_number}; will retry after a delay: {:#}",
anyhow::Error::from(err)