diff --git a/Cargo.lock b/Cargo.lock index 9a06129d8..6cde26b5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7413,6 +7413,17 @@ dependencies = [ "uint", ] +[[package]] +name = "priority-queue" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714c75db297bc88a63783ffc6ab9f830698a6705aa0201416931759ef4c8183d" +dependencies = [ + "autocfg", + "equivalent", + "indexmap 2.6.0", +] + [[package]] name = "proc-macro-crate" version = "3.2.0" @@ -8642,6 +8653,7 @@ dependencies = [ "parking_lot", "portpicker", "pretty_assertions", + "priority-queue", "rand 0.8.5", "rand_chacha 0.3.1", "rand_distr", diff --git a/Cargo.toml b/Cargo.toml index 12738fd66..c52556dc4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,6 +126,7 @@ thiserror = "1.0.69" tracing = "0.1" bytesize = "1.3" itertools = "0.12" +priority-queue = "2" rand_chacha = "0.3" rand_distr = "0.4" reqwest = "0.12" diff --git a/builder/src/non_permissioned.rs b/builder/src/non_permissioned.rs index ab33c1fb3..e07509d6d 100644 --- a/builder/src/non_permissioned.rs +++ b/builder/src/non_permissioned.rs @@ -19,6 +19,7 @@ use hotshot_types::{ data::{fake_commitment, ViewNumber}, traits::{ block_contents::{vid_commitment, GENESIS_VID_NUM_STORAGE_NODES}, + metrics::NoMetrics, node_implementation::Versions, EncodeBytes, }, @@ -53,6 +54,7 @@ pub async fn build_instance_state( Arc::new(StatePeers::::from_urls( state_peers, Default::default(), + &NoMetrics, )), V::Base::VERSION, ); diff --git a/marketplace-builder/src/builder.rs b/marketplace-builder/src/builder.rs index 46a2005f5..f0daf5e31 100644 --- a/marketplace-builder/src/builder.rs +++ b/marketplace-builder/src/builder.rs @@ -30,6 +30,7 @@ use hotshot_types::{ data::{fake_commitment, Leaf, ViewNumber}, traits::{ block_contents::{vid_commitment, Transaction as _, GENESIS_VID_NUM_STORAGE_NODES}, + metrics::NoMetrics, node_implementation::{ConsensusTime, NodeType, Versions}, EncodeBytes, }, @@ -74,6 +75,7 @@ pub async fn build_instance_state( Arc::new(StatePeers::::from_urls( state_peers, Default::default(), + &NoMetrics, )), V::Base::version(), ); diff --git a/sequencer-sqlite/Cargo.lock b/sequencer-sqlite/Cargo.lock index b2f2e85e6..ca6f27327 100644 --- a/sequencer-sqlite/Cargo.lock +++ b/sequencer-sqlite/Cargo.lock @@ -7139,6 +7139,17 @@ dependencies = [ "uint", ] +[[package]] +name = "priority-queue" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714c75db297bc88a63783ffc6ab9f830698a6705aa0201416931759ef4c8183d" +dependencies = [ + "autocfg", + "equivalent", + "indexmap 2.7.0", +] + [[package]] name = "proc-macro-crate" version = "3.2.0" @@ -8359,6 +8370,7 @@ dependencies = [ "num_enum", "parking_lot", "portpicker", + "priority-queue", "rand 0.8.5", "rand_chacha 0.3.1", "rand_distr", diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index f9948fc79..b95b182af 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -95,6 +95,7 @@ marketplace-solver = { path = "../marketplace-solver" } num_enum = "0.7" parking_lot = "0.12" portpicker = { workspace = true } +priority-queue = { workspace = true } rand = { workspace = true } rand_chacha = { workspace = true } rand_distr = { workspace = true } diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs index 12f8191e2..a3cd5c593 100644 --- a/sequencer/src/api.rs +++ b/sequencer/src/api.rs @@ -1527,6 +1527,7 @@ mod test { StatePeers::>::from_urls( vec![format!("http://localhost:{port}").parse().unwrap()], Default::default(), + &NoMetrics, ) })) .build(); @@ -1571,6 +1572,7 @@ mod test { StatePeers::>::from_urls( vec![format!("http://localhost:{port}").parse().unwrap()], Default::default(), + &NoMetrics, ), &NoMetrics, test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST, @@ -1636,6 +1638,7 @@ mod test { StatePeers::>::from_urls( vec![format!("http://localhost:{port}").parse().unwrap()], Default::default(), + &NoMetrics, ) })) .network_config(TestConfigBuilder::default().l1_url(l1).build()) @@ -1713,6 +1716,7 @@ mod test { StatePeers::>::from_urls( vec![format!("http://localhost:{port}").parse().unwrap()], Default::default(), + &NoMetrics, ) })) .network_config(TestConfigBuilder::default().l1_url(l1).build()) @@ -1773,6 +1777,7 @@ mod test { StatePeers::::from_urls( vec![format!("http://localhost:{port}").parse().unwrap()], BackoffParams::default(), + &NoMetrics, ) }); @@ -1780,6 +1785,7 @@ mod test { peers[2] = StatePeers::::from_urls( vec![url.clone()], BackoffParams::default(), + &NoMetrics, ); let config = TestNetworkConfigBuilder::::with_num_nodes() @@ -1801,13 +1807,16 @@ mod test { // The catchup should successfully retrieve the correct chain config. let node = &network.peers[0]; let peers = node.node_state().peers; - peers.try_fetch_chain_config(cf.commit()).await.unwrap(); + peers.try_fetch_chain_config(0, cf.commit()).await.unwrap(); // Test a catchup request for node #1, which is connected to a dishonest peer. // This request will result in an error due to the malicious chain config provided by the peer. let node = &network.peers[1]; let peers = node.node_state().peers; - peers.try_fetch_chain_config(cf.commit()).await.unwrap_err(); + peers + .try_fetch_chain_config(0, cf.commit()) + .await + .unwrap_err(); network.server.shut_down().await; handle.abort(); @@ -1963,6 +1972,7 @@ mod test { StatePeers::::from_urls( vec![format!("http://localhost:{port}").parse().unwrap()], Default::default(), + &NoMetrics, ) })) .network_config( @@ -2136,6 +2146,7 @@ mod test { StatePeers::>::from_urls( vec![format!("http://localhost:{port}").parse().unwrap()], Default::default(), + &NoMetrics, ) })) .network_config(TestConfigBuilder::default().l1_url(l1).build()) @@ -2200,6 +2211,7 @@ mod test { let peers = StatePeers::>::from_urls( vec!["https://notarealnode.network".parse().unwrap(), url], Default::default(), + &NoMetrics, ); // Fetch the config from node 1, a different node than the one running the service. diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs index ccfe74322..8c8ca7a66 100644 --- a/sequencer/src/catchup.rs +++ b/sequencer/src/catchup.rs @@ -1,6 +1,7 @@ use std::sync::Arc; -use anyhow::{bail, Context}; +use anyhow::{anyhow, bail, ensure, Context}; +use async_lock::RwLock; use async_trait::async_trait; use committable::Commitment; use committable::Committable; @@ -9,17 +10,24 @@ use espresso_types::{ v0::traits::StateCatchup, v0_99::ChainConfig, BackoffParams, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment, FeeMerkleTree, Leaf2, NodeState, }; -use futures::future::{Future, FutureExt}; +use futures::future::{Future, FutureExt, TryFuture, TryFutureExt}; use hotshot_types::{ - data::ViewNumber, network::NetworkConfig, traits::node_implementation::ConsensusTime as _, + data::ViewNumber, + network::NetworkConfig, + traits::{ + metrics::{Counter, CounterFamily, Metrics}, + node_implementation::ConsensusTime as _, + }, ValidatorConfig, }; use itertools::Itertools; use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme}; +use priority_queue::PriorityQueue; use serde::de::DeserializeOwned; -use std::collections::HashMap; +use std::{cmp::Ordering, collections::HashMap, fmt::Display, time::Duration}; use surf_disco::Request; use tide_disco::error::ServerError; +use tokio::time::timeout; use url::Url; use vbs::version::StaticVersionType; @@ -34,12 +42,20 @@ use crate::{ struct Client { inner: surf_disco::Client, url: Url, + requests: Arc>, + failures: Arc>, } impl Client { - pub fn new(url: Url) -> Self { + pub fn new( + url: Url, + requests: &(impl CounterFamily + ?Sized), + failures: &(impl CounterFamily + ?Sized), + ) -> Self { Self { inner: surf_disco::Client::new(url.clone()), + requests: Arc::new(requests.create(vec![url.to_string()])), + failures: Arc::new(failures.create(vec![url.to_string()])), url, } } @@ -64,49 +80,165 @@ pub(crate) async fn local_and_remote( } } +/// A score of a catchup peer, based on our interactions with that peer. +/// +/// The score accounts for malicious peers -- i.e. peers that gave us an invalid response to a +/// verifiable request -- and faulty/unreliable peers -- those that fail to respond to requests at +/// all. The score has a comparison function where higher is better, or in other words `p1 > p2` +/// means we believe we are more likely to successfully catch up using `p1` than `p2`. This makes it +/// convenient and efficient to collect peers in a priority queue which we can easily convert to a +/// list sorted by reliability. +#[derive(Clone, Copy, Debug, Default)] +struct PeerScore { + requests: usize, + failures: usize, +} + +impl Ord for PeerScore { + fn cmp(&self, other: &Self) -> Ordering { + // Compare failure rates: `self` is better than `other` if + // self.failures / self.requests < other.failures / other.requests + // or equivalently + // other.failures * self.requests > self.failures * other.requests + (other.failures * self.requests).cmp(&(self.failures * other.requests)) + } +} + +impl PartialOrd for PeerScore { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for PeerScore { + fn eq(&self, other: &Self) -> bool { + self.cmp(other).is_eq() + } +} + +impl Eq for PeerScore {} + #[derive(Debug, Clone, Default)] pub struct StatePeers { + // Peer IDs, ordered by reliability score. Each ID is an index into `clients`. + scores: Arc>>, clients: Vec>, backoff: BackoffParams, } impl StatePeers { - pub fn from_urls(urls: Vec, backoff: BackoffParams) -> Self { + async fn fetch( + &self, + retry: usize, + f: impl Fn(Client) -> Fut, + ) -> anyhow::Result + where + Fut: TryFuture, + { + // Since we have generally have multiple peers we can catch up from, we want a fairly + // aggressive timeout for requests: if a peer is not responding quickly, we're better off + // just trying the next one rather than waiting, and this prevents a malicious peer from + // delaying catchup for a long time. + // + // However, if we set the timeout _too_ aggressively, we might fail to catch up even from an + // honest peer, and thus never make progress. Thus, we start with a timeout of 500ms, which + // is aggressive but still very reasonable for an HTTP request. If that fails with all of + // our peers, we increase the timeout by 1 second for each successive retry, until we + // eventually succeed. + let timeout_dur = Duration::from_millis(500) * (retry as u32 + 1); + + // Keep track of which peers we make requests to and which succeed (`true`) or fail (`false`), + // so we can update reliability scores at the end. + let mut requests = HashMap::new(); + let mut res = Err(anyhow!("failed fetching from every peer")); + + // Try each peer in order of reliability score, until we succeed. We clone out of + // `self.scores` because it is small (contains only numeric IDs and scores), so this clone + // is a lot cheaper than holding the read lock the entire time we are making requests (which + // could be a while). + let mut scores = { (*self.scores.read().await).clone() }; + while let Some((id, score)) = scores.pop() { + let client = &self.clients[id]; + tracing::info!("fetching from {}", client.url); + match timeout(timeout_dur, f(client.clone()).into_future()).await { + Ok(Ok(t)) => { + requests.insert(id, true); + res = Ok(t); + break; + } + Ok(Err(err)) => { + tracing::warn!(id, ?score, peer = %client.url, "error from peer: {err:#}"); + requests.insert(id, false); + } + Err(_) => { + tracing::warn!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out"); + requests.insert(id, false); + } + } + } + + // Update client scores. + let mut scores = self.scores.write().await; + for (id, success) in requests { + scores.change_priority_by(&id, |score| { + score.requests += 1; + self.clients[id].requests.add(1); + if !success { + score.failures += 1; + self.clients[id].failures.add(1); + } + }); + } + + res + } + + pub fn from_urls( + urls: Vec, + backoff: BackoffParams, + metrics: &(impl Metrics + ?Sized), + ) -> Self { if urls.is_empty() { panic!("Cannot create StatePeers with no peers"); } + let metrics = metrics.subgroup("catchup".into()); + let requests = metrics.counter_family("requests".into(), vec!["peer".into()]); + let failures = metrics.counter_family("request_failures".into(), vec!["peer".into()]); + + let scores = urls + .iter() + .enumerate() + .map(|(i, _)| (i, PeerScore::default())) + .collect(); + let clients = urls + .into_iter() + .map(|url| Client::new(url, &*requests, &*failures)) + .collect(); + Self { - clients: urls.into_iter().map(Client::new).collect(), + clients, + scores: Arc::new(RwLock::new(scores)), backoff, } } + #[tracing::instrument(skip(self, my_own_validator_config))] pub async fn fetch_config( &self, my_own_validator_config: ValidatorConfig, ) -> anyhow::Result> { self.backoff() - .retry(self, move |provider| { + .retry(self, move |provider, retry| { let my_own_validator_config = my_own_validator_config.clone(); async move { - for client in &provider.clients { - tracing::info!("fetching config from {}", client.url); - match client - .get::("config/hotshot") - .send() - .await - { - Ok(res) => { - return res.into_network_config(my_own_validator_config) - .context(format!("fetched config from {}, but failed to convert to private config", client.url)); - } - Err(err) => { - tracing::warn!("error fetching config from peer: {err:#}"); - } - } - } - bail!("could not fetch config from any peer"); + let cfg = provider + .fetch(retry, |client| { + client.get::("config/hotshot").send() + }) + .await?; + cfg.into_network_config(my_own_validator_config) + .context("fetched config, but failed to convert to private config") } .boxed() }) @@ -119,115 +251,82 @@ impl StateCatchup for StatePeers { #[tracing::instrument(skip(self, _instance))] async fn try_fetch_accounts( &self, + retry: usize, _instance: &NodeState, height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, accounts: &[FeeAccount], ) -> anyhow::Result { - for client in self.clients.iter() { - tracing::info!("Fetching accounts from {}", client.url); - let req = match client + self.fetch(retry, |client| async move { + let snapshot = client .inner - .post::(&format!("catchup/{height}/{}/accounts", view.u64(),)) - .body_binary(&accounts.to_vec()) - { - Ok(req) => req, - Err(err) => { - tracing::warn!("failed to construct accounts catchup request: {err:#}"); - continue; - } - }; - let snapshot = match req.send().await { - Ok(res) => res, - Err(err) => { - tracing::info!(peer = %client.url, "error fetching accounts from peer: {err:#}"); - continue; - } - }; + .post::(&format!("catchup/{height}/{}/accounts", view.u64())) + .body_binary(&accounts.to_vec())? + .send() + .await?; // Verify proofs. for account in accounts { - let Some((proof, _)) = FeeAccountProof::prove(&snapshot, (*account).into()) else { - tracing::warn!(peer = %client.url, "response from peer missing account {account}"); - continue; - }; - if let Err(err) = proof.verify(&fee_merkle_tree_root) { - tracing::warn!(peer = %client.url, "peer gave invalid proof for account {account}: {err:#}"); - continue; - } + let (proof, _) = FeeAccountProof::prove(&snapshot, (*account).into()) + .context(format!("response missing account {account}"))?; + proof + .verify(&fee_merkle_tree_root) + .context(format!("invalid proof for accoujnt {account}"))?; } - return Ok(snapshot); - } - bail!("Could not fetch account from any peer"); + anyhow::Ok(snapshot) + }) + .await } #[tracing::instrument(skip(self, _instance, mt))] async fn try_remember_blocks_merkle_tree( &self, + retry: usize, _instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { - for client in self.clients.iter() { - tracing::debug!(peer = %client.url, "fetching frontier from peer"); - match client - .get::(&format!("catchup/{height}/{}/blocks", view.u64())) - .send() - .await - { - Ok(frontier) => { - let Some(elem) = frontier.elem() else { - tracing::warn!(peer = %client.url, "Provided frontier is missing leaf element"); - continue; - }; - match mt.remember(mt.num_leaves() - 1, *elem, &frontier) { - Ok(_) => return Ok(()), - Err(err) => { - tracing::warn!(peer = %client.url, "Error verifying block proof: {err:#}"); - continue; - } - } - } - Err(err) => { - tracing::info!(peer = %client.url, "error fetching blocks from peer: {err:#}"); + *mt = self + .fetch(retry, |client| { + let mut mt = mt.clone(); + async move { + let frontier = client + .get::(&format!("catchup/{height}/{}/blocks", view.u64())) + .send() + .await?; + let elem = frontier + .elem() + .context("provided frontier is missing leaf element")?; + mt.remember(mt.num_leaves() - 1, *elem, &frontier) + .context("verifying block proof")?; + anyhow::Ok(mt) } - } - } - bail!("Could not fetch frontier from any peer"); + }) + .await?; + Ok(()) } async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result { - for client in self.clients.iter() { - tracing::info!("Fetching chain config from {}", client.url); - match client + self.fetch(retry, |client| async move { + let cf = client .get::(&format!("catchup/chain-config/{}", commitment)) .send() - .await - { - Ok(cf) => { - if cf.commit() == commitment { - return Ok(cf); - } else { - tracing::error!( - "Received chain config with mismatched commitment from {}: expected {}, got {}", - client.url, - commitment, - cf.commit(), - ); - } - } - Err(err) => { - tracing::warn!("Error fetching chain config from peer: {}", err); - } - } - } - bail!("Could not fetch chain config from any peer"); + .await?; + ensure!( + cf.commit() == commitment, + "received chain config with mismatched commitment: expected {commitment}, got {}", + cf.commit() + ); + Ok(cf) + }) + .await } fn backoff(&self) -> &BackoffParams { @@ -358,9 +457,10 @@ where { // TODO: add a test for the account proof validation // issue # 2102 (https://github.com/EspressoSystems/espresso-sequencer/issues/2102) - #[tracing::instrument(skip(self, instance))] + #[tracing::instrument(skip(self, _retry, instance))] async fn try_fetch_accounts( &self, + _retry: usize, instance: &NodeState, block_height: u64, view: ViewNumber, @@ -374,9 +474,10 @@ where .0) } - #[tracing::instrument(skip(self, instance, mt))] + #[tracing::instrument(skip(self, _retry, instance, mt))] async fn try_remember_blocks_merkle_tree( &self, + _retry: usize, instance: &NodeState, bh: u64, view: ViewNumber, @@ -401,6 +502,7 @@ where async fn try_fetch_chain_config( &self, + _retry: usize, commitment: Commitment, ) -> anyhow::Result { let cf = self.db.get_chain_config(commitment).await?; @@ -461,6 +563,7 @@ impl NullStateCatchup { impl StateCatchup for NullStateCatchup { async fn try_fetch_accounts( &self, + _retry: usize, _instance: &NodeState, _height: u64, _view: ViewNumber, @@ -472,6 +575,7 @@ impl StateCatchup for NullStateCatchup { async fn try_remember_blocks_merkle_tree( &self, + _retry: usize, _instance: &NodeState, _height: u64, _view: ViewNumber, @@ -482,6 +586,7 @@ impl StateCatchup for NullStateCatchup { async fn try_fetch_chain_config( &self, + _retry: usize, commitment: Commitment, ) -> anyhow::Result { self.chain_configs @@ -498,3 +603,25 @@ impl StateCatchup for NullStateCatchup { "NullStateCatchup".into() } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_peer_priority() { + let good_peer = PeerScore { + requests: 1000, + failures: 2, + }; + let bad_peer = PeerScore { + requests: 10, + failures: 1, + }; + assert!(good_peer > bad_peer); + + let mut peers: PriorityQueue<_, _> = [(0, good_peer), (1, bad_peer)].into_iter().collect(); + assert_eq!(peers.pop(), Some((0, good_peer))); + assert_eq!(peers.pop(), Some((1, bad_peer))); + } +} diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index b4d2a4e5e..acc973152 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -55,7 +55,7 @@ use hotshot_types::{ light_client::{StateKeyPair, StateSignKey}, signature_key::{BLSPrivKey, BLSPubKey}, traits::{ - metrics::Metrics, + metrics::{Metrics, NoMetrics}, network::ConnectedNetwork, node_implementation::{NodeImplementation, NodeType, Versions}, }, @@ -314,8 +314,11 @@ pub async fn init_node( // If we were told to fetch the config from an already-started peer, do so. (None, Some(peers)) => { tracing::info!(?peers, "loading network config from peers"); - let peers = - StatePeers::::from_urls(peers, network_params.catchup_backoff); + let peers = StatePeers::::from_urls( + peers, + network_params.catchup_backoff, + &NoMetrics, + ); let config = peers.fetch_config(validator_config.clone()).await?; tracing::info!( @@ -511,6 +514,7 @@ pub async fn init_node( StatePeers::::from_urls( network_params.state_peers, network_params.catchup_backoff, + metrics, ), ) .await, diff --git a/types/src/v0/impls/instance_state.rs b/types/src/v0/impls/instance_state.rs index 71a387bbc..bf5e5595f 100644 --- a/types/src/v0/impls/instance_state.rs +++ b/types/src/v0/impls/instance_state.rs @@ -213,6 +213,7 @@ pub mod mock { impl StateCatchup for MockStateCatchup { async fn try_fetch_accounts( &self, + _retry: usize, _instance: &NodeState, _height: u64, view: ViewNumber, @@ -228,6 +229,7 @@ pub mod mock { async fn try_remember_blocks_merkle_tree( &self, + _retry: usize, _instance: &NodeState, _height: u64, view: ViewNumber, @@ -252,6 +254,7 @@ pub mod mock { async fn try_fetch_chain_config( &self, + _retry: usize, _commitment: Commitment, ) -> anyhow::Result { Ok(ChainConfig::default()) diff --git a/types/src/v0/traits.rs b/types/src/v0/traits.rs index 6bd58a837..ec4eea666 100644 --- a/types/src/v0/traits.rs +++ b/types/src/v0/traits.rs @@ -41,6 +41,7 @@ pub trait StateCatchup: Send + Sync { /// Try to fetch the given accounts state, failing without retrying if unable. async fn try_fetch_accounts( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -58,10 +59,18 @@ pub trait StateCatchup: Send + Sync { accounts: Vec, ) -> anyhow::Result> { self.backoff() - .retry(self, |provider| { - async { + .retry(self, |provider, retry| { + let accounts = &accounts; + async move { let tree = provider - .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, &accounts) + .try_fetch_accounts( + retry, + instance, + height, + view, + fee_merkle_tree_root, + accounts, + ) .await .map_err(|err| { err.context(format!( @@ -85,6 +94,7 @@ pub trait StateCatchup: Send + Sync { /// Try to fetch and remember the blocks frontier, failing without retrying if unable. async fn try_remember_blocks_merkle_tree( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -100,8 +110,8 @@ pub trait StateCatchup: Send + Sync { mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { self.backoff() - .retry(mt, |mt| { - self.try_remember_blocks_merkle_tree(instance, height, view, mt) + .retry(mt, |mt, retry| { + self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt) .map_err(|err| err.context("fetching frontier")) .boxed() }) @@ -110,6 +120,7 @@ pub trait StateCatchup: Send + Sync { async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result; @@ -118,9 +129,9 @@ pub trait StateCatchup: Send + Sync { commitment: Commitment, ) -> anyhow::Result { self.backoff() - .retry(self, |provider| { + .retry(self, |provider, retry| { provider - .try_fetch_chain_config(commitment) + .try_fetch_chain_config(retry, commitment) .map_err(|err| err.context("fetching chain config")) .boxed() }) @@ -135,6 +146,7 @@ pub trait StateCatchup: Send + Sync { impl StateCatchup for Box { async fn try_fetch_accounts( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -142,7 +154,14 @@ impl StateCatchup for Box { accounts: &[FeeAccount], ) -> anyhow::Result { (**self) - .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) + .try_fetch_accounts( + retry, + instance, + height, + view, + fee_merkle_tree_root, + accounts, + ) .await } @@ -161,13 +180,14 @@ impl StateCatchup for Box { async fn try_remember_blocks_merkle_tree( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { (**self) - .try_remember_blocks_merkle_tree(instance, height, view, mt) + .try_remember_blocks_merkle_tree(retry, instance, height, view, mt) .await } @@ -185,9 +205,10 @@ impl StateCatchup for Box { async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result { - (**self).try_fetch_chain_config(commitment).await + (**self).try_fetch_chain_config(retry, commitment).await } async fn fetch_chain_config( @@ -210,6 +231,7 @@ impl StateCatchup for Box { impl StateCatchup for Arc { async fn try_fetch_accounts( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -217,7 +239,14 @@ impl StateCatchup for Arc { accounts: &[FeeAccount], ) -> anyhow::Result { (**self) - .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) + .try_fetch_accounts( + retry, + instance, + height, + view, + fee_merkle_tree_root, + accounts, + ) .await } @@ -236,13 +265,14 @@ impl StateCatchup for Arc { async fn try_remember_blocks_merkle_tree( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { (**self) - .try_remember_blocks_merkle_tree(instance, height, view, mt) + .try_remember_blocks_merkle_tree(retry, instance, height, view, mt) .await } @@ -260,9 +290,10 @@ impl StateCatchup for Arc { async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result { - (**self).try_fetch_chain_config(commitment).await + (**self).try_fetch_chain_config(retry, commitment).await } async fn fetch_chain_config( @@ -287,6 +318,7 @@ impl StateCatchup for Vec { #[tracing::instrument(skip(self, instance))] async fn try_fetch_accounts( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -295,7 +327,14 @@ impl StateCatchup for Vec { ) -> anyhow::Result { for provider in self { match provider - .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) + .try_fetch_accounts( + retry, + instance, + height, + view, + fee_merkle_tree_root, + accounts, + ) .await { Ok(tree) => return Ok(tree), @@ -315,6 +354,7 @@ impl StateCatchup for Vec { #[tracing::instrument(skip(self, instance, mt))] async fn try_remember_blocks_merkle_tree( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -322,7 +362,7 @@ impl StateCatchup for Vec { ) -> anyhow::Result<()> { for provider in self { match provider - .try_remember_blocks_merkle_tree(instance, height, view, mt) + .try_remember_blocks_merkle_tree(retry, instance, height, view, mt) .await { Ok(()) => return Ok(()), @@ -340,10 +380,11 @@ impl StateCatchup for Vec { async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result { for provider in self { - match provider.try_fetch_chain_config(commitment).await { + match provider.try_fetch_chain_config(retry, commitment).await { Ok(cf) => return Ok(cf), Err(err) => { tracing::info!( diff --git a/types/src/v0/utils.rs b/types/src/v0/utils.rs index 3f4cfd275..0c3df9579 100644 --- a/types/src/v0/utils.rs +++ b/types/src/v0/utils.rs @@ -286,12 +286,12 @@ impl BackoffParams { pub async fn retry( &self, mut state: S, - f: impl for<'a> Fn(&'a mut S) -> BoxFuture<'a, anyhow::Result>, + f: impl for<'a> Fn(&'a mut S, usize) -> BoxFuture<'a, anyhow::Result>, ) -> anyhow::Result { let mut delay = self.base; - loop { - match f(&mut state).await { - Ok(res) => break Ok(res), + for i in 0.. { + match f(&mut state, i).await { + Ok(res) => return Ok(res), Err(err) if self.disable => { return Err(err.context("Retryable operation failed; retries disabled")); } @@ -304,6 +304,7 @@ impl BackoffParams { } } } + unreachable!() } #[must_use]