diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index bee93f810e9..31e0b248e6c 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -25,6 +25,7 @@ use std::{ convert::{TryFrom, TryInto}, sync::{Arc, Weak}, }; +use std::cmp::max; use log::*; use tari_common_types::types::FixedHash; @@ -60,6 +61,8 @@ use crate::{ SyncUtxosResponse, }, }; +use crate::blocks::HistoricalBlock; +use crate::chain_storage::ChainStorageError; const LOG_TARGET: &str = "c::base_node::sync_rpc"; @@ -108,6 +111,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ &self, request: Request, ) -> Result, RpcStatus> { + println!("here 1.1.3.1a - server start"); let peer_node_id = request.context().peer_node_id().clone(); let message = request.into_message(); let mut block_event_stream = self.base_node_service.get_block_event_stream(); @@ -160,16 +164,28 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ "Initiating block sync with peer `{}` from height {} to {}", peer_node_id, start_height, end_height, ); - let session_token = self.try_add_exclusive_session(peer_node_id).await?; + let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?; // Number of blocks to load and push to the stream before loading the next batch const BATCH_SIZE: usize = 2; let (tx, rx) = mpsc::channel(BATCH_SIZE); let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker"); + let best_block_height = db.get_chain_metadata().await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))?.height_of_longest_chain(); + let end_height = if end_height > best_block_height { + warn!( + target: LOG_TARGET, + "Peer `{}` requested blocks up to height {}, best block height is {}", peer_node_id, end_height, best_block_height, + ); + best_block_height + } else { + end_height + }; let iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, BATCH_SIZE) .map_err(|e| RpcStatus::bad_request(&e))?; task::spawn( async move { + println!("here 1.1.3.1b - server stream task spawned"); // Move token into this task let peer_node_id = session_token; for (start, end) in iter { @@ -210,17 +226,20 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ .fetch_blocks(start..=end, true) .await .map_err(RpcStatus::log_internal_error(LOG_TARGET)); + println!("here 1.1.3.1c - server fetched blocks {} to {}", start, end); if tx.is_closed() { debug!( target: LOG_TARGET, "Block sync session for peer '{}' terminated early", peer_node_id ); + println!("here 1.1.3.1d - server session terminated early (1)!"); break; } match blocks { Ok(blocks) if blocks.is_empty() => { + println!("here 1.1.3.1e - server no more blocks"); break; }, Ok(blocks) => { @@ -228,11 +247,17 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ .into_iter() .map(|hb| hb.try_into_block().map_err(RpcStatus::log_internal_error(LOG_TARGET))) .map(|block| match block { - Ok(b) => proto::base_node::BlockBodyResponse::try_from(b).map_err(|e| { - log::error!(target: LOG_TARGET, "Internal error: {}", e); - RpcStatus::general_default() - }), - Err(err) => Err(err), + Ok(b) => { + println!("here 1.1.3.1f - server block {}", b.header.height); + proto::base_node::BlockBodyResponse::try_from(b).map_err(|e| { + log::error!(target: LOG_TARGET, "Internal error: {}", e); + RpcStatus::general_default() + }) + }, + Err(err) => { + println!("here 1.1.3.1g - server err {}", err); + Err(err) + }, }); // Ensure task stops if the peer prematurely stops their RPC session @@ -241,10 +266,12 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ target: LOG_TARGET, "Block sync session for peer '{}' terminated early", peer_node_id ); + println!("here 1.1.3.1h - server session terminated early (2)!"); break; } }, Err(err) => { + println!("here 1.1.3.1i - server err {}", err); let _result = tx.send(Err(err)).await; break; }, @@ -256,6 +283,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ target: LOG_TARGET, "Block sync round complete for peer `{}`.", peer_node_id, ); + println!("here 1.1.3.1j - server spawn task completed"); } .instrument(span), ); diff --git a/base_layer/core/tests/tests/block_sync.rs b/base_layer/core/tests/tests/block_sync.rs index 1b09479c29b..7abd66e621e 100644 --- a/base_layer/core/tests/tests/block_sync.rs +++ b/base_layer/core/tests/tests/block_sync.rs @@ -108,7 +108,7 @@ async fn test_block_sync_peer_supplies_no_blocks_with_ban() { _ => panic!("Expected HeadersSynchronized event"), } - // Alice attempts block sync, Bob will misbehave, but cannot be banned + // Alice attempts block sync, Bob will not send any blocks and be banned println!(); let mut block_sync = sync::initialize_sync_blocks(&bob_node); sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node, Some(true)); @@ -126,3 +126,55 @@ async fn test_block_sync_peer_supplies_no_blocks_with_ban() { // Bob will be banned assert!(sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_block_sync_peer_supplies_not_all_blocks_with_ban() { + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + + // Create the network with Alice node and Bob node + let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = + sync::create_network_with_local_and_peer_nodes().await; + + // Add some block to Bob's chain + let blocks = sync::create_and_add_some_blocks( + &bob_node, + &initial_block, + 10, + &consensus_manager, + &key_manager, + &[3; 10], + ) + .await; + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 10); + // Add blocks to Alice's chain + sync::add_some_existing_blocks(&blocks[1..=5], &alice_node); + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5); + + // Alice attempts header sync + let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); + let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await; + match event.clone() { + StateEvent::HeadersSynchronized(_) => { + // Good, headers are synced + }, + _ => panic!("Expected HeadersSynchronized event"), + } + + // Alice attempts block sync, Bob will not send all blocks and be banned + println!(); + let mut block_sync = sync::initialize_sync_blocks(&bob_node); + sync::delete_some_blocks_and_headers(&blocks[8..=10], WhatToDelete::Blocks, &bob_node, Some(true)); + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 8); + let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await; + println!("event: {:?}", event); + match event { + StateEvent::BlockSyncFailed => { + // Good, Bob is banned. + }, + _ => panic!("Expected BlockSyncFailed event"), + } + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 8); + + // Bob will be banned + assert!(sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); +}