Skip to content

Commit

Permalink
feat: reset broken sync (#4955)
Browse files Browse the repository at this point in the history
Description
---
If sync fails resets chain to the highest pow chain the node locally has the data to. 

Motivation and Context
---

See: #4866

How Has This Been Tested?
---
Unit tests
  • Loading branch information
SWvheerden authored Nov 28, 2022
1 parent a6e8991 commit 01e9e7e
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ impl BlockSync {
});
log_mdc::extend(mdc);
warn!(target: LOG_TARGET, "Block sync failed: {}", err);
if let Err(e) = shared.db.swap_to_highest_pow_chain().await {
error!(
target: LOG_TARGET,
"Failed to reset chain to highest proof of work: {}", e
);
}
StateEvent::BlockSyncFailed
},
}
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
make_async_fn!(get_shard_key(height:u64, public_key: PublicKey) -> Option<[u8;32]>, "get_shard_key");

make_async_fn!(fetch_template_registrations<T: RangeBounds<u64>>(range: T) -> Vec<TemplateRegistrationEntry>, "fetch_template_registrations");

make_async_fn!(swap_to_highest_pow_chain() -> (), "swap to highest proof-of-work chain");
}

impl<B: BlockchainBackend + 'static> From<BlockchainDatabase<B>> for AsyncBlockchainDb<B> {
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ pub trait BlockchainBackend: Send + Sync {
/// Fetches an current tip orphan by hash or returns None if the orphan is not found or is not a tip of any
/// alternate chain
fn fetch_orphan_chain_tip_by_hash(&self, hash: &HashOutput) -> Result<Option<ChainHeader>, ChainStorageError>;
/// Fetches all currently stored orphan tips, if none are stored, returns an empty vec.
fn fetch_all_orphan_chain_tips(&self) -> Result<Vec<ChainHeader>, ChainStorageError>;
/// Fetch all orphans that have `hash` as a previous hash
fn fetch_orphan_children_of(&self, hash: HashOutput) -> Result<Vec<Block>, ChainStorageError>;

Expand Down
304 changes: 129 additions & 175 deletions base_layer/core/src/chain_storage/blockchain_database.rs

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2298,6 +2298,36 @@ impl BlockchainBackend for LMDBDatabase {
Ok(Some(chain_header))
}

fn fetch_all_orphan_chain_tips(&self) -> Result<Vec<ChainHeader>, ChainStorageError> {
let txn = self.read_transaction()?;
let tips: Vec<HashOutput> = lmdb_filter_map_values(&txn, &self.orphan_chain_tips_db, Some)?;
let mut result = Vec::new();
for hash in tips {
let orphan: Block =
lmdb_get(&txn, &self.orphans_db, hash.as_slice())?.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "Orphan",
field: "hash",
value: hash.to_hex(),
})?;

let accumulated_data = lmdb_get(&txn, &self.orphan_header_accumulated_data_db, hash.as_slice())?
.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "Orphan accumulated data",
field: "hash",
value: hash.to_hex(),
})?;
let height = orphan.header.height;
let chain_header = ChainHeader::try_construct(orphan.header, accumulated_data).ok_or_else(|| {
ChainStorageError::DataInconsistencyDetected {
function: "fetch_orphan_chain_tip_by_hash",
details: format!("Accumulated data mismatch at height #{}", height),
}
})?;
result.push(chain_header);
}
Ok(result)
}

