Skip to content

Commit

Permalink
Backoff on connection-unrelated errors (paritytech#178)
Browse files Browse the repository at this point in the history
* backoff on connection-unrelated errors

* cargo fmt --all

* Fix some typos

Co-authored-by: Hernando Castano <[email protected]>
  • Loading branch information
2 people authored and serban300 committed Apr 10, 2024
1 parent 1627e73 commit e59b42e
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 18 deletions.
1 change: 1 addition & 0 deletions bridges/relays/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ ansi_term = "0.12"
async-std = "=1.5.0"
async-stream = "0.2.0"
async-trait = "0.1.36"
backoff = "0.1"
clap = { version = "2.33.1", features = ["yaml"] }
codec = { package = "parity-scale-codec", version = "1.0.0" }
env_logger = "0.7.0"
Expand Down
132 changes: 114 additions & 18 deletions bridges/relays/ethereum/src/sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::sync_types::{
};

use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff};
use futures::{future::FutureExt, stream::StreamExt};
use num_traits::{Saturating, Zero};
use std::{
Expand All @@ -44,6 +45,9 @@ const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Delay after connection-related error happened before we'll try
/// reconnection again.
const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
/// Max delay after connection-unrelated error happened before we'll try the
/// same request again.
const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);

/// Source client trait.
#[async_trait]
Expand Down Expand Up @@ -128,6 +132,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let mut stall_countdown = None;
let mut last_update_time = Instant::now();

let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = false;
let mut source_best_block_number_required = false;
let source_best_block_number_future = source_client.best_block_number().fuse();
Expand All @@ -138,6 +143,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let source_go_offline_future = futures::future::Fuse::terminated();
let source_tick_stream = interval(source_tick).fuse();

let mut target_retry_backoff = retry_backoff();
let mut target_client_is_online = false;
let mut target_best_block_required = false;
let mut target_incomplete_headers_required = true;
Expand Down Expand Up @@ -175,45 +181,50 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(

source_client_is_online = process_future_result(
source_best_block_number,
&mut source_retry_backoff,
|source_best_block_number| sync.source_best_header_number_response(source_best_block_number),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving best header number from {}", P::SOURCE_NAME),
);
},
source_new_header = source_new_header_future => {
source_client_is_online = process_future_result(
source_new_header,
&mut source_retry_backoff,
|source_new_header| sync.headers_mut().header_response(source_new_header),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving header from {} node", P::SOURCE_NAME),
);
},
source_orphan_header = source_orphan_header_future => {
source_client_is_online = process_future_result(
source_orphan_header,
&mut source_retry_backoff,
|source_orphan_header| sync.headers_mut().header_response(source_orphan_header),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving orphan header from {} node", P::SOURCE_NAME),
);
},
source_extra = source_extra_future => {
source_client_is_online = process_future_result(
source_extra,
&mut source_retry_backoff,
|(header, extra)| sync.headers_mut().extra_response(&header, extra),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving extra data from {} node", P::SOURCE_NAME),
);
},
source_completion = source_completion_future => {
source_client_is_online = process_future_result(
source_completion,
&mut source_retry_backoff,
|(header, completion)| sync.headers_mut().completion_response(&header, completion),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving completion data from {} node", P::SOURCE_NAME),
);
},
Expand All @@ -230,6 +241,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(

target_client_is_online = process_future_result(
target_best_block,
&mut target_retry_backoff,
|target_best_block| {
let head_updated = sync.target_best_header_response(target_best_block);
if head_updated {
Expand Down Expand Up @@ -264,7 +276,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
}
},
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving best known header from {} node", P::TARGET_NAME),
);
},
Expand All @@ -273,20 +285,22 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(

target_client_is_online = process_future_result(
incomplete_headers_ids,
&mut target_retry_backoff,
|incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids),
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME),
);
},
target_existence_status = target_existence_status_future => {
target_client_is_online = process_future_result(
target_existence_status,
&mut target_retry_backoff,
|(target_header, target_existence_status)| sync
.headers_mut()
.maybe_orphan_response(&target_header, target_existence_status),
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving existence status from {} node", P::TARGET_NAME),
);
},
Expand All @@ -297,9 +311,10 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(

target_client_is_online = process_future_result(
maybe_fatal_error,
&mut target_retry_backoff,
|_| {},
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error submitting headers to {} node", P::TARGET_NAME),
);

Expand All @@ -309,20 +324,22 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_complete_header_result = target_complete_header_future => {
target_client_is_online = process_future_result(
target_complete_header_result,
&mut target_retry_backoff,
|completed_header| sync.headers_mut().header_completed(&completed_header),
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error completing headers at {}", P::TARGET_NAME),
);
},
target_extra_check_result = target_extra_check_future => {
target_client_is_online = process_future_result(
target_extra_check_result,
&mut target_retry_backoff,
|(header, extra_check_result)| sync
.headers_mut()
.maybe_extra_response(&header, extra_check_result),
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME),
);
},
Expand Down Expand Up @@ -522,16 +539,26 @@ fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
})
}

