From dd544cb9c8907b05d4aea937f247c087c6484de6 Mon Sep 17 00:00:00 2001
From: Stan Bondi
Date: Fri, 25 Mar 2022 16:35:46 +0400
Subject: [PATCH] fix(sync): adds extra checks for sync stream termination (#3927)

Description
---
- adds extra checks in the base node sync RPC service for premature stream termination
- uses a rolling average for slow-peer detection in header and horizon sync
- prevents more than one UTXO sync session per peer

Motivation and Context
---
Peers may end a sync session and quickly initiate a new one. Because the server could still be sending a batch of data for the previous session, the new session is denied until that data has been sent. This PR adds additional checks, after loading data from the database but before sending it, to detect whether the session has ended. The race condition itself still exists, however, as it is inherent to the rule of only allowing one sync session per peer. In my tests with a 600ms maximum permitted sync latency I did not manage to trigger the `Forbidden: Existing sync session found for this client. Only a single session is permitted` error, but it may still occur; these changes should lessen the chance of that happening.

How Has This Been Tested?
---
Manually, and with existing tests
---
 .../base_node/sync/block_sync/synchronizer.rs | 17 ++++----
 .../sync/header_sync/synchronizer.rs          | 19 ++++---
 .../sync/horizon_state_sync/synchronizer.rs   | 39 +++++++++++-------
 .../core/src/base_node/sync/rpc/service.rs    | 40 ++++++++++++++---
 .../src/base_node/sync/rpc/sync_utxos_task.rs | 35 ++++++++++----
 base_layer/core/src/common/rolling_avg.rs     |  7 ++++
 comms/src/protocol/rpc/client/mod.rs          |  5 +--
 7 files changed, 119 insertions(+), 43 deletions(-)

diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
index 50489b768a..9fd37a12c2 100644
--- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
+++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
@@ -188,6 +188,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
         mut client: rpc::BaseNodeSyncRpcClient,
         max_latency: Duration,
     ) -> Result<(), BlockSyncError> {
+        info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer);
         self.hooks.call_on_starting_hook();
 
         let tip_header = self.db.fetch_last_header().await?;
@@ -320,7 +321,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
             .await?;
 
             // Average time between receiving blocks from the peer - used to detect a slow sync peer
-            let last_avg_latency = avg_latency.calculate_average();
+            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
             if let Some(latency) = last_avg_latency {
                 sync_peer.set_latency(latency);
             }
@@ -342,12 +343,14 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
                 block.accumulated_data().accumulated_sha_difficulty,
                 latency
             );
-            if last_avg_latency.map(|avg| avg > max_latency).unwrap_or(false) {
-                return Err(BlockSyncError::MaxLatencyExceeded {
-                    peer: sync_peer.node_id().clone(),
-                    latency,
-                    max_latency,
-                });
+            if let Some(avg_latency) = last_avg_latency {
+                if avg_latency > max_latency {
+                    return Err(BlockSyncError::MaxLatencyExceeded {
+                        peer: sync_peer.node_id().clone(),
+                        latency: avg_latency,
+                        max_latency,
+                    });
+                }
             }
 
             current_block = Some(block);
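The slow-peer check in the block synchronizer above now gates on a rolling average with a minimum sample count instead of a single latency sample. The patch only shows the new `calculate_average_with_min_samples` method (the full `RollingAverageTime` type in `base_layer/core/src/common/rolling_avg.rs` is not shown), so the following is a minimal sketch of the idea, with names and layout assumed:

```rust
use std::{collections::VecDeque, time::Duration};

/// Rolling average over the last `capacity` samples. A sketch only; the real
/// `RollingAverageTime` may differ in layout and precision.
struct RollingAverage {
    samples: VecDeque<Duration>,
    capacity: usize,
}

impl RollingAverage {
    fn new(capacity: usize) -> Self {
        Self {
            samples: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Push a sample, evicting the oldest once the window is full.
    fn add_sample(&mut self, sample: Duration) {
        if self.samples.len() == self.capacity {
            self.samples.pop_front();
        }
        self.samples.push_back(sample);
    }

    fn calculate_average(&self) -> Option<Duration> {
        if self.samples.is_empty() {
            return None;
        }
        let total: Duration = self.samples.iter().sum();
        Some(total / self.samples.len() as u32)
    }

    /// Returns None until at least `min_samples` have been recorded, so a
    /// single slow response cannot abort an otherwise healthy sync session.
    fn calculate_average_with_min_samples(&self, min_samples: usize) -> Option<Duration> {
        if self.samples.len() < min_samples {
            return None;
        }
        self.calculate_average()
    }
}
```

Requiring at least five samples (as the synchronizers do) means the latency check only fires on a sustained slowdown, not on one slow database read or network hiccup.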
diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
index da6a073dd8..f4edf826cb 100644
--- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
+++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
@@ -42,6 +42,7 @@ use crate::{
     base_node::sync::{hooks::Hooks, rpc, BlockchainSyncConfig, SyncPeer},
     blocks::{BlockHeader, ChainBlock, ChainHeader},
     chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
+    common::rolling_avg::RollingAverageTime,
    consensus::ConsensusManager,
     proof_of_work::randomx_factory::RandomXFactory,
     proto::{
@@ -553,6 +554,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         split_info: ChainSplitInfo,
         max_latency: Duration,
     ) -> Result<(), BlockHeaderSyncError> {
+        info!(target: LOG_TARGET, "Starting header sync from peer {}", sync_peer);
         const COMMIT_EVERY_N_HEADERS: usize = 1000;
 
         let mut has_switched_to_new_chain = false;
@@ -626,8 +628,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
 
         let mut last_sync_timer = Instant::now();
         let mut last_total_accumulated_difficulty = 0;
+        let mut avg_latency = RollingAverageTime::new(20);
         while let Some(header) = header_stream.next().await {
             let latency = last_sync_timer.elapsed();
+            avg_latency.add_sample(latency);
             let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
             debug!(
                 target: LOG_TARGET,
@@ -672,12 +676,15 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
             self.hooks
                 .call_on_progress_header_hooks(current_height, split_info.remote_tip_height, &sync_peer);
 
-            if latency > max_latency {
-                return Err(BlockHeaderSyncError::MaxLatencyExceeded {
-                    peer: sync_peer.node_id().clone(),
-                    latency,
-                    max_latency,
-                });
+            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
+            if let Some(avg_latency) = last_avg_latency {
+                if avg_latency > max_latency {
+                    return Err(BlockHeaderSyncError::MaxLatencyExceeded {
+                        peer: sync_peer.node_id().clone(),
+                        latency: avg_latency,
+                        max_latency,
+                    });
+                }
             }
 
             last_sync_timer = Instant::now();
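Header sync wires the same rolling window into its receive loop: sample the per-header latency, then enforce `max_latency` against the average only. A hedged sketch of that loop shape, reusing the `RollingAverage` sketch above, with a plain iterator standing in for the RPC header stream and a simplified error type:

```rust
use std::time::{Duration, Instant};

/// Simplified stand-in; the real code returns
/// BlockHeaderSyncError::MaxLatencyExceeded with the peer's NodeId.
#[derive(Debug)]
struct MaxLatencyExceeded {
    latency: Duration,
    max_latency: Duration,
}

fn sync_headers(
    headers: impl Iterator<Item = u64>, // stand-in for the streamed headers
    max_latency: Duration,
) -> Result<(), MaxLatencyExceeded> {
    let mut avg_latency = RollingAverage::new(20);
    let mut last_sync_timer = Instant::now();
    for _header in headers {
        avg_latency.add_sample(last_sync_timer.elapsed());
        // ... validate and persist the header here ...

        // Abort on the *average* latency, and only once 5 samples exist.
        if let Some(avg) = avg_latency.calculate_average_with_min_samples(5) {
            if avg > max_latency {
                return Err(MaxLatencyExceeded { latency: avg, max_latency });
            }
        }
        last_sync_timer = Instant::now();
    }
    Ok(())
}
```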
diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs
index dacc8d8512..3334120111 100644
--- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs
+++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs
@@ -32,7 +32,7 @@ use croaring::Bitmap;
 use futures::{stream::FuturesUnordered, StreamExt};
 use log::*;
 use tari_common_types::types::{Commitment, HashDigest, RangeProofService};
-use tari_comms::connectivity::ConnectivityRequester;
+use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
 use tari_crypto::{
     commitment::HomomorphicCommitment,
     tari_utilities::{hex::Hex, Hashable},
@@ -58,6 +58,7 @@ use crate::{
         MmrTree,
         PrunedOutput,
     },
+    common::rolling_avg::RollingAverageTime,
     consensus::ConsensusManager,
     proto::base_node::{
         sync_utxo as proto_sync_utxo,
@@ -236,6 +237,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
         client: &mut rpc::BaseNodeSyncRpcClient,
         to_header: &BlockHeader,
     ) -> Result<(), HorizonSyncError> {
+        info!(target: LOG_TARGET, "Starting kernel sync from peer {}", sync_peer);
         let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;
 
         let remote_num_kernels = to_header.kernel_mmr_size;
@@ -288,8 +290,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
         let mut mmr_position = local_num_kernels;
         let end = remote_num_kernels;
         let mut last_sync_timer = Instant::now();
+        let mut avg_latency = RollingAverageTime::new(20);
         while let Some(kernel) = kernel_stream.next().await {
             let latency = last_sync_timer.elapsed();
+            avg_latency.add_sample(latency);
             let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?;
             kernel
                 .verify_signature()
@@ -368,13 +372,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
                 self.hooks.call_on_progress_horizon_hooks(info);
             }
 
-            if latency > self.max_latency {
-                return Err(HorizonSyncError::MaxLatencyExceeded {
-                    peer: sync_peer.node_id().clone(),
-                    latency,
-                    max_latency: self.max_latency,
-                });
-            }
+            self.check_latency(sync_peer.node_id(), &avg_latency)?;
 
             last_sync_timer = Instant::now();
         }
@@ -387,12 +385,27 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
         Ok(())
     }
 
+    fn check_latency(&self, peer: &NodeId, avg_latency: &RollingAverageTime) -> Result<(), HorizonSyncError> {
+        if let Some(avg_latency) = avg_latency.calculate_average_with_min_samples(5) {
+            if avg_latency > self.max_latency {
+                return Err(HorizonSyncError::MaxLatencyExceeded {
+                    peer: peer.clone(),
+                    latency: avg_latency,
+                    max_latency: self.max_latency,
+                });
+            }
+        }
+
+        Ok(())
+    }
+
     async fn synchronize_outputs(
         &mut self,
         mut sync_peer: SyncPeer,
         client: &mut rpc::BaseNodeSyncRpcClient,
         to_header: &BlockHeader,
     ) -> Result<(), HorizonSyncError> {
+        info!(target: LOG_TARGET, "Starting output sync from peer {}", sync_peer);
         let local_num_outputs = self.db().fetch_mmr_size(MmrTree::Utxo).await?;
 
         let remote_num_outputs = to_header.output_mmr_size;
@@ -466,9 +479,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
         let mut witness_mmr = MerkleMountainRange::<HashDigest, _>::new(witness_pruned_set);
         let mut constants = self.rules.consensus_constants(current_header.height()).clone();
         let mut last_sync_timer = Instant::now();
+        let mut avg_latency = RollingAverageTime::new(20);
 
         while let Some(response) = output_stream.next().await {
             let latency = last_sync_timer.elapsed();
+            avg_latency.add_sample(latency);
             let res: SyncUtxosResponse = response?;
 
             if res.mmr_index != 0 && res.mmr_index != mmr_position {
@@ -658,13 +673,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
                 self.hooks.call_on_progress_horizon_hooks(info);
             }
 
-            if latency > self.max_latency {
-                return Err(HorizonSyncError::MaxLatencyExceeded {
-                    peer: sync_peer.node_id().clone(),
-                    latency,
-                    max_latency: self.max_latency,
-                });
-            }
+            self.check_latency(sync_peer.node_id(), &avg_latency)?;
 
             last_sync_timer = Instant::now();
         }
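The RPC service below guards each sync method with `try_add_exclusive_session` and, for UTXO sync, now hands the returned session token (an `Arc<NodeId>`) to the streaming task so the session stays registered for the task's whole lifetime. The guard's implementation is not part of this patch; one plausible sketch, assuming a weak-reference registry and simplifying `NodeId` to a `String`:

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical peer identifier; the real code uses tari_comms' NodeId.
type NodeId = String;

/// At most one live session token per peer. Dropping the returned Arc
/// (when the streaming task finishes) frees the slot automatically.
struct SessionTracker {
    sessions: Mutex<Vec<Weak<NodeId>>>,
}

impl SessionTracker {
    fn try_add_exclusive_session(&self, peer: NodeId) -> Result<Arc<NodeId>, &'static str> {
        let mut sessions = self.sessions.lock().unwrap();
        // Prune tokens whose tasks have finished (no strong refs remain).
        sessions.retain(|s| s.strong_count() > 0);
        if sessions
            .iter()
            .any(|s| s.upgrade().map(|p| *p == peer).unwrap_or(false))
        {
            return Err("Existing sync session found for this client. Only a single session is permitted");
        }
        let token = Arc::new(peer);
        sessions.push(Arc::downgrade(&token));
        Ok(token)
    }
}
```

This also illustrates why the race described in the motivation section is inherent: the previous task may hold its token briefly after the client ends the session, so an immediate reconnect is rejected until that token is dropped.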
diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs
index d328f95677..636c8e5200 100644
--- a/base_layer/core/src/base_node/sync/rpc/service.rs
+++ b/base_layer/core/src/base_node/sync/rpc/service.rs
@@ -203,6 +203,14 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
                     .await
                     .map_err(RpcStatus::log_internal_error(LOG_TARGET));
 
+                if tx.is_closed() {
+                    debug!(
+                        target: LOG_TARGET,
+                        "Block sync session for peer '{}' terminated early", peer_node_id
+                    );
+                    break;
+                }
+
                 match blocks {
                     Ok(blocks) if blocks.is_empty() => {
                         break;
@@ -226,6 +234,10 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
 
                         // Ensure task stops if the peer prematurely stops their RPC session
                         if utils::mpsc::send_all(&tx, blocks).await.is_err() {
+                            debug!(
+                                target: LOG_TARGET,
+                                "Block sync session for peer '{}' terminated early", peer_node_id
+                            );
                             break;
                         }
                     },
@@ -288,7 +300,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
         task::spawn(
             async move {
                 // Move token into this task
-                let session_token = session_token;
+                let peer_node_id = session_token;
                 let iter = NonOverlappingIntegerPairIter::new(
                     start_header.height + 1,
                     start_header.height.saturating_add(count).saturating_add(1),
@@ -304,6 +316,13 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
                         .await
                         .map_err(RpcStatus::log_internal_error(LOG_TARGET));
 
+                    if tx.is_closed() {
+                        debug!(
+                            target: LOG_TARGET,
+                            "Header sync session for peer '{}' terminated early", peer_node_id
+                        );
+                        break;
+                    }
                     match headers {
                         Ok(headers) if headers.is_empty() => {
                             break;
@@ -325,7 +344,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
                 metrics::active_sync_peers().dec();
                 debug!(
                     target: LOG_TARGET,
-                    "Header sync round complete for peer `{}`.", session_token,
+                    "Header sync round complete for peer `{}`.", peer_node_id,
                 );
             }
             .instrument(span),
@@ -453,6 +472,8 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
         let session_token = self.try_add_exclusive_session(peer_node_id).await?;
 
         task::spawn(async move {
+            // Move session token into task
+            let peer_node_id = session_token;
             while current_height <= end_height {
                 if tx.is_closed() {
                     break;
@@ -461,6 +482,15 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
                     .fetch_kernels_in_block(current_header_hash.clone())
                     .await
                     .map_err(RpcStatus::log_internal_error(LOG_TARGET));
+
+                if tx.is_closed() {
+                    debug!(
+                        target: LOG_TARGET,
+                        "Kernel sync session for peer '{}' terminated early", peer_node_id
+                    );
+                    break;
+                }
+
                 match res {
                     Ok(kernels) if kernels.is_empty() => {
                         let _ = tx
@@ -525,7 +555,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
             metrics::active_sync_peers().dec();
             debug!(
                 target: LOG_TARGET,
-                "Kernel sync round complete for peer `{}`.", session_token,
+                "Kernel sync round complete for peer `{}`.", peer_node_id,
             );
         });
         Ok(Streaming::new(rx))
@@ -546,9 +576,9 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
             req.include_deleted_bitmaps
         );
 
-        let _session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
+        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
         let (tx, rx) = mpsc::channel(200);
-        let task = SyncUtxosTask::new(self.db());
+        let task = SyncUtxosTask::new(self.db(), session_token);
         task.run(request, tx).await?;
 
         Ok(Streaming::new(rx))
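The recurring server-side pattern in the service above is a second `tx.is_closed()` check placed after the potentially slow database fetch, because the client may terminate the session while the fetch is in flight. A minimal tokio sketch of the shape, with a hypothetical `fetch_batch` standing in for the real database calls:

```rust
use tokio::sync::mpsc;

/// Hypothetical stand-in for a potentially slow database read.
async fn fetch_batch(start: u64) -> Vec<u64> {
    (start..start + 10).collect()
}

async fn stream_batches(tx: mpsc::Sender<u64>, mut current: u64, end: u64) {
    while current <= end {
        // Cheap check before doing any work.
        if tx.is_closed() {
            break;
        }

        let batch = fetch_batch(current).await;

        // The client may have hung up while we were reading from the
        // database; re-check before sending so the data is discarded and
        // the exclusive session slot is released as soon as possible.
        if tx.is_closed() {
            break;
        }

        for item in batch {
            if tx.send(item).await.is_err() {
                // Receiver dropped mid-batch: end the session early.
                return;
            }
        }
        current += 10;
    }
}
```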
diff --git a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
index 2c45068bce..43177183f9 100644
--- a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
+++ b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
@@ -24,6 +24,7 @@ use std::{sync::Arc, time::Instant};
 
 use log::*;
 use tari_comms::{
+    peer_manager::NodeId,
     protocol::rpc::{Request, RpcStatus, RpcStatusResultExt},
     utils,
 };
@@ -42,13 +43,14 @@ const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task";
 
 pub(crate) struct SyncUtxosTask<B> {
     db: AsyncBlockchainDb<B>,
+    peer_node_id: Arc<NodeId>,
 }
 
 impl<B> SyncUtxosTask<B>
 where B: BlockchainBackend + 'static
 {
-    pub(crate) fn new(db: AsyncBlockchainDb<B>) -> Self {
-        Self { db }
+    pub(crate) fn new(db: AsyncBlockchainDb<B>, peer_node_id: Arc<NodeId>) -> Self {
+        Self { db, peer_node_id }
     }
 
     pub(crate) async fn run(
@@ -56,7 +58,6 @@ where B: BlockchainBackend + 'static
         request: Request<SyncUtxosRequest>,
         mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
     ) -> Result<(), RpcStatus> {
-        let peer = request.context().peer_node_id().clone();
         let msg = request.into_message();
         let start_header = self
             .db
@@ -105,7 +106,10 @@ where B: BlockchainBackend + 'static
         let include_pruned_utxos = msg.include_pruned_utxos;
         let include_deleted_bitmaps = msg.include_deleted_bitmaps;
         task::spawn(async move {
-            debug!(target: LOG_TARGET, "Starting UTXO stream for peer '{}'", peer);
+            debug!(
+                target: LOG_TARGET,
+                "Starting UTXO stream for peer '{}'", self.peer_node_id
+            );
             if let Err(err) = self
                 .start_streaming(
                     &mut tx,
@@ -118,10 +122,16 @@ where B: BlockchainBackend + 'static
             )
             .await
             {
-                debug!(target: LOG_TARGET, "UTXO stream errored for peer '{}': {}", peer, err);
+                debug!(
+                    target: LOG_TARGET,
+                    "UTXO stream errored for peer '{}': {}", self.peer_node_id, err
+                );
                 let _ = tx.send(Err(err)).await;
             }
-            debug!(target: LOG_TARGET, "UTXO stream completed for peer '{}'", peer);
+            debug!(
+                target: LOG_TARGET,
+                "UTXO stream completed for peer '{}'", self.peer_node_id
+            );
             metrics::active_sync_peers().dec();
         });
@@ -178,7 +188,10 @@ where B: BlockchainBackend + 'static
             let end = current_header.output_mmr_size;
 
             if tx.is_closed() {
-                debug!(target: LOG_TARGET, "Exiting sync_utxos early because client has gone",);
+                debug!(
+                    target: LOG_TARGET,
+                    "Peer '{}' exited UTXO sync session early", self.peer_node_id
+                );
                 break;
             }
 
@@ -196,6 +209,14 @@ where B: BlockchainBackend + 'static
                 current_header.height,
                 deleted_diff.cardinality(),
             );
+            if tx.is_closed() {
+                debug!(
+                    target: LOG_TARGET,
+                    "Peer '{}' exited UTXO sync session early", self.peer_node_id
+                );
+                break;
+            }
+
             let utxos = utxos
                 .into_iter()
                 .skip(skip_outputs as usize)
diff --git a/base_layer/core/src/common/rolling_avg.rs b/base_layer/core/src/common/rolling_avg.rs
index 9230eb9667..fdc64d5352 100644
--- a/base_layer/core/src/common/rolling_avg.rs
+++ b/base_layer/core/src/common/rolling_avg.rs
@@ -59,4 +59,11 @@ impl RollingAverageTime {
             u64::try_from(total_time.as_nanos()).unwrap_or(u64::MAX) / self.samples.len() as u64,
         ))
     }
+
+    pub fn calculate_average_with_min_samples(&self, min_samples: usize) -> Option<Duration> {
+        if self.samples.len() < min_samples {
+            return None;
+        }
+        self.calculate_average()
+    }
 }
diff --git a/comms/src/protocol/rpc/client/mod.rs b/comms/src/protocol/rpc/client/mod.rs
index 18f55b53d1..5da98cdc4b 100644
--- a/comms/src/protocol/rpc/client/mod.rs
+++ b/comms/src/protocol/rpc/client/mod.rs
@@ -717,10 +717,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
 
         match Self::convert_to_result(resp) {
             Ok(Ok(resp)) => {
-                // The consumer may drop the receiver before all responses are received.
-                // We just ignore that as we still want obey the protocol and receive messages until the FIN flag or
-                // the connection is dropped
                 let is_finished = resp.is_finished();
+                // The consumer may drop the receiver before all responses are received.
+                // We handle this by sending a 'FIN' message to the server.
                 if response_tx.is_closed() {
                     warn!(
                         target: LOG_TARGET,