Skip to content

Commit

Permalink
fix: potential index out of bounds (#5775)
Browse files Browse the repository at this point in the history
Description
---
Fixes a potential index out of bounds error in sync

Motivation and Context
---
If more than one peer is given to the any of the sync services, and any
peer except the last is removed from the list, the code will throw an
index-out-of-bound error.
  • Loading branch information
SWvheerden authored Oct 5, 2023
1 parent ff5a5d8 commit f17ac6b
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 22 deletions.
4 changes: 4 additions & 0 deletions base_layer/core/src/base_node/sync/block_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub enum BlockSyncError {
FixedHashSizeError(#[from] FixedHashSizeError),
#[error("This sync round failed")]
SyncRoundFailed,
#[error("Could not find peer info")]
PeerNotFound,
}

impl BlockSyncError {
Expand All @@ -90,6 +92,7 @@ impl BlockSyncError {
BlockSyncError::AllSyncPeersExceedLatency => "AllSyncPeersExceedLatency",
BlockSyncError::FixedHashSizeError(_) => "FixedHashSizeError",
BlockSyncError::SyncRoundFailed => "SyncRoundFailed",
BlockSyncError::PeerNotFound => "PeerNotFound",
}
}
}
Expand All @@ -106,6 +109,7 @@ impl BlockSyncError {
BlockSyncError::NoMoreSyncPeers(_) |
BlockSyncError::AllSyncPeersExceedLatency |
BlockSyncError::FailedToConstructChainBlock |
BlockSyncError::PeerNotFound |
BlockSyncError::SyncRoundFailed => None,

// short ban
Expand Down
22 changes: 14 additions & 8 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
sync_peer_node_ids.len()
);
let mut latency_counter = 0usize;
for (i, node_id) in sync_peer_node_ids.iter().enumerate() {
let sync_peer = &self.sync_peers[i];
for node_id in sync_peer_node_ids {
let peer_index = self.get_sync_peer_index(&node_id).ok_or(BlockSyncError::PeerNotFound)?;
let sync_peer = &self.sync_peers[peer_index];
self.hooks.call_on_starting_hook(sync_peer);
let mut conn = match self.connect_to_sync_peer(node_id.clone()).await {
Ok(val) => val,
Expand All @@ -153,7 +154,7 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
target: LOG_TARGET,
"Failed to connect to sync peer `{}`: {}", node_id, e
);
self.remove_sync_peer(node_id);
self.remove_sync_peer(&node_id);
continue;
},
};
Expand All @@ -170,15 +171,15 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
target: LOG_TARGET,
"Failed to obtain RPC connection from sync peer `{}`: {}", node_id, e
);
self.remove_sync_peer(node_id);
self.remove_sync_peer(&node_id);
continue;
},
};
let latency = client
.get_last_request_latency()
.expect("unreachable panic: last request latency must be set after connect");
self.sync_peers[i].set_latency(latency);
let sync_peer = self.sync_peers[i].clone();
self.sync_peers[peer_index].set_latency(latency);
let sync_peer = self.sync_peers[peer_index].clone();
info!(
target: LOG_TARGET,
"Attempting to synchronize blocks with `{}` latency: {:.2?}", node_id, latency
Expand All @@ -192,13 +193,13 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
if let Some(reason) = ban_reason {
warn!(target: LOG_TARGET, "{}", err);
self.peer_ban_manager
.ban_peer_if_required(node_id, &Some(reason.clone()))
.ban_peer_if_required(&node_id, &Some(reason.clone()))
.await;
}
if let BlockSyncError::MaxLatencyExceeded { .. } = err {
latency_counter += 1;
} else {
self.remove_sync_peer(node_id);
self.remove_sync_peer(&node_id);
}
},
}
Expand Down Expand Up @@ -423,4 +424,9 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
self.sync_peers.remove(pos);
}
}

