Skip to content

Commit

Permalink
fix: edge cases causing bans during header/block sync
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Dec 15, 2021
1 parent e4bace6 commit 66624bf
Show file tree
Hide file tree
Showing 18 changed files with 248 additions and 84 deletions.
7 changes: 6 additions & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tari_core::{
state_machine_service::{initializer::BaseNodeStateMachineInitializer, states::HorizonSyncConfig},
BaseNodeStateMachineConfig,
BlockSyncConfig,
LocalNodeCommsInterface,
StateMachineHandle,
},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, BlockchainDatabase},
Expand Down Expand Up @@ -211,6 +212,7 @@ where B: BlockchainBackend + 'static
config: &GlobalConfig,
) -> UnspawnedCommsNode {
let dht = handles.expect_handle::<Dht>();
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let builder = RpcServer::builder();
let builder = match config.rpc_max_simultaneous_sessions {
Some(limit) => builder.with_maximum_simultaneous_sessions(limit),
Expand All @@ -228,7 +230,10 @@ where B: BlockchainBackend + 'static
// Add your RPC services here ‍🏴‍☠️️☮️🌊
let rpc_server = rpc_server
.add_service(dht.rpc_service())
.add_service(base_node::create_base_node_sync_rpc_service(db.clone()))
.add_service(base_node::create_base_node_sync_rpc_service(
db.clone(),
base_node_service,
))
.add_service(mempool::create_mempool_rpc_service(
handles.expect_handle::<MempoolHandle>(),
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,15 @@ where T: BlockchainBackend + 'static
) -> Result<(), CommsInterfaceError> {
let NewBlock { block_hash } = new_block;

if self.blockchain_db.inner().is_add_block_locked() {
info!(
target: LOG_TARGET,
"Ignoring block message ({}) because add_block is locked",
block_hash.to_hex()
);
return Ok(());
}

// Only a single block request can complete at a time.
// As multiple NewBlock requests arrive from propagation, this semaphore prevents multiple requests to nodes for
// the same full block. The first request that succeeds will stop the node from requesting the block from any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@ use crate::{
comms_interface::LocalNodeCommsInterface,
state_machine_service::{
states,
states::{BaseNodeState, HorizonSyncConfig, StateEvent, StateInfo, StatusInfo, SyncPeerConfig, SyncStatus},
states::{
BaseNodeState,
HeaderSyncState,
HorizonSyncConfig,
StateEvent,
StateInfo,
StatusInfo,
SyncPeerConfig,
SyncStatus,
},
},
sync::{BlockSyncConfig, SyncValidators},
},
Expand Down Expand Up @@ -137,22 +146,51 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
/// Describe the Finite State Machine for the base node. This function describes _every possible_ state
/// transition for the node given its current state and an event that gets triggered.
pub fn transition(&self, state: BaseNodeState, event: StateEvent) -> BaseNodeState {
let db = self.db.inner();
use self::{BaseNodeState::*, StateEvent::*, SyncStatus::*};
match (state, event) {
(Starting(s), Initialized) => Listening(s.into()),
(Listening(_), FallenBehind(Lagging(_, sync_peers))) => HeaderSync(sync_peers.into()),
(HeaderSync(s), HeaderSyncFailed) => Waiting(s.into()),
(HeaderSync(s), Continue | NetworkSilence) => Listening(s.into()),
(
Listening(_),
FallenBehind(Lagging {
local: local_metadata,
sync_peers,
..
}),
) => {
db.set_add_block_lock_flag();
HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
},
(HeaderSync(s), HeaderSyncFailed) => {
db.clear_add_block_lock_flag();
Waiting(s.into())
},
(HeaderSync(s), Continue | NetworkSilence) => {
db.clear_add_block_lock_flag();
Listening(s.into())
},
(HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()),

(DecideNextSync(_), ProceedToHorizonSync(peer)) => HorizonStateSync(peer.into()),
(DecideNextSync(s), Continue) => Listening(s.into()),
(DecideNextSync(s), Continue) => {
db.clear_add_block_lock_flag();
Listening(s.into())
},
(HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => Waiting(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => {
db.clear_add_block_lock_flag();
Waiting(s.into())
},

(DecideNextSync(_), ProceedToBlockSync(peer)) => BlockSync(peer.into()),
(BlockSync(s), BlocksSynchronized) => Listening(s.into()),
(BlockSync(s), BlockSyncFailed) => Waiting(s.into()),
(BlockSync(s), BlocksSynchronized) => {
db.clear_add_block_lock_flag();
Listening(s.into())
},
(BlockSync(s), BlockSyncFailed) => {
db.clear_add_block_lock_flag();
Waiting(s.into())
},

(Waiting(s), Continue) => Listening(s.into()),
(_, FatalError(s)) => Shutdown(states::Shutdown::with_reason(s)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::base_node::{
state_machine_service::states::{
BlockSync,
DecideNextSync,
HeaderSync,
HeaderSyncState,
HorizonStateSync,
Listening,
ListeningInfo,
Expand All @@ -44,7 +44,7 @@ use crate::base_node::{
#[derive(Debug)]
pub enum BaseNodeState {
Starting(Starting),
HeaderSync(HeaderSync),
HeaderSync(HeaderSyncState),
DecideNextSync(DecideNextSync),
HorizonStateSync(HorizonStateSync),
BlockSync(BlockSync),
Expand Down Expand Up @@ -86,7 +86,11 @@ impl<E: std::error::Error> From<E> for StateEvent {
#[derive(Debug, Clone, PartialEq)]
pub enum SyncStatus {
// We are behind the chain tip.
Lagging(ChainMetadata, Vec<SyncPeer>),
Lagging {
local: ChainMetadata,
network: ChainMetadata,
sync_peers: Vec<SyncPeer>,
},
UpToDate,
}

Expand All @@ -104,12 +108,14 @@ impl Display for SyncStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
use SyncStatus::*;
match self {
Lagging(m, v) => write!(
Lagging {
network, sync_peers, ..
} => write!(
f,
"Lagging behind {} peers (#{}, Difficulty: {})",
v.len(),
m.height_of_longest_chain(),
m.accumulated_difficulty(),
sync_peers.len(),
network.height_of_longest_chain(),
network.accumulated_difficulty(),
),
UpToDate => f.write_str("UpToDate"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
use std::{cmp::Ordering, time::Instant};

use log::*;
use tari_common_types::chain_metadata::ChainMetadata;

use crate::{
base_node::{
comms_interface::BlockEvent,
state_machine_service::states::{BlockSyncInfo, Listening, StateEvent, StateInfo, StatusInfo},
state_machine_service::states::{BlockSyncInfo, StateEvent, StateInfo, StatusInfo},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer},
BaseNodeStateMachine,
},
Expand All @@ -36,14 +37,15 @@ use crate::{

const LOG_TARGET: &str = "c::bn::header_sync";

#[derive(Clone, Debug, Default)]
pub struct HeaderSync {
#[derive(Clone, Debug)]
pub struct HeaderSyncState {
sync_peers: Vec<SyncPeer>,
is_synced: bool,
local_metadata: ChainMetadata,
}

impl HeaderSync {
pub fn new(mut sync_peers: Vec<SyncPeer>) -> Self {
impl HeaderSyncState {
pub fn new(mut sync_peers: Vec<SyncPeer>, local_metadata: ChainMetadata) -> Self {
// Sort by latency lowest to highest
sync_peers.sort_by(|a, b| match (a.latency(), b.latency()) {
(None, None) => Ordering::Equal,
Expand All @@ -55,6 +57,7 @@ impl HeaderSync {
Self {
sync_peers,
is_synced: false,
local_metadata,
}
}

Expand All @@ -77,6 +80,7 @@ impl HeaderSync {
shared.connectivity.clone(),
&self.sync_peers,
shared.randomx_factory.clone(),
&self.local_metadata,
);

let status_event_sender = shared.status_event_sender.clone();
Expand Down Expand Up @@ -141,14 +145,3 @@ impl HeaderSync {
}
}
}

impl From<Listening> for HeaderSync {
fn from(_: Listening) -> Self {
Default::default()
}
}
impl From<Vec<SyncPeer>> for HeaderSync {
fn from(peers: Vec<SyncPeer>) -> Self {
Self::new(peers)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
states::{
BlockSync,
DecideNextSync,
HeaderSync,
HeaderSyncState,
StateEvent,
StateEvent::FatalError,
StateInfo,
Expand Down Expand Up @@ -195,7 +195,7 @@ impl Listening {
if self.is_synced &&
best_metadata.height_of_longest_chain() == local.height_of_longest_chain() + 1 &&
time_since_better_block
.map(|ts: Instant| ts.elapsed() < Duration::from_secs(30))
.map(|ts: Instant| ts.elapsed() < Duration::from_secs(60))
.unwrap_or(true)
{
if time_since_better_block.is_none() {
Expand All @@ -217,7 +217,7 @@ impl Listening {
peer_metadata_list
};

let local = match shared.db.get_chain_metadata().await {
let local_metadata = match shared.db.get_chain_metadata().await {
Ok(m) => m,
Err(e) => {
return FatalError(format!("Could not get local blockchain metadata. {}", e));
Expand All @@ -227,7 +227,7 @@ impl Listening {

let sync_mode = determine_sync_mode(
shared.config.blocks_behind_before_considered_lagging,
&local,
&local_metadata,
best_metadata,
sync_peers,
);
Expand Down Expand Up @@ -266,8 +266,8 @@ impl From<Waiting> for Listening {
}
}

impl From<HeaderSync> for Listening {
fn from(sync: HeaderSync) -> Self {
impl From<HeaderSyncState> for Listening {
fn from(sync: HeaderSyncState) -> Self {
Self {
is_synced: sync.is_synced(),
}
Expand Down Expand Up @@ -356,12 +356,15 @@ fn determine_sync_mode(
return UpToDate;
};

let sync_peers = sync_peers.into_iter().cloned().collect();
debug!(
target: LOG_TARGET,
"Lagging (local height = {}, network height = {})", local_tip_height, network_tip_height
);
Lagging(network.clone(), sync_peers)
Lagging {
local: local.clone(),
network: network.clone(),
sync_peers: sync_peers.into_iter().cloned().collect(),
}
} else {
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -497,28 +500,28 @@ mod test {

let network = ChainMetadata::new(0, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}

let local = ChainMetadata::new(100, Vec::new(), 50, 50, 500_000);
let network = ChainMetadata::new(150, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}

let local = ChainMetadata::new(0, Vec::new(), 50, 50, 500_000);
let network = ChainMetadata::new(100, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}

let local = ChainMetadata::new(99, Vec::new(), 50, 50, 500_000);
let network = ChainMetadata::new(150, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub(crate) mod helpers;
pub use helpers::SyncPeerConfig;

mod header_sync;
pub use header_sync::HeaderSync;
pub use header_sync::HeaderSyncState;

mod sync_decide;
pub use sync_decide::DecideNextSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use log::*;
use crate::{
base_node::{
state_machine_service::{
states::{HeaderSync, StateEvent},
states::{HeaderSyncState, StateEvent},
BaseNodeStateMachine,
},
sync::SyncPeer,
Expand Down Expand Up @@ -118,8 +118,8 @@ fn find_best_latency<'a, I: IntoIterator<Item = &'a SyncPeer>>(iter: I) -> Optio
.cloned()
}

impl From<HeaderSync> for DecideNextSync {
fn from(sync: HeaderSync) -> Self {
impl From<HeaderSyncState> for DecideNextSync {
fn from(sync: HeaderSyncState) -> Self {
Self {
sync_peers: sync.into_sync_peers(),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::Duration;
use log::info;
use tokio::time::sleep;

use crate::base_node::state_machine_service::states::{BlockSync, HeaderSync, HorizonStateSync, StateEvent};
use crate::base_node::state_machine_service::states::{BlockSync, HeaderSyncState, HorizonStateSync, StateEvent};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::waiting";

Expand Down Expand Up @@ -68,8 +68,8 @@ impl From<BlockSync> for Waiting {
}
}

impl From<HeaderSync> for Waiting {
fn from(_: HeaderSync) -> Self {
impl From<HeaderSyncState> for Waiting {
fn from(_: HeaderSyncState) -> Self {
Default::default()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
.fetch_chain_header_by_block_hash(block.hash.clone())
.await?
.ok_or_else(|| {
BlockSyncError::ProtocolViolation("Peer sent hash for block header we do not have".into())
BlockSyncError::ProtocolViolation(format!(
"Peer sent hash ({}) for block header we do not have",
block.hash.to_hex()
))
})?;

let current_height = header.height();
Expand Down
Loading

0 comments on commit 66624bf

Please sign in to comment.