Skip to content

Commit

Permalink
fix(pruned mode)!: prune inputs, allow horizon sync resume and other …
Browse files Browse the repository at this point in the history
…fixes (#3521)

Description
---
- delete inputs behind pruning horizon during pruned mode cleanup
- keep cumulative utxo_sum and kernel_sum in `BlockAccumulatedData`
- prune spent outputs that may not have been pruned in a previous horizon sync
- bugfix: check rangeproof validation result in horizon sync (failed validation was ignored)
- parallelize rangeproof verification for horizon sync
- prune blocks between the new and old horizon heights if necessary before syncing
- fix off-by-one in pruning horizon (pruning horizon is consistently defined as the last pruned block inclusive)
- minor optimisations in horizon sync
- new blockchain db unit tests

Motivation and Context
---
Inputs behind the pruning horizon were not pruned as propagated blocks arrived.
Breaking change: the blockchain database will need to be resynced.

Based on #3520 

How Has This Been Tested?
---
Cucumber tests pass
Manually: horizon sync to tip and wallet recovery
  • Loading branch information
sdbondi authored Dec 3, 2021
1 parent e501aa0 commit a4341a0
Show file tree
Hide file tree
Showing 35 changed files with 973 additions and 778 deletions.
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ where B: BlockchainBackend + 'static
orphan_db_clean_out_threshold: config.orphan_db_clean_out_threshold,
max_randomx_vms: config.max_randomx_vms,
blocks_behind_before_considered_lagging: self.config.blocks_behind_before_considered_lagging,
block_sync_validation_concurrency: num_cpus::get(),
sync_validation_concurrency: num_cpus::get(),
..Default::default()
},
self.rules,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ where B: BlockchainBackend + 'static
rules.clone(),
factories,
config.bypass_range_proof_verification,
config.block_sync_validation_concurrency,
config.sync_validation_concurrency,
);
let max_randomx_vms = config.max_randomx_vms;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct BaseNodeStateMachineConfig {
pub max_randomx_vms: usize,
pub blocks_behind_before_considered_lagging: u64,
pub bypass_range_proof_verification: bool,
pub block_sync_validation_concurrency: usize,
pub sync_validation_concurrency: usize,
}

impl Default for BaseNodeStateMachineConfig {
Expand All @@ -68,7 +68,7 @@ impl Default for BaseNodeStateMachineConfig {
max_randomx_vms: 0,
blocks_behind_before_considered_lagging: 0,
bypass_range_proof_verification: false,
block_sync_validation_concurrency: 8,
sync_validation_concurrency: 8,
}
}
}
Expand Down Expand Up @@ -259,9 +259,13 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {

/// Polls both the interrupt signal and the given future. If the given future `state_fut` is ready first it's value is
/// returned, otherwise if the interrupt signal is triggered, `StateEvent::UserQuit` is returned.
async fn select_next_state_event<F>(interrupt_signal: ShutdownSignal, state_fut: F) -> StateEvent
where F: Future<Output = StateEvent> {
async fn select_next_state_event<F, I>(interrupt_signal: I, state_fut: F) -> StateEvent
where
F: Future<Output = StateEvent>,
I: Future<Output = ()>,
{
futures::pin_mut!(state_fut);
futures::pin_mut!(interrupt_signal);
// If future A and B are both ready `future::select` will prefer A
match future::select(interrupt_signal, state_fut).await {
Either::Left(_) => StateEvent::UserQuit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,26 @@
// TODO: Move the horizon synchronizer to the `sync` module

use log::*;
mod config;
pub use self::config::HorizonSyncConfig;

mod error;
pub use error::HorizonSyncError;
use horizon_state_synchronization::HorizonStateSynchronization;

use crate::{
base_node::{sync::SyncPeer, BaseNodeStateMachine},
chain_storage::BlockchainBackend,
transactions::CryptoFactories,
};
mod horizon_state_synchronization;
use horizon_state_synchronization::HorizonStateSynchronization;

use super::{
events_and_states::{HorizonSyncInfo, HorizonSyncStatus},
StateEvent,
StateInfo,
};

pub use self::config::HorizonSyncConfig;

mod config;

mod error;

mod horizon_state_synchronization;
use crate::{
base_node::{sync::SyncPeer, BaseNodeStateMachine},
chain_storage::BlockchainBackend,
transactions::CryptoFactories,
};
use log::*;

const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync";

Expand All @@ -72,29 +68,34 @@ impl HorizonStateSync {
) -> StateEvent {
let local_metadata = match shared.db.get_chain_metadata().await {
Ok(metadata) => metadata,
Err(err) => return StateEvent::FatalError(err.to_string()),
Err(err) => return err.into(),
};

if local_metadata.height_of_longest_chain() > 0 &&
local_metadata.height_of_longest_chain() >= local_metadata.pruned_height()
{
let last_header = match shared.db.fetch_last_header().await {
Ok(h) => h,
Err(err) => return err.into(),
};

let horizon_sync_height = local_metadata.horizon_block(last_header.height);
if local_metadata.pruned_height() >= horizon_sync_height {
info!(target: LOG_TARGET, "Horizon state was already synchronized.");
return StateEvent::HorizonStateSynchronized;
}

let horizon_sync_height = match shared.db.fetch_last_header().await {
Ok(header) => header.height.saturating_sub(local_metadata.pruning_horizon()),
Err(err) => return StateEvent::FatalError(err.to_string()),
};

if local_metadata.height_of_longest_chain() > horizon_sync_height {
// We're already synced because we have full blocks higher than our target pruned height
if local_metadata.height_of_longest_chain() >= horizon_sync_height {
info!(
target: LOG_TARGET,
"Tip height is higher than our pruned height. Horizon state is already synchronized."
);
return StateEvent::HorizonStateSynchronized;
}

let info = HorizonSyncInfo::new(vec![self.sync_peer.node_id().clone()], HorizonSyncStatus::Starting);
shared.set_state_info(StateInfo::HorizonSync(info));

let prover = CryptoFactories::default().range_proof;
let mut horizon_state = HorizonStateSynchronization::new(shared, &self.sync_peer, horizon_sync_height, &prover);
let mut horizon_state = HorizonStateSynchronization::new(shared, &self.sync_peer, horizon_sync_height, prover);

match horizon_state.synchronize().await {
Ok(()) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,20 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::num::TryFromIntError;

use thiserror::Error;
use tokio::task;

use tari_comms::{
connectivity::ConnectivityError,
protocol::rpc::{RpcError, RpcStatus},
};
use tari_mmr::error::MerkleMountainRangeError;

use crate::{
base_node::{comms_interface::CommsInterfaceError, state_machine_service::states::helpers::BaseNodeRequestError},
chain_storage::{ChainStorageError, MmrTree},
transactions::transaction_entities::error::TransactionError,
validation::ValidationError,
};
use std::num::TryFromIntError;
use tari_comms::{
connectivity::ConnectivityError,
protocol::rpc::{RpcError, RpcStatus},
};
use tari_mmr::error::MerkleMountainRangeError;
use thiserror::Error;
use tokio::task;

#[derive(Debug, Error)]
pub enum HorizonSyncError {
Expand Down Expand Up @@ -71,7 +68,7 @@ pub enum HorizonSyncError {
ConversionError(String),
#[error("MerkleMountainRangeError: {0}")]
MerkleMountainRangeError(#[from] MerkleMountainRangeError),
#[error("Connectivity Error: {0}")]
#[error("Connectivity error: {0}")]
ConnectivityError(#[from] ConnectivityError),
}

Expand Down
Loading

0 comments on commit a4341a0

Please sign in to comment.