// Helper function to get the index to the node_id inside of the vec of peers
fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
self.sync_peers.iter().position(|p| p.node_id() == node_id)
}
}
3 changes: 3 additions & 0 deletions base_layer/core/src/base_node/sync/header_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::{blocks::BlockError, chain_storage::ChainStorageError, common::BanRea
pub enum BlockHeaderSyncError {
#[error("No more sync peers available: {0}")]
NoMoreSyncPeers(String),
#[error("Could not find peer info")]
PeerNotFound,
#[error("RPC error: {0}")]
RpcError(#[from] RpcError),
#[error("RPC request failed: {0}")]
Expand Down Expand Up @@ -103,6 +105,7 @@ impl BlockHeaderSyncError {
BlockHeaderSyncError::AllSyncPeersExceedLatency |
BlockHeaderSyncError::ConnectivityError(_) |
BlockHeaderSyncError::NotInSync |
BlockHeaderSyncError::PeerNotFound |
BlockHeaderSyncError::ChainStorageError(_) => None,

// short ban
Expand Down
17 changes: 12 additions & 5 deletions base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
sync_peer_node_ids.len()
);
let mut latency_counter = 0usize;
for (i, node_id) in sync_peer_node_ids.iter().enumerate() {
match self.connect_and_attempt_sync(i, node_id, max_latency).await {
for node_id in sync_peer_node_ids {
match self.connect_and_attempt_sync(&node_id, max_latency).await {
Ok(peer) => return Ok(peer),
Err(err) => {
let ban_reason = BlockHeaderSyncError::get_ban_reason(
Expand All @@ -156,13 +156,13 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
if let Some(reason) = ban_reason {
warn!(target: LOG_TARGET, "{}", err);
self.peer_ban_manager
.ban_peer_if_required(node_id, &Some(reason.clone()))
.ban_peer_if_required(&node_id, &Some(reason.clone()))
.await;
}
if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
latency_counter += 1;
} else {
self.remove_sync_peer(node_id);
self.remove_sync_peer(&node_id);
}
},
}
Expand All @@ -179,10 +179,12 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {

async fn connect_and_attempt_sync(
&mut self,
peer_index: usize,
node_id: &NodeId,
max_latency: Duration,
) -> Result<SyncPeer, BlockHeaderSyncError> {
let peer_index = self
.get_sync_peer_index(node_id)
.ok_or(BlockHeaderSyncError::PeerNotFound)?;
let sync_peer = &self.sync_peers[peer_index];
self.hooks.call_on_starting_hook(sync_peer);

Expand Down Expand Up @@ -774,6 +776,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
self.sync_peers.remove(pos);
}
}

// Helper function to get the index to the node_id inside of the vec of peers
fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
self.sync_peers.iter().position(|p| p.node_id() == node_id)
}
}

struct ChainSplitInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ pub enum HorizonSyncError {
FixedHashSizeError(#[from] FixedHashSizeError),
#[error("No more sync peers available: {0}")]
NoMoreSyncPeers(String),
#[error("Could not find peer info")]
PeerNotFound,
}

impl From<TryFromIntError> for HorizonSyncError {
Expand All @@ -117,6 +119,7 @@ impl HorizonSyncError {
HorizonSyncError::RpcError(_) |
HorizonSyncError::RpcStatus(_) |
HorizonSyncError::NoMoreSyncPeers(_) |
HorizonSyncError::PeerNotFound |
HorizonSyncError::JoinError(_) => None,

// short ban
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
sync_peer_node_ids.len()
);
let mut latency_counter = 0usize;
for (i, node_id) in sync_peer_node_ids.iter().enumerate() {
match self.connect_and_attempt_sync(i, node_id, header).await {
for node_id in sync_peer_node_ids {
match self.connect_and_attempt_sync(&node_id, header).await {
Ok(_) => return Ok(()),
// Try another peer
Err(err) => {
Expand All @@ -204,13 +204,13 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
if let Some(reason) = ban_reason {
warn!(target: LOG_TARGET, "{}", err);
self.peer_ban_manager
.ban_peer_if_required(node_id, &Some(reason.clone()))
.ban_peer_if_required(&node_id, &Some(reason.clone()))
.await;
}
if let HorizonSyncError::MaxLatencyExceeded { .. } = err {
latency_counter += 1;
} else {
self.remove_sync_peer(node_id);
self.remove_sync_peer(&node_id);
}
},
}
Expand All @@ -227,14 +227,14 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {

async fn connect_and_attempt_sync(
&mut self,
peer_index: usize,
node_id: &NodeId,
header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
{
let sync_peer = &self.sync_peers[peer_index];
self.hooks.call_on_starting_hook(sync_peer);
}
let peer_index = self
.get_sync_peer_index(node_id)
.ok_or(HorizonSyncError::PeerNotFound)?;
let sync_peer = &self.sync_peers[peer_index];
self.hooks.call_on_starting_hook(sync_peer);

let mut conn = self.dial_sync_peer(node_id).await?;
debug!(
Expand Down Expand Up @@ -985,6 +985,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}
}

// Helper function to get the index to the node_id inside of the vec of peers
fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
self.sync_peers.iter().position(|p| p.node_id() == node_id)
}

#[inline]
fn db(&self) -> &AsyncBlockchainDb<B> {
&self.db
Expand Down

0 comments on commit f17ac6b

Please sign in to comment.