diff --git a/core/src/validator.rs b/core/src/validator.rs index 3186f1afec0296..79cf0281a12b95 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -181,6 +181,16 @@ impl Validator { sigverify::init(); info!("Done."); + if let Some(shred_version) = config.expected_shred_version { + if let Some(wait_for_supermajority_slot) = config.wait_for_supermajority { + backup_and_clear_blockstore( + ledger_path, + wait_for_supermajority_slot + 1, + shred_version, + ); + } + } + info!("creating bank..."); let ( genesis_config, @@ -381,14 +391,7 @@ impl Validator { (None, None) }; - wait_for_supermajority( - config, - &bank, - &cluster_info, - rpc_override_health_check, - &blockstore, - ledger_path, - ); + wait_for_supermajority(config, &bank, &cluster_info, rpc_override_health_check); let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit); assert_eq!( @@ -634,21 +637,39 @@ fn new_banks_from_blockstore( ) } -fn backup_and_clear_blockstore(blockstore: &Arc, ledger_path: &Path, start_slot: Slot) { +fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_version: u16) { use std::time::Instant; - let folder_name = format!("backup_rocksdb_{}", thread_rng().gen_range(0, 99999)); - let backup_blockstore = Blockstore::open(&ledger_path.join(folder_name)); + let blockstore = Blockstore::open(ledger_path).unwrap(); + let mut do_copy_and_clear = false; + + // Search for shreds with incompatible version in blockstore if let Ok(slot_meta_iterator) = blockstore.slot_meta_iterator(start_slot) { + for (slot, _meta) in slot_meta_iterator { + if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) { + for shred in &shreds { + if shred.version() != shred_version { + do_copy_and_clear = true; + break; + } + } + } + } + } + + // If found, then copy shreds to another db and clear from start_slot + if do_copy_and_clear { + let folder_name = format!("backup_rocksdb_{}", thread_rng().gen_range(0, 99999)); + let backup_blockstore = Blockstore::open(&ledger_path.join(folder_name)); let mut last_print = Instant::now(); let mut copied = 0; - let mut end_slot = start_slot; + let mut last_slot = None; + let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot).unwrap(); for (slot, _meta) in slot_meta_iterator { if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) { if let Ok(ref backup_blockstore) = backup_blockstore { copied += shreds.len(); let _ = backup_blockstore.insert_shreds(shreds, None, true); } - end_slot = slot; } if last_print.elapsed().as_millis() > 3000 { info!( @@ -657,10 +678,13 @@ fn backup_and_clear_blockstore(blockstore: &Arc, ledger_path: &Path, ); last_print = Instant::now(); } + last_slot = Some(slot); } + let end_slot = last_slot.unwrap(); info!("Purging slots {} to {}", start_slot, end_slot); blockstore.purge_slots_with_delay(start_slot, end_slot, None, PurgeType::Exact); + blockstore.purge_from_next_slots(start_slot, end_slot); info!("Purging done, compacting db.."); if let Err(e) = blockstore.compact_storage(start_slot, end_slot) { warn!( @@ -670,6 +694,7 @@ fn backup_and_clear_blockstore(blockstore: &Arc, ledger_path: &Path, } info!("done"); } + drop(blockstore); } fn wait_for_supermajority( @@ -677,15 +702,11 @@ fn wait_for_supermajority( bank: &Bank, cluster_info: &ClusterInfo, rpc_override_health_check: Arc, - blockstore: &Arc, - ledger_path: &Path, ) { if config.wait_for_supermajority != Some(bank.slot()) { return; } - backup_and_clear_blockstore(blockstore, ledger_path, bank.slot() + 1); - info!( "Waiting for 80% of activated stake at slot {} to be in gossip...", bank.slot() @@ -960,23 +981,27 @@ mod tests { use solana_ledger::{blockstore, entry}; let blockstore_path = get_tmp_ledger_path!(); { - let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap()); + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + + let entries = entry::create_ticks(1, 0, Hash::default()); info!("creating shreds"); let mut last_print = Instant::now(); for i in 1..10 { - let entries = entry::create_ticks(1, 0, Hash::default()); - let shreds = blockstore::entries_to_test_shreds(entries, i, i - 1, true, 1); + let shreds = blockstore::entries_to_test_shreds(entries.clone(), i, i - 1, true, 1); blockstore.insert_shreds(shreds, None, true).unwrap(); if last_print.elapsed().as_millis() > 5000 { info!("inserted {}", i); last_print = Instant::now(); } } + drop(blockstore); - backup_and_clear_blockstore(&blockstore, &blockstore_path, 5); + backup_and_clear_blockstore(&blockstore_path, 5, 2); - for i in 6..10 { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty()); + for i in 5..10 { assert!(blockstore .get_data_shreds_for_slot(i, 0) .unwrap() diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index f8aaa29a33e96f..9e9084f0b9538d 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -24,6 +24,7 @@ impl Blockstore { let mut batch_start = from_slot; let mut purge_stats = PurgeStats::default(); let mut last_datapoint = Instant::now(); + let mut datapoint_start = batch_start; while batch_start < to_slot { let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot); @@ -33,13 +34,14 @@ impl Blockstore { if last_datapoint.elapsed().as_millis() > 1000 { datapoint_info!( "blockstore-purge", - ("from_slot", batch_start as i64, i64), - ("to_slot", to_slot as i64, i64), + ("from_slot", datapoint_start as i64, i64), + ("to_slot", batch_end as i64, i64), ("delete_range_us", purge_stats.delete_range as i64, i64), ("write_batch_us", purge_stats.write_batch as i64, i64) ); last_datapoint = Instant::now(); purge_stats = PurgeStats::default(); + datapoint_start = batch_end; } match purge_result {