From 2987621b2cef6d3b852ed9a1f4215f19b9838e0f Mon Sep 17 00:00:00 2001 From: Hansie Odendaal <39146854+hansieodendaal@users.noreply.github.com> Date: Fri, 2 Feb 2024 14:17:19 +0200 Subject: [PATCH] feat: initial horizon sync from prune node (#6109) Description --- - Implemented initial horizon sync from prune node. - Added an integration-level unit test to ensure it works as intended. Motivation and Context --- This will be more efficient for the network in total How Has This Been Tested? --- New integration-level unit test System-level sync tests from archival node and from prune node What process can a PR reviewer use to test or verify this change? --- See integration-level unit test Perform system-level sync test Breaking Changes --- - [ ] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [X] Other - Please specify --- .../states/sync_decide.rs | 20 +- .../src/base_node/sync/rpc/sync_utxos_task.rs | 42 ++++ base_layer/core/src/chain_storage/async_db.rs | 4 + .../src/chain_storage/blockchain_database.rs | 19 +- base_layer/core/tests/tests/horizon_sync.rs | 187 +++++++++++++++++- 5 files changed, 246 insertions(+), 26 deletions(-) diff --git a/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs b/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs index e9bfbbde52..ad853601c8 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs @@ -70,20 +70,12 @@ impl DecideNextSync { .filter(|sync_peer| { let remote_metadata = sync_peer.claimed_chain_metadata(); debug!(target: LOG_TARGET, "Peer metadata: {}", remote_metadata); - let remote_is_archival_node = remote_metadata.pruned_height() == 0; - let general_sync_conditions = - // Must be able to provide the correct amount of full blocks past the pruned height (i.e. the - // pruning horizon), otherwise our horizon spec will not be met - remote_metadata.best_block_height().saturating_sub(remote_metadata.pruned_height()) >= - local_metadata.pruning_horizon() && - // Must have a better blockchain tip than us - remote_metadata.best_block_height() > local_metadata.best_block_height() && - // Must be able to provide full blocks from the height we need detailed information - remote_metadata.pruned_height() <= local_metadata.best_block_height(); - let sync_from_prune_node = !remote_is_archival_node && - // Must have done initial sync (to detect genesis TXO spends) - local_metadata.best_block_height() > 0; - general_sync_conditions && (remote_is_archival_node || sync_from_prune_node) + // Must be able to provide the correct amount of full blocks past the pruned height (i.e. the + // pruning horizon), otherwise our horizon spec will not be met + remote_metadata.best_block_height().saturating_sub(remote_metadata.pruned_height()) >= + local_metadata.pruning_horizon() && + // Must have a better blockchain tip than us + remote_metadata.best_block_height() > local_metadata.best_block_height() }) .collect::>(); diff --git a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs index f6c992f0a9..67890f56be 100644 --- a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs +++ b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs @@ -132,6 +132,39 @@ where B: BlockchainBackend + 'static current_header.hash().to_hex(), end_header.hash().to_hex(), ); + + // If this is a pruned node and outputs have been requested for an initial sync, we need to discover and send + // the outputs from the genesis block that have been pruned as well + let mut pruned_genesis_block_outputs = Vec::new(); + let metadata = self + .db + .get_chain_metadata() + .await + .rpc_status_internal_error(LOG_TARGET)?; + if current_header.height == 1 && metadata.is_pruned_node() { + let genesis_block = self.db.fetch_genesis_block(); + for output in genesis_block.block().body.outputs() { + let output_hash = output.hash(); + if self + .db + .fetch_output(output_hash) + .await + .rpc_status_internal_error(LOG_TARGET)? + .is_none() + { + trace!( + target: LOG_TARGET, + "Spent genesis TXO (commitment '{}') to peer", + output.commitment.to_hex() + ); + pruned_genesis_block_outputs.push(Ok(SyncUtxosResponse { + txo: Some(Txo::Commitment(output.commitment.as_bytes().to_vec())), + mined_header: current_header.hash().to_vec(), + })); + } + } + } + let start_header = current_header.clone(); loop { let timer = Instant::now(); @@ -248,6 +281,15 @@ where B: BlockchainBackend + 'static let mut txos = Vec::with_capacity(outputs.len() + inputs.len()); txos.append(&mut outputs); txos.append(&mut inputs); + if start_header == current_header { + debug!( + target: LOG_TARGET, + "Adding {} genesis block pruned inputs in response for block #{} '{}'", pruned_genesis_block_outputs.len(), + current_header.height, + current_header_hash + ); + txos.append(&mut pruned_genesis_block_outputs); + } let txos = txos.into_iter(); // Ensure task stops if the peer prematurely stops their RPC session diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index 5d15a9b668..71af766fca 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -143,6 +143,10 @@ impl AsyncBlockchainDb { pub fn inner(&self) -> &BlockchainDatabase { &self.db } + + pub fn fetch_genesis_block(&self) -> ChainBlock { + self.db.fetch_genesis_block() + } } impl AsyncBlockchainDb { diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index b8aa04e586..41d4655f78 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -301,6 +301,11 @@ where B: BlockchainBackend Ok(blockchain_db) } + /// Get the genesis block form the consensus manager + pub fn fetch_genesis_block(&self) -> ChainBlock { + self.consensus_manager.get_genesis_block() + } + /// Returns a reference to the consensus cosntants at the current height pub fn consensus_constants(&self) -> Result<&ConsensusConstants, ChainStorageError> { let height = self.get_height()?; @@ -2363,18 +2368,18 @@ fn prune_database_if_needed( return Ok(()); } - let db_height = metadata.best_block_height(); - let abs_pruning_horizon = db_height.saturating_sub(pruning_horizon); - + let prune_to_height_target = metadata.best_block_height().saturating_sub(pruning_horizon); debug!( target: LOG_TARGET, - "Current pruned height is: {}, pruning horizon is: {}, while the pruning interval is: {}", + "Blockchain height: {}, pruning horizon: {}, pruned height: {}, prune to height target: {}, pruning interval: {}", + metadata.best_block_height(), + metadata.pruning_horizon(), metadata.pruned_height(), - abs_pruning_horizon, + prune_to_height_target, pruning_interval, ); - if metadata.pruned_height() < abs_pruning_horizon.saturating_sub(pruning_interval) { - prune_to_height(db, abs_pruning_horizon)?; + if metadata.pruned_height() < prune_to_height_target.saturating_sub(pruning_interval) { + prune_to_height(db, prune_to_height_target)?; } Ok(()) diff --git a/base_layer/core/tests/tests/horizon_sync.rs b/base_layer/core/tests/tests/horizon_sync.rs index df83120a60..c1be254adf 100644 --- a/base_layer/core/tests/tests/horizon_sync.rs +++ b/base_layer/core/tests/tests/horizon_sync.rs @@ -34,9 +34,9 @@ use crate::helpers::{ #[allow(clippy::too_many_lines)] #[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn test_horizon_sync_from_archival_node_happy_path() { +async fn test_initial_horizon_sync_from_archival_node_happy_path() { //` cargo test --release --test core_integration_tests - //` tests::horizon_sync::test_horizon_sync_from_archival_node_happy_path > .\target\output.txt 2>&1 + //` tests::horizon_sync::test_initial_horizon_sync_from_archival_node_happy_path > .\target\output.txt 2>&1 // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice (pruning node) and Bob (archival node) @@ -285,9 +285,9 @@ async fn test_horizon_sync_from_archival_node_happy_path() { #[allow(clippy::too_many_lines)] #[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn test_horizon_sync_from_prune_node_happy_path() { +async fn test_consecutive_horizon_sync_from_prune_node_happy_path() { //` cargo test --release --test core_integration_tests - //` tests::horizon_sync::test_horizon_sync_from_prune_node_happy_path > .\target\output.txt 2>&1 + //` tests::horizon_sync::test_initial_horizon_sync_from_prune_node_happy_path > .\target\output.txt 2>&1 // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice (pruning node) and Bob (archival node) and Carol (pruning node) @@ -657,8 +657,185 @@ async fn test_horizon_sync_from_prune_node_happy_path() { alice_node.blockchain_db.get_height().unwrap(), alice_header_height - pruning_horizon_alice ); + // Carol will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, carol_node.node_identity.node_id(), 1).await); +} + +#[allow(clippy::too_many_lines)] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_initial_horizon_sync_from_prune_node_happy_path() { + //` cargo test --release --test core_integration_tests + //` tests::horizon_sync::test_initial_horizon_sync_from_prune_node_happy_path > .\target\output.txt 2>&1 + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + + // Create the network with Alice (pruning node) and Bob (archival node) and Carol (pruning node) + let pruning_horizon_alice = 4; + let pruning_horizon_carol = 12; + let (mut state_machines, mut peer_nodes, initial_block, consensus_manager, key_manager, initial_coinbase) = + sync::create_network_with_multiple_nodes(vec![ + // Alice is a pruned node + BlockchainDatabaseConfig { + orphan_storage_capacity: 5, + pruning_horizon: pruning_horizon_alice, + pruning_interval: 5, + track_reorgs: false, + cleanup_orphans_at_startup: false, + }, + // Carol is a pruned node + BlockchainDatabaseConfig { + orphan_storage_capacity: 5, + pruning_horizon: pruning_horizon_carol, + pruning_interval: 5, + track_reorgs: false, + cleanup_orphans_at_startup: false, + }, + // Bob is an archival node + BlockchainDatabaseConfig::default(), + ]) + .await; + let mut alice_state_machine = state_machines.remove(0); + let mut carol_state_machine = state_machines.remove(0); + let alice_node = peer_nodes.remove(0); + let carol_node = peer_nodes.remove(0); + let bob_node = peer_nodes.remove(0); + + // Create a blockchain that spends the genesys coinbase early on and then later spends some more coinbase outputs + let follow_up_coinbases_to_spend = 5; + let (_blocks, _coinbases) = sync::create_block_chain_with_transactions( + &bob_node, + &initial_block, + &initial_coinbase, + &consensus_manager, + &key_manager, + min(pruning_horizon_alice, pruning_horizon_carol), + 28, // > follow_up_transaction_in_block + pruning_horizon_carol + 1 + 2, // < pruning_horizon_alice, < pruning_horizon_carol + 14, // > pruning_horizon_alice, > pruning_horizon_carol + follow_up_coinbases_to_spend, // > spend_genesis_coinbase_in_block - 1, < follow_up_transaction_in_block + ) + .await; + + // 1. Carol attempts initial horizon sync from Bob archival node (to pruning height 16) + println!("\n1. Carol attempts initial horizon sync from Bob archival node (to pruning height 16)\n"); + + let output_hash = initial_coinbase.hash(&key_manager).await.unwrap(); + assert!(carol_node.blockchain_db.fetch_output(output_hash).unwrap().is_some()); + let commitment = initial_coinbase.commitment(&key_manager).await.unwrap(); + assert!(carol_node + .blockchain_db + .fetch_unspent_output_hash_by_commitment(commitment.clone()) + .unwrap() + .is_some()); + + let mut header_sync_carol_from_bob = sync::initialize_sync_headers_with_ping_pong_data(&carol_node, &bob_node); + let event = sync::sync_headers_execute(&mut carol_state_machine, &mut header_sync_carol_from_bob).await; + let carol_header_height = carol_node.blockchain_db.fetch_last_header().unwrap().height; + println!("Event: {} to header {}", state_event(&event), carol_header_height); + assert_eq!(carol_header_height, 28); + let event = decide_horizon_sync(&mut carol_state_machine, header_sync_carol_from_bob).await; + let mut horizon_sync = match event { + StateEvent::ProceedToHorizonSync(sync_peers) => HorizonStateSync::from(sync_peers), + _ => panic!("1. Carol should proceed to horizon sync"), + }; + let event = sync::horizon_sync_execute(&mut carol_state_machine, &mut horizon_sync).await; + + println!( + "Event: {} to block {}", + state_event(&event), + carol_node.blockchain_db.get_height().unwrap() + ); + assert_eq!(event, StateEvent::HorizonStateSynchronized); + assert_eq!( + carol_node.blockchain_db.get_height().unwrap(), + carol_header_height - pruning_horizon_carol + ); + + assert!(carol_node.blockchain_db.fetch_output(output_hash).unwrap().is_none()); + assert!(carol_node + .blockchain_db + .fetch_unspent_output_hash_by_commitment(commitment.clone()) + .unwrap() + .is_none()); + // Bob will not be banned - assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + assert!(!sync::wait_for_is_peer_banned(&carol_node, bob_node.node_identity.node_id(), 1).await); + + // 2. Carol attempts block sync from Bob to the tip (to height 28) + println!("\n2. Carol attempts block sync from Bob to the tip (to height 28)\n"); + + let mut block_sync = sync::initialize_sync_blocks(&bob_node); + let event = sync::sync_blocks_execute(&mut carol_state_machine, &mut block_sync).await; + println!( + "Event: {} to block {}", + state_event(&event), + carol_node.blockchain_db.get_height().unwrap() + ); + assert_eq!(event, StateEvent::BlocksSynchronized); + assert_eq!( + carol_node.blockchain_db.get_height().unwrap(), + carol_node.blockchain_db.fetch_last_header().unwrap().height + ); + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&carol_node, bob_node.node_identity.node_id(), 1).await); + + // 3. Alice attempts initial horizon sync from Carol prune node (to height 24) + println!("\n3. Alice attempts initial horizon sync from Carol prune node (to height 24)\n"); + + assert!(alice_node.blockchain_db.fetch_output(output_hash).unwrap().is_some()); + assert!(alice_node + .blockchain_db + .fetch_unspent_output_hash_by_commitment(commitment.clone()) + .unwrap() + .is_some()); + + let mut header_sync_alice_from_carol = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &carol_node); + let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync_alice_from_carol).await; + let alice_header_height = alice_node.blockchain_db.fetch_last_header().unwrap().height; + println!("Event: {} to header {}", state_event(&event), alice_header_height); + assert_eq!(alice_header_height, 28); + let event = decide_horizon_sync(&mut alice_state_machine, header_sync_alice_from_carol).await; + let mut horizon_sync = match event { + StateEvent::ProceedToHorizonSync(sync_peers) => HorizonStateSync::from(sync_peers), + _ => panic!("3. Alice should proceed to horizon sync"), + }; + let event = sync::horizon_sync_execute(&mut alice_state_machine, &mut horizon_sync).await; + + println!( + "Event: {} to block {}", + state_event(&event), + alice_node.blockchain_db.get_height().unwrap() + ); + assert_eq!(event, StateEvent::HorizonStateSynchronized); + assert_eq!( + alice_node.blockchain_db.get_height().unwrap(), + alice_header_height - pruning_horizon_alice + ); + + assert!(alice_node.blockchain_db.fetch_output(output_hash).unwrap().is_none()); + assert!(alice_node + .blockchain_db + .fetch_unspent_output_hash_by_commitment(commitment.clone()) + .unwrap() + .is_none()); + + // Carol will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, carol_node.node_identity.node_id(), 1).await); + + // 4. Alice attempts block sync from Carol prune node to the tip (to height 28) + println!("\n4. Alice attempts block sync from Carol prune node to the tip (to height 28)\n"); + + let mut block_sync = sync::initialize_sync_blocks(&carol_node); + let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await; + println!( + "Event: {} to block {}", + state_event(&event), + alice_node.blockchain_db.get_height().unwrap() + ); + assert_eq!(event, StateEvent::BlocksSynchronized); + assert_eq!( + alice_node.blockchain_db.get_height().unwrap(), + alice_node.blockchain_db.fetch_last_header().unwrap().height + ); // Carol will not be banned assert!(!sync::wait_for_is_peer_banned(&alice_node, carol_node.node_identity.node_id(), 1).await); }