Skip to content

Commit

Permalink
feat(core/sync): add sync error status
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Sep 20, 2022
1 parent 595e334 commit 827595f
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 34 deletions.
1 change: 1 addition & 0 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ enum BaseNodeState{
CONNECTING = 3;
BLOCK_SYNC = 4;
LISTENING = 5;
SYNC_FAILED = 6;
}

/// return type of GetNewBlockTemplate
Expand Down
24 changes: 7 additions & 17 deletions applications/tari_app_grpc/src/conversions/base_node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,25 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::borrow::Borrow;

use tari_core::base_node::state_machine_service::states::{
StateInfo,
StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp},
StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp, SyncFailed},
};

use crate::tari_rpc as grpc;

impl From<StateInfo> for grpc::BaseNodeState {
fn from(info: StateInfo) -> Self {
match info {
StartUp => grpc::BaseNodeState::HeaderSync,
HeaderSync(_) => grpc::BaseNodeState::HeaderSync,
HorizonSync(_) => grpc::BaseNodeState::HorizonSync,
Connecting(_) => grpc::BaseNodeState::Connecting,
BlockSync(_) => grpc::BaseNodeState::BlockSync,
Listening(_) => grpc::BaseNodeState::Listening,
}
}
}

impl From<&StateInfo> for grpc::BaseNodeState {
fn from(info: &StateInfo) -> Self {
match info {
impl<T: Borrow<StateInfo>> From<T> for grpc::BaseNodeState {
fn from(info: T) -> Self {
match info.borrow() {
StartUp => grpc::BaseNodeState::HeaderSync,
HeaderSync(_) => grpc::BaseNodeState::HeaderSync,
HorizonSync(_) => grpc::BaseNodeState::HorizonSync,
Connecting(_) => grpc::BaseNodeState::Connecting,
BlockSync(_) => grpc::BaseNodeState::BlockSync,
Listening(_) => grpc::BaseNodeState::Listening,
SyncFailed(_) => grpc::BaseNodeState::SyncFailed,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ impl BlockSync {
StateEvent::BlocksSynchronized
},
Err(err) => {
let _ignore = shared.status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::SyncFailed(err.to_short_str().to_string()),
randomx_vm_cnt,
randomx_vm_flags,
});
log_mdc::extend(mdc);
warn!(target: LOG_TARGET, "Block sync failed: {}", err);
StateEvent::BlockSyncFailed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ pub enum StateInfo {
HeaderSync(Option<BlockSyncInfo>),
HorizonSync(HorizonSyncInfo),
BlockSync(BlockSyncInfo),
SyncFailed(String),
Listening(ListeningInfo),
}

impl StateInfo {
pub fn short_desc(&self) -> String {
use StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp};
#[allow(clippy::enum_glob_use)]
use StateInfo::*;
match self {
StartUp => "Starting up".to_string(),
Connecting(sync_peer) => format!(
Expand All @@ -192,6 +194,7 @@ impl StateInfo {

BlockSync(info) => format!("Syncing blocks: {}", info.sync_progress_string()),
Listening(_) => "Listening".to_string(),
SyncFailed(details) => format!("Sync failed: {}", details),
}
}

Expand All @@ -203,9 +206,10 @@ impl StateInfo {
}

pub fn is_synced(&self) -> bool {
use StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp};
#[allow(clippy::enum_glob_use)]
use StateInfo::*;
match self {
StartUp | Connecting(_) | HeaderSync(_) | HorizonSync(_) | BlockSync(_) => false,
StartUp | Connecting(_) | HeaderSync(_) | HorizonSync(_) | BlockSync(_) | SyncFailed(_) => false,
Listening(info) => info.is_synced(),
}
}
Expand All @@ -223,6 +227,7 @@ impl Display for StateInfo {
HorizonSync(info) => write!(f, "Synchronizing horizon state: {}", info),
BlockSync(info) => write!(f, "Synchronizing blocks: {}", info),
Listening(info) => write!(f, "Listening: {}", info),
SyncFailed(details) => write!(f, "Sync failed: {}", details),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,32 @@ impl HeaderSyncState {
self.is_synced = true;
StateEvent::HeadersSynchronized(sync_peer)
},
Err(err @ BlockHeaderSyncError::SyncFailedAllPeers) => {
log_mdc::extend(mdc);
warn!(target: LOG_TARGET, "{}. Continuing...", err);
StateEvent::Continue
},
Err(err @ BlockHeaderSyncError::NetworkSilence) => {
log_mdc::extend(mdc);
warn!(target: LOG_TARGET, "{}", err);
self.is_synced = true;
StateEvent::NetworkSilence
},
Err(err) => {
log_mdc::extend(mdc);
debug!(target: LOG_TARGET, "Header sync failed: {}", err);
StateEvent::HeaderSyncFailed
let _ignore = shared.status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::SyncFailed("HeaderSyncFailed".to_string()),
randomx_vm_cnt,
randomx_vm_flags,
});
match err {
BlockHeaderSyncError::SyncFailedAllPeers => {
error!(target: LOG_TARGET, "Header sync failed with all peers. Error: {}", err);
log_mdc::extend(mdc);
warn!(target: LOG_TARGET, "{}. Continuing...", err);
StateEvent::Continue
},
BlockHeaderSyncError::NetworkSilence => {
log_mdc::extend(mdc);
warn!(target: LOG_TARGET, "{}", err);
self.is_synced = true;
StateEvent::NetworkSilence
},
_ => {
log_mdc::extend(mdc);
debug!(target: LOG_TARGET, "Header sync failed: {}", err);
StateEvent::HeaderSyncFailed
},
}
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ impl HorizonStateSync {
StateEvent::HorizonStateSynchronized
},
Err(err) => {
let _ignore = shared.status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::SyncFailed("HorizonSyncFailed".to_string()),
randomx_vm_cnt,
randomx_vm_flags,
});
warn!(target: LOG_TARGET, "Synchronizing horizon state has failed. {}", err);
StateEvent::HorizonStateSyncFailure
},
Expand Down
19 changes: 19 additions & 0 deletions base_layer/core/src/base_node/sync/block_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,22 @@ pub enum BlockSyncError {
#[error("FixedHash size error: {0}")]
FixedHashSizeError(#[from] FixedHashSizeError),
}

impl BlockSyncError {
pub fn to_short_str(&self) -> &'static str {
match self {
BlockSyncError::RpcError(_) => "RpcError",
BlockSyncError::RpcRequestError(_) => "RpcRequestError",
BlockSyncError::ChainStorageError(_) => "ChainStorageError",
BlockSyncError::PeerSentBlockThatDidNotFormAChain { .. } => "PeerSentBlockThatDidNotFormAChain",
BlockSyncError::ConnectivityError(_) => "ConnectivityError",
BlockSyncError::NoSyncPeers => "NoSyncPeers",
BlockSyncError::ValidationError(_) => "ValidationError",
BlockSyncError::FailedToConstructChainBlock => "FailedToConstructChainBlock",
BlockSyncError::ProtocolViolation(_) => "ProtocolViolation",
BlockSyncError::MaxLatencyExceeded { .. } => "MaxLatencyExceeded",
BlockSyncError::AllSyncPeersExceedLatency => "AllSyncPeersExceedLatency",
BlockSyncError::FixedHashSizeError(_) => "FixedHashSizeError",
}
}
}
5 changes: 5 additions & 0 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {

async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> {
let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
info!(
target: LOG_TARGET,
"Attempting to sync blocks({} sync peers)",
sync_peer_node_ids.len()
);
for (i, node_id) in sync_peer_node_ids.iter().enumerate() {
let sync_peer = &self.sync_peers[i];
self.hooks.call_on_starting_hook(sync_peer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
#[allow(clippy::too_many_lines)]
pub async fn try_sync_from_all_peers(&mut self, max_latency: Duration) -> Result<SyncPeer, BlockHeaderSyncError> {
let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
info!(
target: LOG_TARGET,
"Attempting to sync headers ({} sync peers)",
sync_peer_node_ids.len()
);
for (i, node_id) in sync_peer_node_ids.iter().enumerate() {
{
let sync_peer = &self.sync_peers[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}

async fn sync(&mut self, header: &BlockHeader) -> Result<(), HorizonSyncError> {
info!(
target: LOG_TARGET,
"Attempting to sync blocks({} sync peers)",
self.sync_peers.len()
);
for (i, sync_peer) in self.sync_peers.iter().enumerate() {
self.hooks.call_on_starting_hook(sync_peer);
let mut connection = self.connectivity.dial_peer(sync_peer.node_id().clone()).await?;
Expand Down

0 comments on commit 827595f

Please sign in to comment.