Skip to content

Commit

Permalink
feat: initial horizon sync from prune node (#6109)
Browse files Browse the repository at this point in the history
Description
---
- Implemented initial horizon sync from prune node.
- Added an integration-level unit test to ensure it works as intended.

Motivation and Context
---
This will be more efficient for the network in total

How Has This Been Tested?
---
New integration-level unit test
System-level sync tests from archival node and from prune node

What process can a PR reviewer use to test or verify this change?
---
See integration-level unit test
Perform system-level sync test

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [ ] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [X] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
  • Loading branch information
hansieodendaal authored Feb 2, 2024
1 parent 70d5ad3 commit 2987621
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,12 @@ impl DecideNextSync {
.filter(|sync_peer| {
let remote_metadata = sync_peer.claimed_chain_metadata();
debug!(target: LOG_TARGET, "Peer metadata: {}", remote_metadata);
let remote_is_archival_node = remote_metadata.pruned_height() == 0;
let general_sync_conditions =
// Must be able to provide the correct amount of full blocks past the pruned height (i.e. the
// pruning horizon), otherwise our horizon spec will not be met
remote_metadata.best_block_height().saturating_sub(remote_metadata.pruned_height()) >=
local_metadata.pruning_horizon() &&
// Must have a better blockchain tip than us
remote_metadata.best_block_height() > local_metadata.best_block_height() &&
// Must be able to provide full blocks from the height we need detailed information
remote_metadata.pruned_height() <= local_metadata.best_block_height();
let sync_from_prune_node = !remote_is_archival_node &&
// Must have done initial sync (to detect genesis TXO spends)
local_metadata.best_block_height() > 0;
general_sync_conditions && (remote_is_archival_node || sync_from_prune_node)
// Must be able to provide the correct amount of full blocks past the pruned height (i.e. the
// pruning horizon), otherwise our horizon spec will not be met
remote_metadata.best_block_height().saturating_sub(remote_metadata.pruned_height()) >=
local_metadata.pruning_horizon() &&
// Must have a better blockchain tip than us
remote_metadata.best_block_height() > local_metadata.best_block_height()
})
.collect::<Vec<_>>();

Expand Down
42 changes: 42 additions & 0 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,39 @@ where B: BlockchainBackend + 'static
current_header.hash().to_hex(),
end_header.hash().to_hex(),
);

// If this is a pruned node and outputs have been requested for an initial sync, we need to discover and send
// the outputs from the genesis block that have been pruned as well
let mut pruned_genesis_block_outputs = Vec::new();
let metadata = self
.db
.get_chain_metadata()
.await
.rpc_status_internal_error(LOG_TARGET)?;
if current_header.height == 1 && metadata.is_pruned_node() {
let genesis_block = self.db.fetch_genesis_block();
for output in genesis_block.block().body.outputs() {
let output_hash = output.hash();
if self
.db
.fetch_output(output_hash)
.await
.rpc_status_internal_error(LOG_TARGET)?
.is_none()
{
trace!(
target: LOG_TARGET,
"Spent genesis TXO (commitment '{}') to peer",
output.commitment.to_hex()
);
pruned_genesis_block_outputs.push(Ok(SyncUtxosResponse {
txo: Some(Txo::Commitment(output.commitment.as_bytes().to_vec())),
mined_header: current_header.hash().to_vec(),
}));
}
}
}

let start_header = current_header.clone();
loop {
let timer = Instant::now();
Expand Down Expand Up @@ -248,6 +281,15 @@ where B: BlockchainBackend + 'static
let mut txos = Vec::with_capacity(outputs.len() + inputs.len());
txos.append(&mut outputs);
txos.append(&mut inputs);
if start_header == current_header {
debug!(
target: LOG_TARGET,
"Adding {} genesis block pruned inputs in response for block #{} '{}'", pruned_genesis_block_outputs.len(),
current_header.height,
current_header_hash
);
txos.append(&mut pruned_genesis_block_outputs);
}
let txos = txos.into_iter();

// Ensure task stops if the peer prematurely stops their RPC session
Expand Down
4 changes: 4 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
pub fn inner(&self) -> &BlockchainDatabase<B> {
&self.db
}

pub fn fetch_genesis_block(&self) -> ChainBlock {
self.db.fetch_genesis_block()
}
}

impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
Expand Down
19 changes: 12 additions & 7 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,11 @@ where B: BlockchainBackend
Ok(blockchain_db)
}

/// Get the genesis block form the consensus manager
pub fn fetch_genesis_block(&self) -> ChainBlock {
self.consensus_manager.get_genesis_block()
}