fn fetch_orphan_children_of(&self, parent_hash: HashOutput) -> Result<Vec<Block>, ChainStorageError> {
trace!(
target: LOG_TARGET,
Expand Down
93 changes: 2 additions & 91 deletions base_layer/core/src/chain_storage/tests/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
// DAMAGE.
use std::sync::Arc;

use tari_test_utils::unpack_enum;
use tari_utilities::hex::Hex;

use crate::{
blocks::{Block, BlockHeader, BlockHeaderAccumulatedData, ChainHeader, NewBlockTemplate},
chain_storage::{BlockchainDatabase, ChainStorageError},
Expand All @@ -36,8 +33,8 @@ use crate::{
},
transactions::{
tari_amount::T,
test_helpers::{schema_to_transaction, TransactionSchema},
transaction_components::{OutputFeatures, Transaction, UnblindedOutput},
test_helpers::schema_to_transaction,
transaction_components::{Transaction, UnblindedOutput},
},
txn_schema,
};
Expand Down Expand Up @@ -368,92 +365,6 @@ mod fetch_block_hashes_from_header_tip {
}
}

mod add_block {
use super::*;

#[test]
fn it_rejects_duplicate_commitments_in_the_utxo_set() {
let db = setup();
let (blocks, outputs) = add_many_chained_blocks(5, &db);

let prev_block = blocks.last().unwrap();
// Used to help identify the output we're interrogating in this test
let features = OutputFeatures {
maturity: 1,
..Default::default()
};
let (txns, tx_outputs) = schema_to_transaction(&[txn_schema!(
from: vec![outputs[0].clone()],
to: vec![500 * T],
features: features
)]);
let mut prev_utxo = tx_outputs[0].clone();

let (block, _) = create_next_block(&db, prev_block, txns);
db.add_block(block.clone()).unwrap().assert_added();

let prev_block = block;

let (txns, _) = schema_to_transaction(&[TransactionSchema {
from: vec![outputs[1].clone()],
to: vec![],
to_outputs: vec![prev_utxo.clone()],
fee: 5.into(),
lock_height: 0,
features,
script: tari_script::script![Nop],
covenant: Default::default(),
input_data: None,
input_version: None,
output_version: None,
}]);
let commitment_hex = txns[0]
.body
.outputs()
.iter()
.find(|o| o.features.maturity == 1)
.unwrap()
.commitment
.to_hex();

let (block, _) = create_next_block(&db, &prev_block, txns);
let err = db.add_block(block.clone()).unwrap_err();
unpack_enum!(ChainStorageError::KeyExists { key, .. } = err);
assert_eq!(key, commitment_hex);
// Check rollback
let header = db.fetch_header(block.header.height).unwrap();
assert!(header.is_none());

let (txns, _) = schema_to_transaction(&[txn_schema!(from: vec![prev_utxo.clone()], to: vec![50 * T])]);
let (block, _) = create_next_block(&db, &prev_block, txns);
let block = db.add_block(block).unwrap().assert_added();
let prev_block = block.to_arc_block();

// Different metadata so that the output hash is different in txo_hash_to_index_db
prev_utxo.features = OutputFeatures {
metadata: vec![1],
..Default::default()
};
// Now we can reuse a commitment
let (txns, _) = schema_to_transaction(&[TransactionSchema {
from: vec![outputs[1].clone()],
to: vec![],
to_outputs: vec![prev_utxo],
fee: 5.into(),
lock_height: 0,
features: Default::default(),
script: tari_script::script![Nop],
covenant: Default::default(),
input_data: None,
input_version: None,
output_version: None,
}]);

let (block, _) = create_next_block(&db, &prev_block, txns);
db.add_block(block).unwrap().assert_added();
}
}

mod get_stats {
use super::*;

Expand Down
4 changes: 4 additions & 0 deletions base_layer/core/src/test_helpers/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@ impl BlockchainBackend for TempDatabase {
self.db.as_ref().unwrap().fetch_orphan_chain_tip_by_hash(hash)
}

fn fetch_all_orphan_chain_tips(&self) -> Result<Vec<ChainHeader>, ChainStorageError> {
self.db.as_ref().unwrap().fetch_all_orphan_chain_tips()
}

fn fetch_orphan_children_of(&self, hash: HashOutput) -> Result<Vec<Block>, ChainStorageError> {
self.db.as_ref().unwrap().fetch_orphan_children_of(hash)
}
Expand Down
88 changes: 88 additions & 0 deletions base_layer/core/tests/chain_storage_tests/chain_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,94 @@ fn test_handle_tip_reorg() {
assert!(store.fetch_orphan(*blocks[2].hash()).is_ok());
}

#[test]
fn test_handle_tip_reset() {
// GB --> A1 --> A2(Low PoW) [Main Chain]
// \--> B2(Highest PoW) [Forked Chain]
// Initially, the main chain is GB->A1->A2. B2 has a higher accumulated PoW and when B2 is added the main chain is
// reorged to GB->A1->B2

// Create Main Chain
let network = Network::LocalNet;
let (mut store, mut blocks, mut outputs, consensus_manager) = create_new_blockchain(network);
// Block A1
let txs = vec![txn_schema!(
from: vec![outputs[0][0].clone()],
to: vec![10 * T, 10 * T, 10 * T, 10 * T]
)];
generate_new_block_with_achieved_difficulty(
&mut store,
&mut blocks,
&mut outputs,
txs,
Difficulty::from(1),
&consensus_manager,
)
.unwrap();
// Block A2
let txs = vec![txn_schema!(from: vec![outputs[1][3].clone()], to: vec![6 * T])];
generate_new_block_with_achieved_difficulty(
&mut store,
&mut blocks,
&mut outputs,
txs,
Difficulty::from(3),
&consensus_manager,
)
.unwrap();

// Create Forked Chain

let mut orphan_store = create_store_with_consensus(consensus_manager.clone());
orphan_store.add_block(blocks[1].to_arc_block()).unwrap();
let mut orphan_blocks = vec![blocks[0].clone(), blocks[1].clone()];
let mut orphan_outputs = vec![outputs[0].clone(), outputs[1].clone()];
// Block B2
let txs = vec![txn_schema!(from: vec![orphan_outputs[1][0].clone()], to: vec![5 * T])];
generate_new_block_with_achieved_difficulty(
&mut orphan_store,
&mut orphan_blocks,
&mut orphan_outputs,
txs,
Difficulty::from(7),
&consensus_manager,
)
.unwrap();

// Adding B2 to the main chain will produce a reorg to GB->A1->B2.
if let Ok(BlockAddResult::ChainReorg { .. }) = store.add_block(orphan_blocks[2].to_arc_block()) {
} else {
panic!();
}

assert_eq!(store.fetch_tip_header().unwrap().header().height, 2);
store.rewind_to_height(1).unwrap();
assert_eq!(store.fetch_tip_header().unwrap().header().height, 1);
// both tips should be in the orphan pool
assert!(store.fetch_orphan(*orphan_blocks[2].hash()).is_ok());
assert!(store.fetch_orphan(*blocks[2].hash()).is_ok());
store.swap_to_highest_pow_chain().unwrap();
// should no be on B2

assert_eq!(store.fetch_tip_header().unwrap().header().height, 2);
assert_eq!(store.fetch_tip_header().unwrap().hash(), orphan_blocks[2].hash());
assert!(store.fetch_orphan(*blocks[2].hash()).is_ok());

store.swap_to_highest_pow_chain().unwrap();
// Chain should not have swapped
assert_eq!(store.fetch_tip_header().unwrap().hash(), orphan_blocks[2].hash());
assert!(store.fetch_orphan(*blocks[2].hash()).is_ok());

// lets reset to A1 again
store.rewind_to_height(1).unwrap();
assert_eq!(store.fetch_tip_header().unwrap().header().height, 1);
store.cleanup_all_orphans().unwrap();
store.swap_to_highest_pow_chain().unwrap();
// current main chain should be the highest so is it still?
assert_eq!(store.fetch_tip_header().unwrap().header().height, 1);
assert_eq!(store.fetch_tip_header().unwrap().hash(), blocks[1].hash());
}

#[test]
#[allow(clippy::identity_op)]
#[allow(clippy::too_many_lines)]
Expand Down

0 comments on commit 01e9e7e

Please sign in to comment.