Skip to content

Commit

Permalink
Limit time for each catchup request to defend against malicious peers
Browse files Browse the repository at this point in the history
  • Loading branch information
jbearer committed Dec 5, 2024
1 parent a48b5b2 commit 5782bfc
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 37 deletions.
7 changes: 5 additions & 2 deletions sequencer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1809,13 +1809,16 @@ mod test {
// The catchup should successfully retrieve the correct chain config.
let node = &network.peers[0];
let peers = node.node_state().peers;
peers.try_fetch_chain_config(cf.commit()).await.unwrap();
peers.try_fetch_chain_config(0, cf.commit()).await.unwrap();

// Test a catchup request for node #1, which is connected to a dishonest peer.
// This request will result in an error due to the malicious chain config provided by the peer.
let node = &network.peers[1];
let peers = node.node_state().peers;
peers.try_fetch_chain_config(cf.commit()).await.unwrap_err();
peers
.try_fetch_chain_config(0, cf.commit())
.await
.unwrap_err();

network.server.shut_down().await;
handle.abort();
Expand Down
66 changes: 51 additions & 15 deletions sequencer/src/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use hotshot_types::{
use itertools::Itertools;
use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme};
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};
use surf_disco::Request;
use tide_disco::error::ServerError;
use tokio::time::timeout;
use url::Url;
use vbs::version::StaticVersionType;

Expand Down Expand Up @@ -87,14 +88,14 @@ impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
my_own_validator_config: ValidatorConfig<PubKey>,
) -> anyhow::Result<NetworkConfig<PubKey>> {
self.backoff()
.retry(self, move |provider| {
.retry(self, move |provider, retry| {
let my_own_validator_config = my_own_validator_config.clone();
async move {
for client in &provider.clients {
tracing::info!("fetching config from {}", client.url);
match client
match Self::retry(retry, client
.get::<PublicNetworkConfig>("config/hotshot")
.send()
.send())
.await
{
Ok(res) => {
Expand All @@ -112,13 +113,34 @@ impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
})
.await
}

async fn retry<T, E>(retry: usize, f: impl Future<Output = Result<T, E>>) -> anyhow::Result<T>
where
E: std::error::Error + Send + Sync + 'static,
{
// Since we have generally have multiple peers we can catch up from, we want a fairly
// aggressive timeout for requests: if a peer is not responding quickly, we're better off
// just trying the next one rather than waiting, and this prevents a malicious peer from
// delaying catchup for a long time.
//
// However, if we set the timeout _too_ aggressively, we might fail to catch up even from an
// honest peer, and thus never make progress. Thus, we start with a timeout of 500ms, which
// is aggressive but still very reasonable for an HTTP request. If that fails with all of
// our peers, we increase the timeout by 1 second for each successive retry, until we
// eventually succeed.
timeout(Duration::from_millis(500) * (retry as u32 + 1), f)
.await
.context(format!("operation timed out (retry {retry})"))?
.context(format!("operation failed (retry {retry})"))
}
}

#[async_trait]
impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
#[tracing::instrument(skip(self, _instance))]
async fn try_fetch_accounts(
&self,
retry: usize,
_instance: &NodeState,
height: u64,
view: ViewNumber,
Expand All @@ -138,7 +160,7 @@ impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
continue;
}
};
let snapshot = match req.send().await {
let snapshot = match Self::retry(retry, req.send()).await {
Ok(res) => res,
Err(err) => {
tracing::info!(peer = %client.url, "error fetching accounts from peer: {err:#}");
Expand Down Expand Up @@ -166,17 +188,21 @@ impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
#[tracing::instrument(skip(self, _instance, mt))]
async fn try_remember_blocks_merkle_tree(
&self,
retry: usize,
_instance: &NodeState,
height: u64,
view: ViewNumber,
mt: &mut BlockMerkleTree,
) -> anyhow::Result<()> {
for client in self.clients.iter() {
tracing::debug!(peer = %client.url, "fetching frontier from peer");
match client
.get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
.send()
.await
match Self::retry(
retry,
client
.get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
.send(),
)
.await
{
Ok(frontier) => {
let Some(elem) = frontier.elem() else {
Expand All @@ -201,14 +227,18 @@ impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {

async fn try_fetch_chain_config(
&self,
retry: usize,
commitment: Commitment<ChainConfig>,
) -> anyhow::Result<ChainConfig> {
for client in self.clients.iter() {
tracing::info!("Fetching chain config from {}", client.url);
match client
.get::<ChainConfig>(&format!("catchup/chain-config/{}", commitment))
.send()
.await
match Self::retry(
retry,
client
.get::<ChainConfig>(&format!("catchup/chain-config/{}", commitment))
.send(),
)
.await
{
Ok(cf) => {
if cf.commit() == commitment {
Expand Down Expand Up @@ -358,9 +388,10 @@ where
{
// TODO: add a test for the account proof validation
// issue # 2102 (https://github.com/EspressoSystems/espresso-sequencer/issues/2102)
#[tracing::instrument(skip(self, instance))]
#[tracing::instrument(skip(self, _retry, instance))]
async fn try_fetch_accounts(
&self,
_retry: usize,
instance: &NodeState,
block_height: u64,
view: ViewNumber,
Expand All @@ -374,9 +405,10 @@ where
.0)
}

#[tracing::instrument(skip(self, instance, mt))]
#[tracing::instrument(skip(self, _retry, instance, mt))]
async fn try_remember_blocks_merkle_tree(
&self,
_retry: usize,
instance: &NodeState,
bh: u64,
view: ViewNumber,
Expand All @@ -401,6 +433,7 @@ where

async fn try_fetch_chain_config(
&self,
_retry: usize,
commitment: Commitment<ChainConfig>,
) -> anyhow::Result<ChainConfig> {
let cf = self.db.get_chain_config(commitment).await?;
Expand Down Expand Up @@ -461,6 +494,7 @@ impl NullStateCatchup {
impl StateCatchup for NullStateCatchup {
async fn try_fetch_accounts(
&self,
_retry: usize,
_instance: &NodeState,
_height: u64,
_view: ViewNumber,
Expand All @@ -472,6 +506,7 @@ impl StateCatchup for NullStateCatchup {

async fn try_remember_blocks_merkle_tree(
&self,
_retry: usize,
_instance: &NodeState,
_height: u64,
_view: ViewNumber,
Expand All @@ -482,6 +517,7 @@ impl StateCatchup for NullStateCatchup {

async fn try_fetch_chain_config(
&self,
_retry: usize,
commitment: Commitment<ChainConfig>,
) -> anyhow::Result<ChainConfig> {
self.chain_configs
Expand Down
3 changes: 3 additions & 0 deletions types/src/v0/impls/instance_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ pub mod mock {
impl StateCatchup for MockStateCatchup {
async fn try_fetch_accounts(
&self,
_retry: usize,
_instance: &NodeState,
_height: u64,
view: ViewNumber,
Expand All @@ -228,6 +229,7 @@ pub mod mock {

async fn try_remember_blocks_merkle_tree(
&self,
_retry: usize,
_instance: &NodeState,
_height: u64,
view: ViewNumber,
Expand All @@ -252,6 +254,7 @@ pub mod mock {

async fn try_fetch_chain_config(
&self,
_retry: usize,
_commitment: Commitment<ChainConfig>,
) -> anyhow::Result<ChainConfig> {
Ok(ChainConfig::default())
Expand Down
Loading

0 comments on commit 5782bfc

Please sign in to comment.