/// Exponential backoff for connection-unrelated errors retries.
fn retry_backoff() -> ExponentialBackoff {
let mut backoff = ExponentialBackoff::default();
// we do not want relayer to stop
backoff.max_elapsed_time = None;
backoff.max_interval = MAX_BACKOFF_INTERVAL;
backoff
}

/// Process result of the future from a client.
///
/// Returns whether or not the client we're interacting with is online. In this context
/// what online means is that the client is currently not handling any other requests
/// that we've previously sent.
fn process_future_result<TResult, TError, TGoOfflineFuture>(
result: Result<TResult, TError>,
retry_backoff: &mut ExponentialBackoff,
on_success: impl FnOnce(TResult),
go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
go_offline: impl FnOnce() -> TGoOfflineFuture,
go_offline: impl FnOnce(Duration) -> TGoOfflineFuture,
error_pattern: impl FnOnce() -> String,
) -> bool
where
Expand All @@ -543,16 +570,26 @@ where
match result {
Ok(result) => {
on_success(result);
retry_backoff.reset();
client_is_online = true
}
Err(error) => {
if error.is_connection_error() {
go_offline_future.set(go_offline().fuse());
let is_connection_error = error.is_connection_error();
let retry_delay = if is_connection_error {
retry_backoff.reset();
CONNECTION_ERROR_DELAY
} else {
client_is_online = true
}

log::error!(target: "bridge", "{}: {:?}", error_pattern(), error);
retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY)
};
go_offline_future.set(go_offline(retry_delay).fuse());

log::error!(
target: "bridge",
"{}: {:?}. Retrying in {}s",
error_pattern(),
error,
retry_delay.as_secs_f64(),
);
}
}

Expand Down Expand Up @@ -587,3 +624,62 @@ fn print_sync_progress<P: HeadersSyncPipeline>(
);
(now_time, now_best_header.clone().map(|id| id.0), *now_target_header)
}

#[cfg(test)]
mod tests {
use super::*;

#[derive(Debug)]
struct TestError(bool);

impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
self.0
}
}

fn run_backoff_test(result: Result<(), TestError>) -> (Duration, Duration) {
let mut backoff = retry_backoff();

// no randomness in tests (otherwise intervals may overlap => asserts are failing)
backoff.randomization_factor = 0f64;

// increase backoff's current interval
let interval1 = backoff.next_backoff().unwrap();
let interval2 = backoff.next_backoff().unwrap();
assert!(interval2 > interval1);

// successful future result leads to backoff's reset
let go_offline_future = futures::future::Fuse::terminated();
futures::pin_mut!(go_offline_future);

process_future_result(
result,
&mut backoff,
|_| {},
&mut go_offline_future,
|delay| async_std::task::sleep(delay),
|| unreachable!(),
);

(interval2, backoff.next_backoff().unwrap())
}

#[test]
fn process_future_result_resets_backoff_on_success() {
let (interval2, interval_after_reset) = run_backoff_test(Ok(()));
assert!(interval2 > interval_after_reset);
}

#[test]
fn process_future_result_resets_backoff_on_connection_error() {
let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(true)));
assert!(interval2 > interval_after_reset);
}

#[test]
fn process_future_result_does_not_reset_backoff_on_non_connection_error() {
let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(false)));
assert!(interval2 < interval_after_reset);
}
}

0 comments on commit e59b42e

Please sign in to comment.