From 827595f88a2e2caf42d55a89b2ec6416d2b50322 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Tue, 20 Sep 2022 18:41:23 +0400 Subject: [PATCH] feat(core/sync): add sync error status --- .../tari_app_grpc/proto/base_node.proto | 1 + .../src/conversions/base_node_state.rs | 24 ++++-------- .../states/block_sync.rs | 6 +++ .../states/events_and_states.rs | 11 ++++-- .../states/header_sync.rs | 39 ++++++++++++------- .../states/horizon_state_sync.rs | 6 +++ .../src/base_node/sync/block_sync/error.rs | 19 +++++++++ .../base_node/sync/block_sync/synchronizer.rs | 5 +++ .../sync/header_sync/synchronizer.rs | 5 +++ .../sync/horizon_state_sync/synchronizer.rs | 5 +++ 10 files changed, 87 insertions(+), 34 deletions(-) diff --git a/applications/tari_app_grpc/proto/base_node.proto b/applications/tari_app_grpc/proto/base_node.proto index 1499cfc73b8..013fe970d4b 100644 --- a/applications/tari_app_grpc/proto/base_node.proto +++ b/applications/tari_app_grpc/proto/base_node.proto @@ -159,6 +159,7 @@ enum BaseNodeState{ CONNECTING = 3; BLOCK_SYNC = 4; LISTENING = 5; + SYNC_FAILED = 6; } /// return type of GetNewBlockTemplate diff --git a/applications/tari_app_grpc/src/conversions/base_node_state.rs b/applications/tari_app_grpc/src/conversions/base_node_state.rs index 68f29ba3de9..d80be8c234c 100644 --- a/applications/tari_app_grpc/src/conversions/base_node_state.rs +++ b/applications/tari_app_grpc/src/conversions/base_node_state.rs @@ -20,35 +20,25 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use std::borrow::Borrow; + use tari_core::base_node::state_machine_service::states::{ StateInfo, - StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp}, + StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp, SyncFailed}, }; use crate::tari_rpc as grpc; -impl From for grpc::BaseNodeState { - fn from(info: StateInfo) -> Self { - match info { - StartUp => grpc::BaseNodeState::HeaderSync, - HeaderSync(_) => grpc::BaseNodeState::HeaderSync, - HorizonSync(_) => grpc::BaseNodeState::HorizonSync, - Connecting(_) => grpc::BaseNodeState::Connecting, - BlockSync(_) => grpc::BaseNodeState::BlockSync, - Listening(_) => grpc::BaseNodeState::Listening, - } - } -} - -impl From<&StateInfo> for grpc::BaseNodeState { - fn from(info: &StateInfo) -> Self { - match info { +impl> From for grpc::BaseNodeState { + fn from(info: T) -> Self { + match info.borrow() { StartUp => grpc::BaseNodeState::HeaderSync, HeaderSync(_) => grpc::BaseNodeState::HeaderSync, HorizonSync(_) => grpc::BaseNodeState::HorizonSync, Connecting(_) => grpc::BaseNodeState::Connecting, BlockSync(_) => grpc::BaseNodeState::BlockSync, Listening(_) => grpc::BaseNodeState::Listening, + SyncFailed(_) => grpc::BaseNodeState::SyncFailed, } } } diff --git a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs index 4d276796eaa..aab3883447b 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs @@ -108,6 +108,12 @@ impl BlockSync { StateEvent::BlocksSynchronized }, Err(err) => { + let _ignore = shared.status_event_sender.send(StatusInfo { + bootstrapped, + state_info: StateInfo::SyncFailed(err.to_short_str().to_string()), + randomx_vm_cnt, + randomx_vm_flags, + }); log_mdc::extend(mdc); warn!(target: LOG_TARGET, "Block sync failed: {}", err); StateEvent::BlockSyncFailed diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs index 7201e170c57..c2721da086f 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs @@ -170,12 +170,14 @@ pub enum StateInfo { HeaderSync(Option), HorizonSync(HorizonSyncInfo), BlockSync(BlockSyncInfo), + SyncFailed(String), Listening(ListeningInfo), } impl StateInfo { pub fn short_desc(&self) -> String { - use StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp}; + #[allow(clippy::enum_glob_use)] + use StateInfo::*; match self { StartUp => "Starting up".to_string(), Connecting(sync_peer) => format!( @@ -192,6 +194,7 @@ impl StateInfo { BlockSync(info) => format!("Syncing blocks: {}", info.sync_progress_string()), Listening(_) => "Listening".to_string(), + SyncFailed(details) => format!("Sync failed: {}", details), } } @@ -203,9 +206,10 @@ impl StateInfo { } pub fn is_synced(&self) -> bool { - use StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp}; + #[allow(clippy::enum_glob_use)] + use StateInfo::*; match self { - StartUp | Connecting(_) | HeaderSync(_) | HorizonSync(_) | BlockSync(_) => false, + StartUp | Connecting(_) | HeaderSync(_) | HorizonSync(_) | BlockSync(_) | SyncFailed(_) => false, Listening(info) => info.is_synced(), } } @@ -223,6 +227,7 @@ impl Display for StateInfo { HorizonSync(info) => write!(f, "Synchronizing horizon state: {}", info), BlockSync(info) => write!(f, "Synchronizing blocks: {}", info), Listening(info) => write!(f, "Listening: {}", info), + SyncFailed(details) => write!(f, "Sync failed: {}", details), } } } diff --git a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs index 243af2e30e6..236da7dd534 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs @@ -144,21 +144,32 @@ impl HeaderSyncState { self.is_synced = true; StateEvent::HeadersSynchronized(sync_peer) }, - Err(err @ BlockHeaderSyncError::SyncFailedAllPeers) => { - log_mdc::extend(mdc); - warn!(target: LOG_TARGET, "{}. Continuing...", err); - StateEvent::Continue - }, - Err(err @ BlockHeaderSyncError::NetworkSilence) => { - log_mdc::extend(mdc); - warn!(target: LOG_TARGET, "{}", err); - self.is_synced = true; - StateEvent::NetworkSilence - }, Err(err) => { - log_mdc::extend(mdc); - debug!(target: LOG_TARGET, "Header sync failed: {}", err); - StateEvent::HeaderSyncFailed + let _ignore = shared.status_event_sender.send(StatusInfo { + bootstrapped, + state_info: StateInfo::SyncFailed("HeaderSyncFailed".to_string()), + randomx_vm_cnt, + randomx_vm_flags, + }); + match err { + BlockHeaderSyncError::SyncFailedAllPeers => { + error!(target: LOG_TARGET, "Header sync failed with all peers. Error: {}", err); + log_mdc::extend(mdc); + warn!(target: LOG_TARGET, "{}. Continuing...", err); + StateEvent::Continue + }, + BlockHeaderSyncError::NetworkSilence => { + log_mdc::extend(mdc); + warn!(target: LOG_TARGET, "{}", err); + self.is_synced = true; + StateEvent::NetworkSilence + }, + _ => { + log_mdc::extend(mdc); + debug!(target: LOG_TARGET, "Header sync failed: {}", err); + StateEvent::HeaderSyncFailed + }, + } }, } } diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs index 5fdf4a2c07d..6cdd482697a 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs @@ -126,6 +126,12 @@ impl HorizonStateSync { StateEvent::HorizonStateSynchronized }, Err(err) => { + let _ignore = shared.status_event_sender.send(StatusInfo { + bootstrapped, + state_info: StateInfo::SyncFailed("HorizonSyncFailed".to_string()), + randomx_vm_cnt, + randomx_vm_flags, + }); warn!(target: LOG_TARGET, "Synchronizing horizon state has failed. {}", err); StateEvent::HorizonStateSyncFailure }, diff --git a/base_layer/core/src/base_node/sync/block_sync/error.rs b/base_layer/core/src/base_node/sync/block_sync/error.rs index b675dddce9f..77195fff92e 100644 --- a/base_layer/core/src/base_node/sync/block_sync/error.rs +++ b/base_layer/core/src/base_node/sync/block_sync/error.rs @@ -61,3 +61,22 @@ pub enum BlockSyncError { #[error("FixedHash size error: {0}")] FixedHashSizeError(#[from] FixedHashSizeError), } + +impl BlockSyncError { + pub fn to_short_str(&self) -> &'static str { + match self { + BlockSyncError::RpcError(_) => "RpcError", + BlockSyncError::RpcRequestError(_) => "RpcRequestError", + BlockSyncError::ChainStorageError(_) => "ChainStorageError", + BlockSyncError::PeerSentBlockThatDidNotFormAChain { .. } => "PeerSentBlockThatDidNotFormAChain", + BlockSyncError::ConnectivityError(_) => "ConnectivityError", + BlockSyncError::NoSyncPeers => "NoSyncPeers", + BlockSyncError::ValidationError(_) => "ValidationError", + BlockSyncError::FailedToConstructChainBlock => "FailedToConstructChainBlock", + BlockSyncError::ProtocolViolation(_) => "ProtocolViolation", + BlockSyncError::MaxLatencyExceeded { .. } => "MaxLatencyExceeded", + BlockSyncError::AllSyncPeersExceedLatency => "AllSyncPeersExceedLatency", + BlockSyncError::FixedHashSizeError(_) => "FixedHashSizeError", + } + } +} diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index 319fd172b7d..baf5706775f 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -127,6 +127,11 @@ impl BlockSynchronizer { async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> { let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::>(); + info!( + target: LOG_TARGET, + "Attempting to sync blocks({} sync peers)", + sync_peer_node_ids.len() + ); for (i, node_id) in sync_peer_node_ids.iter().enumerate() { let sync_peer = &self.sync_peers[i]; self.hooks.call_on_starting_hook(sync_peer); diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index 49a6208c802..dc49c72e786 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -129,6 +129,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { #[allow(clippy::too_many_lines)] pub async fn try_sync_from_all_peers(&mut self, max_latency: Duration) -> Result { let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::>(); + info!( + target: LOG_TARGET, + "Attempting to sync headers ({} sync peers)", + sync_peer_node_ids.len() + ); for (i, node_id) in sync_peer_node_ids.iter().enumerate() { { let sync_peer = &self.sync_peers[i]; diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs index 697d3bfd92b..06fa4ba45a1 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs @@ -180,6 +180,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { } async fn sync(&mut self, header: &BlockHeader) -> Result<(), HorizonSyncError> { + info!( + target: LOG_TARGET, + "Attempting to sync blocks({} sync peers)", + self.sync_peers.len() + ); for (i, sync_peer) in self.sync_peers.iter().enumerate() { self.hooks.call_on_starting_hook(sync_peer); let mut connection = self.connectivity.dial_peer(sync_peer.node_id().clone()).await?;