From 5782bfcb6806b73a421bca9d2e4b1e1166d731a0 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Thu, 5 Dec 2024 14:08:58 -0800 Subject: [PATCH] Limit time for each catchup request to defend against malicious peers --- sequencer/src/api.rs | 7 ++- sequencer/src/catchup.rs | 66 +++++++++++++++++++------ types/src/v0/impls/instance_state.rs | 3 ++ types/src/v0/traits.rs | 73 ++++++++++++++++++++++------ types/src/v0/utils.rs | 9 ++-- 5 files changed, 121 insertions(+), 37 deletions(-) diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs index 3f547c981..eba2108c4 100644 --- a/sequencer/src/api.rs +++ b/sequencer/src/api.rs @@ -1809,13 +1809,16 @@ mod test { // The catchup should successfully retrieve the correct chain config. let node = &network.peers[0]; let peers = node.node_state().peers; - peers.try_fetch_chain_config(cf.commit()).await.unwrap(); + peers.try_fetch_chain_config(0, cf.commit()).await.unwrap(); // Test a catchup request for node #1, which is connected to a dishonest peer. // This request will result in an error due to the malicious chain config provided by the peer. 
let node = &network.peers[1]; let peers = node.node_state().peers; - peers.try_fetch_chain_config(cf.commit()).await.unwrap_err(); + peers + .try_fetch_chain_config(0, cf.commit()) + .await + .unwrap_err(); network.server.shut_down().await; handle.abort(); diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs index 6d87f1cde..7b53dacc7 100644 --- a/sequencer/src/catchup.rs +++ b/sequencer/src/catchup.rs @@ -17,9 +17,10 @@ use hotshot_types::{ use itertools::Itertools; use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme}; use serde::de::DeserializeOwned; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use surf_disco::Request; use tide_disco::error::ServerError; +use tokio::time::timeout; use url::Url; use vbs::version::StaticVersionType; @@ -87,14 +88,14 @@ impl StatePeers { my_own_validator_config: ValidatorConfig, ) -> anyhow::Result> { self.backoff() - .retry(self, move |provider| { + .retry(self, move |provider, retry| { let my_own_validator_config = my_own_validator_config.clone(); async move { for client in &provider.clients { tracing::info!("fetching config from {}", client.url); - match client + match Self::retry(retry, client .get::("config/hotshot") - .send() + .send()) .await { Ok(res) => { @@ -112,6 +113,26 @@ impl StatePeers { }) .await } + + async fn retry(retry: usize, f: impl Future>) -> anyhow::Result + where + E: std::error::Error + Send + Sync + 'static, + { + // Since we generally have multiple peers we can catch up from, we want a fairly + // aggressive timeout for requests: if a peer is not responding quickly, we're better off + // just trying the next one rather than waiting, and this prevents a malicious peer from + // delaying catchup for a long time. + // + // However, if we set the timeout _too_ aggressively, we might fail to catch up even from an + // honest peer, and thus never make progress. 
Thus, we start with a timeout of 500ms, which + // is aggressive but still very reasonable for an HTTP request. If that fails with all of + // our peers, we increase the timeout by 500ms for each successive retry, until we + // eventually succeed. + timeout(Duration::from_millis(500) * (retry as u32 + 1), f) + .await + .context(format!("operation timed out (retry {retry})"))? + .context(format!("operation failed (retry {retry})")) + } } #[async_trait] @@ -119,6 +140,7 @@ impl StateCatchup for StatePeers { #[tracing::instrument(skip(self, _instance))] async fn try_fetch_accounts( &self, + retry: usize, _instance: &NodeState, height: u64, view: ViewNumber, @@ -138,7 +160,7 @@ impl StateCatchup for StatePeers { continue; } }; - let snapshot = match req.send().await { + let snapshot = match Self::retry(retry, req.send()).await { Ok(res) => res, Err(err) => { tracing::info!(peer = %client.url, "error fetching accounts from peer: {err:#}"); @@ -166,6 +188,7 @@ impl StateCatchup for StatePeers { #[tracing::instrument(skip(self, _instance, mt))] async fn try_remember_blocks_merkle_tree( &self, + retry: usize, _instance: &NodeState, height: u64, view: ViewNumber, @@ -173,10 +196,13 @@ impl StateCatchup for StatePeers { ) -> anyhow::Result<()> { for client in self.clients.iter() { tracing::debug!(peer = %client.url, "fetching frontier from peer"); - match client - .get::(&format!("catchup/{height}/{}/blocks", view.u64())) - .send() - .await + match Self::retry( + retry, + client + .get::(&format!("catchup/{height}/{}/blocks", view.u64())) + .send(), + ) + .await { Ok(frontier) => { let Some(elem) = frontier.elem() else { @@ -201,14 +227,18 @@ impl StateCatchup for StatePeers { async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result { for client in self.clients.iter() { tracing::info!("Fetching chain config from {}", client.url); - match client - .get::(&format!("catchup/chain-config/{}", commitment)) - .send() - .await + match 
Self::retry( + retry, + client + .get::(&format!("catchup/chain-config/{}", commitment)) + .send(), + ) + .await { Ok(cf) => { if cf.commit() == commitment { @@ -358,9 +388,10 @@ where { // TODO: add a test for the account proof validation // issue # 2102 (https://github.com/EspressoSystems/espresso-sequencer/issues/2102) - #[tracing::instrument(skip(self, instance))] + #[tracing::instrument(skip(self, _retry, instance))] async fn try_fetch_accounts( &self, + _retry: usize, instance: &NodeState, block_height: u64, view: ViewNumber, @@ -374,9 +405,10 @@ where .0) } - #[tracing::instrument(skip(self, instance, mt))] + #[tracing::instrument(skip(self, _retry, instance, mt))] async fn try_remember_blocks_merkle_tree( &self, + _retry: usize, instance: &NodeState, bh: u64, view: ViewNumber, @@ -401,6 +433,7 @@ where async fn try_fetch_chain_config( &self, + _retry: usize, commitment: Commitment, ) -> anyhow::Result { let cf = self.db.get_chain_config(commitment).await?; @@ -461,6 +494,7 @@ impl NullStateCatchup { impl StateCatchup for NullStateCatchup { async fn try_fetch_accounts( &self, + _retry: usize, _instance: &NodeState, _height: u64, _view: ViewNumber, @@ -472,6 +506,7 @@ impl StateCatchup for NullStateCatchup { async fn try_remember_blocks_merkle_tree( &self, + _retry: usize, _instance: &NodeState, _height: u64, _view: ViewNumber, @@ -482,6 +517,7 @@ impl StateCatchup for NullStateCatchup { async fn try_fetch_chain_config( &self, + _retry: usize, commitment: Commitment, ) -> anyhow::Result { self.chain_configs diff --git a/types/src/v0/impls/instance_state.rs b/types/src/v0/impls/instance_state.rs index 19df55e1b..dfff2a4a3 100644 --- a/types/src/v0/impls/instance_state.rs +++ b/types/src/v0/impls/instance_state.rs @@ -213,6 +213,7 @@ pub mod mock { impl StateCatchup for MockStateCatchup { async fn try_fetch_accounts( &self, + _retry: usize, _instance: &NodeState, _height: u64, view: ViewNumber, @@ -228,6 +229,7 @@ pub mod mock { async fn 
try_remember_blocks_merkle_tree( &self, + _retry: usize, _instance: &NodeState, _height: u64, view: ViewNumber, @@ -252,6 +254,7 @@ pub mod mock { async fn try_fetch_chain_config( &self, + _retry: usize, _commitment: Commitment, ) -> anyhow::Result { Ok(ChainConfig::default()) diff --git a/types/src/v0/traits.rs b/types/src/v0/traits.rs index 7447a500d..f7f53d63c 100644 --- a/types/src/v0/traits.rs +++ b/types/src/v0/traits.rs @@ -38,6 +38,7 @@ pub trait StateCatchup: Send + Sync { /// Try to fetch the given accounts state, failing without retrying if unable. async fn try_fetch_accounts( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -55,10 +56,18 @@ pub trait StateCatchup: Send + Sync { accounts: Vec, ) -> anyhow::Result> { self.backoff() - .retry(self, |provider| { - async { + .retry(self, |provider, retry| { + let accounts = &accounts; + async move { let tree = provider - .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, &accounts) + .try_fetch_accounts( + retry, + instance, + height, + view, + fee_merkle_tree_root, + accounts, + ) .await .map_err(|err| { err.context(format!( @@ -82,6 +91,7 @@ pub trait StateCatchup: Send + Sync { /// Try to fetch and remember the blocks frontier, failing without retrying if unable. 
async fn try_remember_blocks_merkle_tree( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -97,8 +107,8 @@ pub trait StateCatchup: Send + Sync { mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { self.backoff() - .retry(mt, |mt| { - self.try_remember_blocks_merkle_tree(instance, height, view, mt) + .retry(mt, |mt, retry| { + self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt) .map_err(|err| err.context("fetching frontier")) .boxed() }) @@ -107,6 +117,7 @@ pub trait StateCatchup: Send + Sync { async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result; @@ -115,9 +126,9 @@ pub trait StateCatchup: Send + Sync { commitment: Commitment, ) -> anyhow::Result { self.backoff() - .retry(self, |provider| { + .retry(self, |provider, retry| { provider - .try_fetch_chain_config(commitment) + .try_fetch_chain_config(retry, commitment) .map_err(|err| err.context("fetching chain config")) .boxed() }) @@ -132,6 +143,7 @@ pub trait StateCatchup: Send + Sync { impl StateCatchup for Box { async fn try_fetch_accounts( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -139,7 +151,14 @@ impl StateCatchup for Box { accounts: &[FeeAccount], ) -> anyhow::Result { (**self) - .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) + .try_fetch_accounts( + retry, + instance, + height, + view, + fee_merkle_tree_root, + accounts, + ) .await } @@ -158,13 +177,14 @@ impl StateCatchup for Box { async fn try_remember_blocks_merkle_tree( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { (**self) - .try_remember_blocks_merkle_tree(instance, height, view, mt) + .try_remember_blocks_merkle_tree(retry, instance, height, view, mt) .await } @@ -182,9 +202,10 @@ impl StateCatchup for Box { async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result { - 
(**self).try_fetch_chain_config(commitment).await + (**self).try_fetch_chain_config(retry, commitment).await } async fn fetch_chain_config( @@ -207,6 +228,7 @@ impl StateCatchup for Box { impl StateCatchup for Arc { async fn try_fetch_accounts( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -214,7 +236,14 @@ impl StateCatchup for Arc { accounts: &[FeeAccount], ) -> anyhow::Result { (**self) - .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) + .try_fetch_accounts( + retry, + instance, + height, + view, + fee_merkle_tree_root, + accounts, + ) .await } @@ -233,13 +262,14 @@ impl StateCatchup for Arc { async fn try_remember_blocks_merkle_tree( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { (**self) - .try_remember_blocks_merkle_tree(instance, height, view, mt) + .try_remember_blocks_merkle_tree(retry, instance, height, view, mt) .await } @@ -257,9 +287,10 @@ impl StateCatchup for Arc { async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result { - (**self).try_fetch_chain_config(commitment).await + (**self).try_fetch_chain_config(retry, commitment).await } async fn fetch_chain_config( @@ -284,6 +315,7 @@ impl StateCatchup for Vec { #[tracing::instrument(skip(self, instance))] async fn try_fetch_accounts( &self, + retry: usize, instance: &NodeState, height: u64, view: ViewNumber, @@ -292,7 +324,14 @@ impl StateCatchup for Vec { ) -> anyhow::Result { for provider in self { match provider - .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) + .try_fetch_accounts( + retry, + instance, + height, + view, + fee_merkle_tree_root, + accounts, + ) .await { Ok(tree) => return Ok(tree), @@ -312,6 +351,7 @@ impl StateCatchup for Vec { #[tracing::instrument(skip(self, instance, mt))] async fn try_remember_blocks_merkle_tree( &self, + retry: usize, instance: &NodeState, height: 
u64, view: ViewNumber, @@ -319,7 +359,7 @@ impl StateCatchup for Vec { ) -> anyhow::Result<()> { for provider in self { match provider - .try_remember_blocks_merkle_tree(instance, height, view, mt) + .try_remember_blocks_merkle_tree(retry, instance, height, view, mt) .await { Ok(()) => return Ok(()), @@ -337,10 +377,11 @@ impl StateCatchup for Vec { async fn try_fetch_chain_config( &self, + retry: usize, commitment: Commitment, ) -> anyhow::Result { for provider in self { - match provider.try_fetch_chain_config(commitment).await { + match provider.try_fetch_chain_config(retry, commitment).await { Ok(cf) => return Ok(cf), Err(err) => { tracing::info!( diff --git a/types/src/v0/utils.rs b/types/src/v0/utils.rs index 0f097c919..1d3dc1837 100644 --- a/types/src/v0/utils.rs +++ b/types/src/v0/utils.rs @@ -239,12 +239,12 @@ impl BackoffParams { pub async fn retry( &self, mut state: S, - f: impl for<'a> Fn(&'a mut S) -> BoxFuture<'a, anyhow::Result>, + f: impl for<'a> Fn(&'a mut S, usize) -> BoxFuture<'a, anyhow::Result>, ) -> anyhow::Result { let mut delay = self.base; - loop { - match f(&mut state).await { - Ok(res) => break Ok(res), + for i in 0.. { + match f(&mut state, i).await { + Ok(res) => return Ok(res), Err(err) if self.disable => { return Err(err.context("Retryable operation failed; retries disabled")); } @@ -257,6 +257,7 @@ impl BackoffParams { } } } + unreachable!() } #[must_use]