diff --git a/Cargo.lock b/Cargo.lock index 4a1f39a24..f3b188797 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7489,6 +7489,17 @@ dependencies = [ "uint", ] +[[package]] +name = "priority-queue" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714c75db297bc88a63783ffc6ab9f830698a6705aa0201416931759ef4c8183d" +dependencies = [ + "autocfg", + "equivalent", + "indexmap 2.7.0", +] + [[package]] name = "proc-macro-crate" version = "3.2.0" @@ -8751,6 +8762,7 @@ dependencies = [ "parking_lot", "portpicker", "pretty_assertions", + "priority-queue", "rand 0.8.5", "rand_chacha 0.3.1", "rand_distr", diff --git a/Cargo.toml b/Cargo.toml index d15148305..e4566b674 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,6 +123,7 @@ thiserror = "1.0.69" tracing = "0.1" bytesize = "1.3" itertools = "0.12" +priority-queue = "2" rand_chacha = "0.3" rand_distr = "0.4" reqwest = "0.12" diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index 5fb19d8f6..bc35e4188 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -94,6 +94,7 @@ marketplace-builder-shared = { workspace = true, optional = true } marketplace-solver = { path = "../marketplace-solver" } num_enum = "0.7" parking_lot = "0.12" +priority-queue = { workspace = true } portpicker = { workspace = true } rand = { workspace = true } rand_chacha = { workspace = true } diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs index 7b53dacc7..a388c8241 100644 --- a/sequencer/src/catchup.rs +++ b/sequencer/src/catchup.rs @@ -1,6 +1,7 @@ use std::sync::Arc; -use anyhow::{bail, Context}; +use anyhow::{anyhow, bail, ensure, Context}; +use async_lock::RwLock; use async_trait::async_trait; use committable::Commitment; use committable::Committable; @@ -9,15 +10,16 @@ use espresso_types::{ v0::traits::StateCatchup, v0_3::ChainConfig, BackoffParams, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment, FeeMerkleTree, Leaf, NodeState, }; -use futures::future::{Future, FutureExt}; +use futures::future::{Future, FutureExt, TryFuture, TryFutureExt}; use hotshot_types::{ data::ViewNumber, network::NetworkConfig, traits::node_implementation::ConsensusTime as _, ValidatorConfig, }; use itertools::Itertools; use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme}; +use priority_queue::PriorityQueue; use serde::de::DeserializeOwned; -use std::{collections::HashMap, time::Duration}; +use std::{cmp::Ordering, collections::HashMap, fmt::Display, time::Duration}; use surf_disco::Request; use tide_disco::error::ServerError; use tokio::time::timeout; @@ -65,24 +67,137 @@ pub(crate) async fn local_and_remote( } } +/// A score of a catchup peer, based on our interactions with that peer. +/// +/// The score accounts for malicious peers -- i.e. peers that gave us an invalid response to a +/// verifiable request -- and faulty/unreliable peers -- those that fail to respond to requests at +/// all. The score has a comparison function where higher is better, or in other words `p1 > p2` +/// means we believe we are more likely to successfully catch up using `p1` than `p2`. This makes it +/// convenient and efficient to collect peers in a priority queue which we can easily convert to a +/// list sorted by reliability. +#[derive(Clone, Copy, Debug, Default)] +struct PeerScore { + requests: usize, + failures: usize, +} + +impl Ord for PeerScore { + fn cmp(&self, other: &Self) -> Ordering { + // Compare failure rates: `self` is better than `other` if + // self.failures / self.requests < other.failures / other.requests + // or equivalently + // other.failures * self.requests > self.failures * other.requests + (other.failures * self.requests).cmp(&(self.failures * other.requests)) + } +} + +impl PartialOrd for PeerScore { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for PeerScore { + fn eq(&self, other: &Self) -> bool { + self.cmp(other).is_eq() + } +} + +impl Eq for PeerScore {} + #[derive(Debug, Clone, Default)] pub struct StatePeers { + // Peer IDs, ordered by reliability score. Each ID is an index into `clients`. + scores: Arc>>, clients: Vec>, backoff: BackoffParams, } impl StatePeers { + async fn fetch( + &self, + retry: usize, + f: impl Fn(Client) -> Fut, + ) -> anyhow::Result + where + Fut: TryFuture, + { + // Since we have generally have multiple peers we can catch up from, we want a fairly + // aggressive timeout for requests: if a peer is not responding quickly, we're better off + // just trying the next one rather than waiting, and this prevents a malicious peer from + // delaying catchup for a long time. + // + // However, if we set the timeout _too_ aggressively, we might fail to catch up even from an + // honest peer, and thus never make progress. Thus, we start with a timeout of 500ms, which + // is aggressive but still very reasonable for an HTTP request. If that fails with all of + // our peers, we increase the timeout by 1 second for each successive retry, until we + // eventually succeed. + let timeout_dur = Duration::from_millis(500) * (retry as u32 + 1); + + // Keep track of which peers we make requests to and which succeed (`true`) or fail (`false`), + // so we can update reliability scores at the end. + let mut requests = HashMap::new(); + let mut res = Err(anyhow!("failed fetching from every peer")); + + // Try each peer in order of reliability score, until we succeed. We clone out of + // `self.scores` because it is small (contains only numeric IDs and scores), so this clone + // is a lot cheaper than holding the read lock the entire time we are making requests (which + // could be a while). + let mut scores = { (*self.scores.read().await).clone() }; + while let Some((id, score)) = scores.pop() { + let client = &self.clients[id]; + tracing::info!("fetching from {}", client.url); + match timeout(timeout_dur, f(client.clone()).into_future()).await { + Ok(Ok(t)) => { + requests.insert(id, true); + res = Ok(t); + break; + } + Ok(Err(err)) => { + tracing::warn!(id, ?score, peer = %client.url, "error from peer: {err:#}"); + requests.insert(id, false); + } + Err(_) => { + tracing::warn!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out"); + requests.insert(id, false); + } + } + } + + // Update client scores. + let mut scores = self.scores.write().await; + for (id, success) in requests { + scores.change_priority_by(&id, |score| { + score.requests += 1; + if !success { + score.failures += 1; + } + }); + } + + res + } + pub fn from_urls(urls: Vec, backoff: BackoffParams) -> Self { if urls.is_empty() { panic!("Cannot create StatePeers with no peers"); } + let scores = urls + .iter() + .enumerate() + .map(|(i, _)| (i, PeerScore::default())) + .collect(); + let clients = urls.into_iter().map(Client::new).collect(); + Self { - clients: urls.into_iter().map(Client::new).collect(), + clients, + scores: Arc::new(RwLock::new(scores)), backoff, } } + #[tracing::instrument(skip(self, my_own_validator_config))] pub async fn fetch_config( &self, my_own_validator_config: ValidatorConfig, @@ -91,48 +206,18 @@ impl StatePeers { .retry(self, move |provider, retry| { let my_own_validator_config = my_own_validator_config.clone(); async move { - for client in &provider.clients { - tracing::info!("fetching config from {}", client.url); - match Self::retry(retry, client - .get::("config/hotshot") - .send()) - .await - { - Ok(res) => { - return res.into_network_config(my_own_validator_config) - .context(format!("fetched config from {}, but failed to convert to private config", client.url)); - } - Err(err) => { - tracing::warn!("error fetching config from peer: {err:#}"); - } - } - } - bail!("could not fetch config from any peer"); + let cfg = provider + .fetch(retry, |client| { + client.get::("config/hotshot").send() + }) + .await?; + cfg.into_network_config(my_own_validator_config) + .context("fetched config, but failed to convert to private config") } .boxed() }) .await } - - async fn retry(retry: usize, f: impl Future>) -> anyhow::Result - where - E: std::error::Error + Send + Sync + 'static, - { - // Since we have generally have multiple peers we can catch up from, we want a fairly - // aggressive timeout for requests: if a peer is not responding quickly, we're better off - // just trying the next one rather than waiting, and this prevents a malicious peer from - // delaying catchup for a long time. - // - // However, if we set the timeout _too_ aggressively, we might fail to catch up even from an - // honest peer, and thus never make progress. Thus, we start with a timeout of 500ms, which - // is aggressive but still very reasonable for an HTTP request. If that fails with all of - // our peers, we increase the timeout by 1 second for each successive retry, until we - // eventually succeed. - timeout(Duration::from_millis(500) * (retry as u32 + 1), f) - .await - .context(format!("operation timed out (retry {retry})"))? - .context(format!("operation failed (retry {retry})")) - } } #[async_trait] @@ -147,42 +232,26 @@ impl StateCatchup for StatePeers { fee_merkle_tree_root: FeeMerkleCommitment, accounts: &[FeeAccount], ) -> anyhow::Result { - for client in self.clients.iter() { - tracing::info!("Fetching accounts from {}", client.url); - let req = match client + self.fetch(retry, |client| async move { + let snapshot = client .inner - .post::(&format!("catchup/{height}/{}/accounts", view.u64(),)) - .body_binary(&accounts.to_vec()) - { - Ok(req) => req, - Err(err) => { - tracing::warn!("failed to construct accounts catchup request: {err:#}"); - continue; - } - }; - let snapshot = match Self::retry(retry, req.send()).await { - Ok(res) => res, - Err(err) => { - tracing::info!(peer = %client.url, "error fetching accounts from peer: {err:#}"); - continue; - } - }; + .post::(&format!("catchup/{height}/{}/accounts", view.u64())) + .body_binary(&accounts.to_vec())? + .send() + .await?; // Verify proofs. for account in accounts { - let Some((proof, _)) = FeeAccountProof::prove(&snapshot, (*account).into()) else { - tracing::warn!(peer = %client.url, "response from peer missing account {account}"); - continue; - }; - if let Err(err) = proof.verify(&fee_merkle_tree_root) { - tracing::warn!(peer = %client.url, "peer gave invalid proof for account {account}: {err:#}"); - continue; - } + let (proof, _) = FeeAccountProof::prove(&snapshot, (*account).into()) + .context(format!("response missing account {account}"))?; + proof + .verify(&fee_merkle_tree_root) + .context(format!("invalid proof for accoujnt {account}"))?; } - return Ok(snapshot); - } - bail!("Could not fetch account from any peer"); + anyhow::Ok(snapshot) + }) + .await } #[tracing::instrument(skip(self, _instance, mt))] @@ -194,35 +263,24 @@ impl StateCatchup for StatePeers { view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { - for client in self.clients.iter() { - tracing::debug!(peer = %client.url, "fetching frontier from peer"); - match Self::retry( - retry, - client - .get::(&format!("catchup/{height}/{}/blocks", view.u64())) - .send(), - ) - .await - { - Ok(frontier) => { - let Some(elem) = frontier.elem() else { - tracing::warn!(peer = %client.url, "Provided frontier is missing leaf element"); - continue; - }; - match mt.remember(mt.num_leaves() - 1, *elem, &frontier) { - Ok(_) => return Ok(()), - Err(err) => { - tracing::warn!(peer = %client.url, "Error verifying block proof: {err:#}"); - continue; - } - } - } - Err(err) => { - tracing::info!(peer = %client.url, "error fetching blocks from peer: {err:#}"); + *mt = self + .fetch(retry, |client| { + let mut mt = mt.clone(); + async move { + let frontier = client + .get::(&format!("catchup/{height}/{}/blocks", view.u64())) + .send() + .await?; + let elem = frontier + .elem() + .context("provided frontier is missing leaf element")?; + mt.remember(mt.num_leaves() - 1, *elem, &frontier) + .context("verifying block proof")?; + anyhow::Ok(mt) } - } - } - bail!("Could not fetch frontier from any peer"); + }) + .await?; + Ok(()) } async fn try_fetch_chain_config( @@ -230,34 +288,19 @@ impl StateCatchup for StatePeers { retry: usize, commitment: Commitment, ) -> anyhow::Result { - for client in self.clients.iter() { - tracing::info!("Fetching chain config from {}", client.url); - match Self::retry( - retry, - client - .get::(&format!("catchup/chain-config/{}", commitment)) - .send(), - ) - .await - { - Ok(cf) => { - if cf.commit() == commitment { - return Ok(cf); - } else { - tracing::error!( - "Received chain config with mismatched commitment from {}: expected {}, got {}", - client.url, - commitment, - cf.commit(), - ); - } - } - Err(err) => { - tracing::warn!("Error fetching chain config from peer: {}", err); - } - } - } - bail!("Could not fetch chain config from any peer"); + self.fetch(retry, |client| async move { + let cf = client + .get::(&format!("catchup/chain-config/{}", commitment)) + .send() + .await?; + ensure!( + cf.commit() == commitment, + "received chain config with mismatched commitment: expected {commitment}, got {}", + cf.commit() + ); + Ok(cf) + }) + .await } fn backoff(&self) -> &BackoffParams { @@ -534,3 +577,25 @@ impl StateCatchup for NullStateCatchup { "NullStateCatchup".into() } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_peer_priority() { + let good_peer = PeerScore { + requests: 1000, + failures: 2, + }; + let bad_peer = PeerScore { + requests: 10, + failures: 1, + }; + assert!(good_peer > bad_peer); + + let mut peers: PriorityQueue<_, _> = [(0, good_peer), (1, bad_peer)].into_iter().collect(); + assert_eq!(peers.pop(), Some((0, good_peer))); + assert_eq!(peers.pop(), Some((1, bad_peer))); + } +}