Rework backup and clear function #10751

Merged: 1 commit, Jun 23, 2020
69 changes: 47 additions & 22 deletions core/src/validator.rs
@@ -183,6 +183,16 @@ impl Validator {
sigverify::init();
info!("Done.");

if let Some(shred_version) = config.expected_shred_version {
if let Some(wait_for_supermajority_slot) = config.wait_for_supermajority {
backup_and_clear_blockstore(
ledger_path,
wait_for_supermajority_slot + 1,
shred_version,
);
}
}

info!("creating bank...");
let (
genesis_config,
@@ -383,14 +393,7 @@ impl Validator {
(None, None)
};

wait_for_supermajority(
config,
&bank,
&cluster_info,
rpc_override_health_check,
&blockstore,
ledger_path,
);
wait_for_supermajority(config, &bank, &cluster_info, rpc_override_health_check);

let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit);
assert_eq!(
@@ -636,21 +639,39 @@ fn new_banks_from_blockstore(
)
}

fn backup_and_clear_blockstore(blockstore: &Arc<Blockstore>, ledger_path: &Path, start_slot: Slot) {
fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_version: u16) {
use std::time::Instant;
let folder_name = format!("backup_rocksdb_{}", thread_rng().gen_range(0, 99999));
let backup_blockstore = Blockstore::open(&ledger_path.join(folder_name));
let blockstore = Blockstore::open(ledger_path).unwrap();
let mut do_copy_and_clear = false;

// Search for shreds with incompatible version in blockstore
if let Ok(slot_meta_iterator) = blockstore.slot_meta_iterator(start_slot) {
for (slot, _meta) in slot_meta_iterator {
if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) {
for shred in &shreds {
if shred.version() != shred_version {
do_copy_and_clear = true;
break;
}
}
}
}
}

// If found, then copy shreds to another db and clear from start_slot
if do_copy_and_clear {
let folder_name = format!("backup_rocksdb_{}", thread_rng().gen_range(0, 99999));
let backup_blockstore = Blockstore::open(&ledger_path.join(folder_name));
let mut last_print = Instant::now();
let mut copied = 0;
let mut end_slot = start_slot;
let mut last_slot = None;
let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot).unwrap();
for (slot, _meta) in slot_meta_iterator {
if let Ok(shreds) = blockstore.get_data_shreds_for_slot(slot, 0) {
if let Ok(ref backup_blockstore) = backup_blockstore {
copied += shreds.len();
let _ = backup_blockstore.insert_shreds(shreds, None, true);
}
end_slot = slot;
}
if last_print.elapsed().as_millis() > 3000 {
info!(
@@ -659,10 +680,13 @@ fn backup_and_clear_blockstore(blockstore: &Arc<Blockstore>, ledger_path: &Path,
);
last_print = Instant::now();
}
last_slot = Some(slot);
}

let end_slot = last_slot.unwrap();
info!("Purging slots {} to {}", start_slot, end_slot);
blockstore.purge_slots_with_delay(start_slot, end_slot, None, PurgeType::Exact);
blockstore.purge_from_next_slots(start_slot, end_slot);
info!("Purging done, compacting db..");
if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
warn!(
@@ -672,22 +696,19 @@ fn backup_and_clear_blockstore(blockstore: &Arc<Blockstore>, ledger_path: &Path,
}
info!("done");
}
drop(blockstore);
}

fn wait_for_supermajority(
config: &ValidatorConfig,
bank: &Bank,
cluster_info: &ClusterInfo,
rpc_override_health_check: Arc<AtomicBool>,
blockstore: &Arc<Blockstore>,
ledger_path: &Path,
) {
if config.wait_for_supermajority != Some(bank.slot()) {
return;
}

backup_and_clear_blockstore(blockstore, ledger_path, bank.slot() + 1);

info!(
"Waiting for 80% of activated stake at slot {} to be in gossip...",
bank.slot()
@@ -962,23 +983,27 @@ mod tests {
use solana_ledger::{blockstore, entry};
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let blockstore = Blockstore::open(&blockstore_path).unwrap();

let entries = entry::create_ticks(1, 0, Hash::default());

info!("creating shreds");
let mut last_print = Instant::now();
for i in 1..10 {
let entries = entry::create_ticks(1, 0, Hash::default());
let shreds = blockstore::entries_to_test_shreds(entries, i, i - 1, true, 1);
let shreds = blockstore::entries_to_test_shreds(entries.clone(), i, i - 1, true, 1);
blockstore.insert_shreds(shreds, None, true).unwrap();
if last_print.elapsed().as_millis() > 5000 {
info!("inserted {}", i);
last_print = Instant::now();
}
}
drop(blockstore);

backup_and_clear_blockstore(&blockstore, &blockstore_path, 5);
backup_and_clear_blockstore(&blockstore_path, 5, 2);

for i in 6..10 {
let blockstore = Blockstore::open(&blockstore_path).unwrap();
assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
for i in 5..10 {
assert!(blockstore
.get_data_shreds_for_slot(i, 0)
.unwrap()
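For readers skimming the diff: the rework moves the backup-and-clear step out of `wait_for_supermajority` and runs it early in `Validator::new`, and the function now only backs up and purges when it actually finds shreds with a mismatched version. Below is a minimal, self-contained sketch of that detect-then-purge pattern. The `SlotShreds` type and `incompatible_range` helper are hypothetical stand-ins for illustration; the real code iterates the ledger with `Blockstore::slot_meta_iterator` and `get_data_shreds_for_slot` as shown above.

```rust
// Hypothetical stand-in for per-slot shred data pulled from the blockstore.
struct SlotShreds {
    slot: u64,
    versions: Vec<u16>,
}

/// Returns Some((first_slot, last_slot)) of the scanned range if any shred
/// carries a version other than `expected` (the range to back up and purge),
/// or None if the ledger is already consistent and can be left alone.
fn incompatible_range(shreds: &[SlotShreds], expected: u16) -> Option<(u64, u64)> {
    let mismatch = shreds
        .iter()
        .any(|s| s.versions.iter().any(|&v| v != expected));
    if !mismatch {
        return None;
    }
    Some((shreds.first()?.slot, shreds.last()?.slot))
}

fn main() {
    let shreds = vec![
        SlotShreds { slot: 5, versions: vec![2, 2] },
        SlotShreds { slot: 6, versions: vec![1] }, // stale shred version
    ];
    // A validator expecting shred version 2 would back up and purge 5..=6;
    // with no mismatch it would skip the copy and purge entirely.
    assert_eq!(incompatible_range(&shreds, 2), Some((5, 6)));
    assert_eq!(incompatible_range(&shreds, 1), Some((5, 6)));
}
```

Note the design choice visible in the diff: `backup_and_clear_blockstore` now takes a `ledger_path` and opens (then drops) its own `Blockstore` handle, so it can run before the validator opens the ledger for normal operation instead of sharing a live handle.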
6 changes: 4 additions & 2 deletions ledger/src/blockstore/blockstore_purge.rs
@@ -24,6 +24,7 @@ impl Blockstore {
let mut batch_start = from_slot;
let mut purge_stats = PurgeStats::default();
let mut last_datapoint = Instant::now();
let mut datapoint_start = batch_start;
while batch_start < to_slot {
let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);

@@ -33,13 +34,14 @@
if last_datapoint.elapsed().as_millis() > 1000 {
datapoint_info!(
"blockstore-purge",
("from_slot", batch_start as i64, i64),
("to_slot", to_slot as i64, i64),
("from_slot", datapoint_start as i64, i64),
("to_slot", batch_end as i64, i64),
("delete_range_us", purge_stats.delete_range as i64, i64),
("write_batch_us", purge_stats.write_batch as i64, i64)
);
last_datapoint = Instant::now();
purge_stats = PurgeStats::default();
datapoint_start = batch_end;
}

match purge_result {
Expand Down