/// Returns a reference to the consensus cosntants at the current height
pub fn consensus_constants(&self) -> Result<&ConsensusConstants, ChainStorageError> {
let height = self.get_height()?;
Expand Down Expand Up @@ -2363,18 +2368,18 @@ fn prune_database_if_needed<T: BlockchainBackend>(
return Ok(());
}

let db_height = metadata.best_block_height();
let abs_pruning_horizon = db_height.saturating_sub(pruning_horizon);

let prune_to_height_target = metadata.best_block_height().saturating_sub(pruning_horizon);
debug!(
target: LOG_TARGET,
"Current pruned height is: {}, pruning horizon is: {}, while the pruning interval is: {}",
"Blockchain height: {}, pruning horizon: {}, pruned height: {}, prune to height target: {}, pruning interval: {}",
metadata.best_block_height(),
metadata.pruning_horizon(),
metadata.pruned_height(),
abs_pruning_horizon,
prune_to_height_target,
pruning_interval,
);
if metadata.pruned_height() < abs_pruning_horizon.saturating_sub(pruning_interval) {
prune_to_height(db, abs_pruning_horizon)?;
if metadata.pruned_height() < prune_to_height_target.saturating_sub(pruning_interval) {
prune_to_height(db, prune_to_height_target)?;
}

Ok(())
Expand Down
187 changes: 182 additions & 5 deletions base_layer/core/tests/tests/horizon_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use crate::helpers::{

#[allow(clippy::too_many_lines)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_horizon_sync_from_archival_node_happy_path() {
async fn test_initial_horizon_sync_from_archival_node_happy_path() {
//` cargo test --release --test core_integration_tests
//` tests::horizon_sync::test_horizon_sync_from_archival_node_happy_path > .\target\output.txt 2>&1
//` tests::horizon_sync::test_initial_horizon_sync_from_archival_node_happy_path > .\target\output.txt 2>&1
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`

// Create the network with Alice (pruning node) and Bob (archival node)
Expand Down Expand Up @@ -285,9 +285,9 @@ async fn test_horizon_sync_from_archival_node_happy_path() {

#[allow(clippy::too_many_lines)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_horizon_sync_from_prune_node_happy_path() {
async fn test_consecutive_horizon_sync_from_prune_node_happy_path() {
//` cargo test --release --test core_integration_tests
//` tests::horizon_sync::test_horizon_sync_from_prune_node_happy_path > .\target\output.txt 2>&1
//` tests::horizon_sync::test_initial_horizon_sync_from_prune_node_happy_path > .\target\output.txt 2>&1
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`

// Create the network with Alice (pruning node) and Bob (archival node) and Carol (pruning node)
Expand Down Expand Up @@ -657,8 +657,185 @@ async fn test_horizon_sync_from_prune_node_happy_path() {
alice_node.blockchain_db.get_height().unwrap(),
alice_header_height - pruning_horizon_alice
);
// Carol will not be banned
assert!(!sync::wait_for_is_peer_banned(&alice_node, carol_node.node_identity.node_id(), 1).await);
}

#[allow(clippy::too_many_lines)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_initial_horizon_sync_from_prune_node_happy_path() {
//` cargo test --release --test core_integration_tests
//` tests::horizon_sync::test_initial_horizon_sync_from_prune_node_happy_path > .\target\output.txt 2>&1
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`

// Create the network with Alice (pruning node) and Bob (archival node) and Carol (pruning node)
let pruning_horizon_alice = 4;
let pruning_horizon_carol = 12;
let (mut state_machines, mut peer_nodes, initial_block, consensus_manager, key_manager, initial_coinbase) =
sync::create_network_with_multiple_nodes(vec![
// Alice is a pruned node
BlockchainDatabaseConfig {
orphan_storage_capacity: 5,
pruning_horizon: pruning_horizon_alice,
pruning_interval: 5,
track_reorgs: false,
cleanup_orphans_at_startup: false,
},
// Carol is a pruned node
BlockchainDatabaseConfig {
orphan_storage_capacity: 5,
pruning_horizon: pruning_horizon_carol,
pruning_interval: 5,
track_reorgs: false,
cleanup_orphans_at_startup: false,
},
// Bob is an archival node
BlockchainDatabaseConfig::default(),
])
.await;
let mut alice_state_machine = state_machines.remove(0);
let mut carol_state_machine = state_machines.remove(0);
let alice_node = peer_nodes.remove(0);
let carol_node = peer_nodes.remove(0);
let bob_node = peer_nodes.remove(0);

// Create a blockchain that spends the genesys coinbase early on and then later spends some more coinbase outputs
let follow_up_coinbases_to_spend = 5;
let (_blocks, _coinbases) = sync::create_block_chain_with_transactions(
&bob_node,
&initial_block,
&initial_coinbase,
&consensus_manager,
&key_manager,
min(pruning_horizon_alice, pruning_horizon_carol),
28, // > follow_up_transaction_in_block + pruning_horizon_carol + 1
2, // < pruning_horizon_alice, < pruning_horizon_carol
14, // > pruning_horizon_alice, > pruning_horizon_carol
follow_up_coinbases_to_spend, // > spend_genesis_coinbase_in_block - 1, < follow_up_transaction_in_block
)
.await;

// 1. Carol attempts initial horizon sync from Bob archival node (to pruning height 16)
println!("\n1. Carol attempts initial horizon sync from Bob archival node (to pruning height 16)\n");

let output_hash = initial_coinbase.hash(&key_manager).await.unwrap();
assert!(carol_node.blockchain_db.fetch_output(output_hash).unwrap().is_some());
let commitment = initial_coinbase.commitment(&key_manager).await.unwrap();
assert!(carol_node
.blockchain_db
.fetch_unspent_output_hash_by_commitment(commitment.clone())
.unwrap()
.is_some());

let mut header_sync_carol_from_bob = sync::initialize_sync_headers_with_ping_pong_data(&carol_node, &bob_node);
let event = sync::sync_headers_execute(&mut carol_state_machine, &mut header_sync_carol_from_bob).await;
let carol_header_height = carol_node.blockchain_db.fetch_last_header().unwrap().height;
println!("Event: {} to header {}", state_event(&event), carol_header_height);
assert_eq!(carol_header_height, 28);
let event = decide_horizon_sync(&mut carol_state_machine, header_sync_carol_from_bob).await;
let mut horizon_sync = match event {
StateEvent::ProceedToHorizonSync(sync_peers) => HorizonStateSync::from(sync_peers),
_ => panic!("1. Carol should proceed to horizon sync"),
};
let event = sync::horizon_sync_execute(&mut carol_state_machine, &mut horizon_sync).await;

println!(
"Event: {} to block {}",
state_event(&event),
carol_node.blockchain_db.get_height().unwrap()
);
assert_eq!(event, StateEvent::HorizonStateSynchronized);
assert_eq!(
carol_node.blockchain_db.get_height().unwrap(),
carol_header_height - pruning_horizon_carol
);

assert!(carol_node.blockchain_db.fetch_output(output_hash).unwrap().is_none());
assert!(carol_node
.blockchain_db
.fetch_unspent_output_hash_by_commitment(commitment.clone())
.unwrap()
.is_none());

// Bob will not be banned
assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await);
assert!(!sync::wait_for_is_peer_banned(&carol_node, bob_node.node_identity.node_id(), 1).await);

// 2. Carol attempts block sync from Bob to the tip (to height 28)
println!("\n2. Carol attempts block sync from Bob to the tip (to height 28)\n");

let mut block_sync = sync::initialize_sync_blocks(&bob_node);
let event = sync::sync_blocks_execute(&mut carol_state_machine, &mut block_sync).await;
println!(
"Event: {} to block {}",
state_event(&event),
carol_node.blockchain_db.get_height().unwrap()
);
assert_eq!(event, StateEvent::BlocksSynchronized);
assert_eq!(
carol_node.blockchain_db.get_height().unwrap(),
carol_node.blockchain_db.fetch_last_header().unwrap().height
);
// Bob will not be banned
assert!(!sync::wait_for_is_peer_banned(&carol_node, bob_node.node_identity.node_id(), 1).await);

// 3. Alice attempts initial horizon sync from Carol prune node (to height 24)
println!("\n3. Alice attempts initial horizon sync from Carol prune node (to height 24)\n");

assert!(alice_node.blockchain_db.fetch_output(output_hash).unwrap().is_some());
assert!(alice_node
.blockchain_db
.fetch_unspent_output_hash_by_commitment(commitment.clone())
.unwrap()
.is_some());

let mut header_sync_alice_from_carol = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &carol_node);
let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync_alice_from_carol).await;
let alice_header_height = alice_node.blockchain_db.fetch_last_header().unwrap().height;
println!("Event: {} to header {}", state_event(&event), alice_header_height);
assert_eq!(alice_header_height, 28);
let event = decide_horizon_sync(&mut alice_state_machine, header_sync_alice_from_carol).await;
let mut horizon_sync = match event {
StateEvent::ProceedToHorizonSync(sync_peers) => HorizonStateSync::from(sync_peers),
_ => panic!("3. Alice should proceed to horizon sync"),
};
let event = sync::horizon_sync_execute(&mut alice_state_machine, &mut horizon_sync).await;

println!(
"Event: {} to block {}",
state_event(&event),
alice_node.blockchain_db.get_height().unwrap()
);
assert_eq!(event, StateEvent::HorizonStateSynchronized);
assert_eq!(
alice_node.blockchain_db.get_height().unwrap(),
alice_header_height - pruning_horizon_alice
);

assert!(alice_node.blockchain_db.fetch_output(output_hash).unwrap().is_none());
assert!(alice_node
.blockchain_db
.fetch_unspent_output_hash_by_commitment(commitment.clone())
.unwrap()
.is_none());

// Carol will not be banned
assert!(!sync::wait_for_is_peer_banned(&alice_node, carol_node.node_identity.node_id(), 1).await);

// 4. Alice attempts block sync from Carol prune node to the tip (to height 28)
println!("\n4. Alice attempts block sync from Carol prune node to the tip (to height 28)\n");

let mut block_sync = sync::initialize_sync_blocks(&carol_node);
let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await;
println!(
"Event: {} to block {}",
state_event(&event),
alice_node.blockchain_db.get_height().unwrap()
);
assert_eq!(event, StateEvent::BlocksSynchronized);
assert_eq!(
alice_node.blockchain_db.get_height().unwrap(),
alice_node.blockchain_db.fetch_last_header().unwrap().height
);
// Carol will not be banned
assert!(!sync::wait_for_is_peer_banned(&alice_node, carol_node.node_identity.node_id(), 1).await);
}

0 comments on commit 2987621

Please sign in to comment.