Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Oct 24, 2023
1 parent 0091320 commit 4d23ff9
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 7 deletions.
40 changes: 34 additions & 6 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
convert::{TryFrom, TryInto},
sync::{Arc, Weak},
};
use std::cmp::max;

use log::*;
use tari_common_types::types::FixedHash;
Expand Down Expand Up @@ -60,6 +61,8 @@ use crate::{
SyncUtxosResponse,
},
};
use crate::blocks::HistoricalBlock;
use crate::chain_storage::ChainStorageError;

const LOG_TARGET: &str = "c::base_node::sync_rpc";

Expand Down Expand Up @@ -108,6 +111,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
&self,
request: Request<SyncBlocksRequest>,
) -> Result<Streaming<proto::base_node::BlockBodyResponse>, RpcStatus> {
println!("here 1.1.3.1a - server start");
let peer_node_id = request.context().peer_node_id().clone();
let message = request.into_message();
let mut block_event_stream = self.base_node_service.get_block_event_stream();
Expand Down Expand Up @@ -160,16 +164,28 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
"Initiating block sync with peer `{}` from height {} to {}", peer_node_id, start_height, end_height,
);

let session_token = self.try_add_exclusive_session(peer_node_id).await?;
let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
// Number of blocks to load and push to the stream before loading the next batch
const BATCH_SIZE: usize = 2;
let (tx, rx) = mpsc::channel(BATCH_SIZE);

let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker");
let best_block_height = db.get_chain_metadata().await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?.height_of_longest_chain();
let end_height = if end_height > best_block_height {
warn!(
target: LOG_TARGET,
"Peer `{}` requested blocks up to height {}, best block height is {}", peer_node_id, end_height, best_block_height,
);
best_block_height
} else {
end_height
};
let iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, BATCH_SIZE)
.map_err(|e| RpcStatus::bad_request(&e))?;
task::spawn(
async move {
println!("here 1.1.3.1b - server stream task spawned");
// Move token into this task
let peer_node_id = session_token;
for (start, end) in iter {
Expand Down Expand Up @@ -210,29 +226,38 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.fetch_blocks(start..=end, true)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));
println!("here 1.1.3.1c - server fetched blocks {} to {}", start, end);

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Block sync session for peer '{}' terminated early", peer_node_id
);
println!("here 1.1.3.1d - server session terminated early (1)!");
break;
}

match blocks {
Ok(blocks) if blocks.is_empty() => {
println!("here 1.1.3.1e - server no more blocks");
break;
},
Ok(blocks) => {
let blocks = blocks
.into_iter()
.map(|hb| hb.try_into_block().map_err(RpcStatus::log_internal_error(LOG_TARGET)))
.map(|block| match block {
Ok(b) => proto::base_node::BlockBodyResponse::try_from(b).map_err(|e| {
log::error!(target: LOG_TARGET, "Internal error: {}", e);
RpcStatus::general_default()
}),
Err(err) => Err(err),
Ok(b) => {
println!("here 1.1.3.1f - server block {}", b.header.height);
proto::base_node::BlockBodyResponse::try_from(b).map_err(|e| {
log::error!(target: LOG_TARGET, "Internal error: {}", e);
RpcStatus::general_default()
})
},
Err(err) => {
println!("here 1.1.3.1g - server err {}", err);
Err(err)
},
});

// Ensure task stops if the peer prematurely stops their RPC session
Expand All @@ -241,10 +266,12 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
target: LOG_TARGET,
"Block sync session for peer '{}' terminated early", peer_node_id
);
println!("here 1.1.3.1h - server session terminated early (2)!");
break;
}
},
Err(err) => {
println!("here 1.1.3.1i - server err {}", err);
let _result = tx.send(Err(err)).await;
break;
},
Expand All @@ -256,6 +283,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
target: LOG_TARGET,
"Block sync round complete for peer `{}`.", peer_node_id,
);
println!("here 1.1.3.1j - server spawn task completed");
}
.instrument(span),
);
Expand Down
54 changes: 53 additions & 1 deletion base_layer/core/tests/tests/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async fn test_block_sync_peer_supplies_no_blocks_with_ban() {
_ => panic!("Expected HeadersSynchronized event"),
}

// Alice attempts block sync, Bob will misbehave, but cannot be banned
// Alice attempts block sync, Bob will not send any blocks and be banned
println!();
let mut block_sync = sync::initialize_sync_blocks(&bob_node);
sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node, Some(true));
Expand All @@ -126,3 +126,55 @@ async fn test_block_sync_peer_supplies_no_blocks_with_ban() {
// Bob will be banned
assert!(sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_block_sync_peer_supplies_not_all_blocks_with_ban() {
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`

// Create the network with Alice node and Bob node
let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) =
sync::create_network_with_local_and_peer_nodes().await;

// Add some block to Bob's chain
let blocks = sync::create_and_add_some_blocks(
&bob_node,
&initial_block,
10,
&consensus_manager,
&key_manager,
&[3; 10],
)
.await;
assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 10);
// Add blocks to Alice's chain
sync::add_some_existing_blocks(&blocks[1..=5], &alice_node);
assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5);

// Alice attempts header sync
let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node);
let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await;
match event.clone() {
StateEvent::HeadersSynchronized(_) => {
// Good, headers are synced
},
_ => panic!("Expected HeadersSynchronized event"),
}

// Alice attempts block sync, Bob will not send all blocks and be banned
println!();
let mut block_sync = sync::initialize_sync_blocks(&bob_node);
sync::delete_some_blocks_and_headers(&blocks[8..=10], WhatToDelete::Blocks, &bob_node, Some(true));
assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 8);
let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await;
println!("event: {:?}", event);
match event {
StateEvent::BlockSyncFailed => {
// Good, Bob is banned.
},
_ => panic!("Expected BlockSyncFailed event"),
}
assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 8);

// Bob will be banned
assert!(sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await);
}

0 comments on commit 4d23ff9

Please sign in to comment.