Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Support opening an in-use rocksdb as secondary (bp #10209) #10382

Merged
merged 1 commit into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions genesis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use solana_clap_utils::{
};
use solana_genesis::{genesis_accounts::add_genesis_accounts, Base64Account};
use solana_ledger::{
blockstore::create_new_ledger, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
poh::compute_hashes_per_tick,
blockstore::create_new_ledger, blockstore_db::AccessType,
hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, poh::compute_hashes_per_tick,
};
use solana_sdk::{
account::Account,
Expand Down Expand Up @@ -540,6 +540,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
&ledger_path,
&genesis_config,
max_genesis_archive_unpacked_size,
AccessType::PrimaryOnly,
)?;

println!("{}", genesis_config);
Expand Down
1 change: 1 addition & 0 deletions ledger-tool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bs58 = "0.3.1"
bytecount = "0.6.0"
clap = "2.33.1"
histogram = "*"
log = { version = "0.4.8" }
serde_json = "1.0.53"
serde_yaml = "0.8.12"
solana-clap-utils = { path = "../clap-utils", version = "1.2.1" }
Expand Down
102 changes: 76 additions & 26 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use solana_ledger::{
bank_forks::{BankForks, SnapshotConfig},
bank_forks_utils,
blockstore::Blockstore,
blockstore_db::{self, Column, Database},
blockstore_db::{self, AccessType, Column, Database},
blockstore_processor::ProcessOptions,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
rooted_slot_iterator::RootedSlotIterator,
Expand All @@ -31,6 +31,8 @@ use std::{
str::FromStr,
};

use log::*;

#[derive(PartialEq)]
enum LedgerOutputMethod {
Print,
Expand Down Expand Up @@ -519,8 +521,8 @@ fn analyze_storage(database: &Database) -> Result<(), String> {
Ok(())
}

fn open_blockstore(ledger_path: &Path) -> Blockstore {
match Blockstore::open(ledger_path) {
fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore {
match Blockstore::open_with_access_type(ledger_path, access_type) {
Ok(blockstore) => blockstore,
Err(err) => {
eprintln!("Failed to open ledger at {:?}: {:?}", ledger_path, err);
Expand All @@ -529,8 +531,8 @@ fn open_blockstore(ledger_path: &Path) -> Blockstore {
}
}

fn open_database(ledger_path: &Path) -> Database {
match Database::open(&ledger_path.join("rocksdb")) {
fn open_database(ledger_path: &Path, access_type: AccessType) -> Database {
match Database::open(&ledger_path.join("rocksdb"), access_type) {
Ok(database) => database,
Err(err) => {
eprintln!("Unable to read the Ledger rocksdb: {:?}", err);
Expand All @@ -553,6 +555,7 @@ fn load_bank_forks(
ledger_path: &PathBuf,
genesis_config: &GenesisConfig,
process_options: ProcessOptions,
access_type: AccessType,
) -> bank_forks_utils::LoadResult {
let snapshot_config = if arg_matches.is_present("no_snapshot") {
None
Expand All @@ -564,15 +567,29 @@ fn load_bank_forks(
compression: CompressionType::Bzip2,
})
};
let blockstore = open_blockstore(&ledger_path, access_type);
let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") {
if !blockstore.is_primary_access() {
// Be defenstive, when default account dir is explicitly specified, it's still possible
// to wipe the dir possibly shared by the running validator!
eprintln!("Error: custom accounts path is not supported under secondary access");
exit(1);
}
account_paths.split(',').map(PathBuf::from).collect()
} else {
} else if blockstore.is_primary_access() {
vec![ledger_path.join("accounts")]
} else {
let non_primary_accounts_path = ledger_path.join("accounts.ledger-tool");
warn!(
"Default accounts path is switched aligning with Blockstore's secondary access: {:?}",
non_primary_accounts_path
);
vec![non_primary_accounts_path]
};

bank_forks_utils::load(
&genesis_config,
&open_blockstore(&ledger_path),
&blockstore,
account_paths,
snapshot_config.as_ref(),
process_options,
Expand Down Expand Up @@ -862,7 +879,7 @@ fn main() {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
output_ledger(
open_blockstore(&ledger_path),
open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary),
starting_slot,
allow_dead_slots,
LedgerOutputMethod::Print,
Expand All @@ -885,7 +902,13 @@ fn main() {
..ProcessOptions::default()
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::TryPrimaryThenSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
println!(
"{}",
Expand All @@ -904,7 +927,7 @@ fn main() {
("slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary);
for slot in slots {
println!("Slot {}", slot);
if let Err(err) = output_slot(
Expand All @@ -921,15 +944,15 @@ fn main() {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
output_ledger(
open_blockstore(&ledger_path),
open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary),
starting_slot,
allow_dead_slots,
LedgerOutputMethod::Json,
);
}
("set-dead-slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly);
for slot in slots {
match blockstore.set_dead_slot(slot) {
Ok(_) => println!("Slot {} dead", slot),
Expand All @@ -954,6 +977,7 @@ fn main() {
&ledger_path,
&open_genesis_config_by(&ledger_path, arg_matches),
process_options,
AccessType::TryPrimaryThenSecondary,
)
.unwrap_or_else(|err| {
eprintln!("Ledger verification failed: {:?}", err);
Expand All @@ -976,6 +1000,7 @@ fn main() {
&ledger_path,
&open_genesis_config_by(&ledger_path, arg_matches),
process_options,
AccessType::TryPrimaryThenSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let dot = graph_forks(&bank_forks, arg_matches.is_present("include_all_votes"));
Expand Down Expand Up @@ -1012,7 +1037,13 @@ fn main() {
..ProcessOptions::default()
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::TryPrimaryThenSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let bank = bank_forks.get(snapshot_slot).unwrap_or_else(|| {
eprintln!("Error: Slot {} is not available", snapshot_slot);
Expand Down Expand Up @@ -1079,7 +1110,13 @@ fn main() {
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
let include_sysvars = arg_matches.is_present("include_sysvars");
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::TryPrimaryThenSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| {
Expand Down Expand Up @@ -1121,7 +1158,13 @@ fn main() {
..ProcessOptions::default()
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::TryPrimaryThenSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| {
Expand Down Expand Up @@ -1185,12 +1228,12 @@ fn main() {
("purge", Some(arg_matches)) => {
let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot);
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly);
blockstore.purge_slots(start_slot, end_slot);
blockstore.purge_from_next_slots(start_slot, end_slot);
}
("list-roots", Some(arg_matches)) => {
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary);
let max_height = if let Some(height) = arg_matches.value_of("max_height") {
usize::from_str(height).expect("Maximum height must be a number")
} else {
Expand Down Expand Up @@ -1243,7 +1286,9 @@ fn main() {
});
}
("bounds", Some(arg_matches)) => {
match open_blockstore(&ledger_path).slot_meta_iterator(0) {
match open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary)
.slot_meta_iterator(0)
{
Ok(metas) => {
let all = arg_matches.is_present("all");

Expand All @@ -1269,15 +1314,20 @@ fn main() {
}
}
}
("analyze-storage", _) => match analyze_storage(&open_database(&ledger_path)) {
Ok(()) => {
println!("Ok.");
}
Err(err) => {
eprintln!("Unable to read the Ledger: {:?}", err);
exit(1);
("analyze-storage", _) => {
match analyze_storage(&open_database(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
)) {
Ok(()) => {
println!("Ok.");
}
Err(err) => {
eprintln!("Unable to read the Ledger: {:?}", err);
exit(1);
}
}
},
}
("", _) => {
eprintln!("{}", matches.usage());
exit(1);
Expand Down
39 changes: 32 additions & 7 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use crate::{
blockstore_db::{
columns as cf, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, Result,
WriteBatch,
columns as cf, AccessType, Column, Database, IteratorDirection, IteratorMode, LedgerColumn,
Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
Expand Down Expand Up @@ -192,6 +192,17 @@ impl Blockstore {

/// Opens a Ledger in directory, provides "infinite" window of shreds
pub fn open(ledger_path: &Path) -> Result<Blockstore> {
Self::do_open(ledger_path, AccessType::PrimaryOnly)
}

pub fn open_with_access_type(
ledger_path: &Path,
access_type: AccessType,
) -> Result<Blockstore> {
Self::do_open(ledger_path, access_type)
}

fn do_open(ledger_path: &Path, access_type: AccessType) -> Result<Blockstore> {
fs::create_dir_all(&ledger_path)?;
let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);

Expand All @@ -200,7 +211,7 @@ impl Blockstore {
// Open the database
let mut measure = Measure::start("open");
info!("Opening database at {:?}", blockstore_path);
let db = Database::open(&blockstore_path)?;
let db = Database::open(&blockstore_path, access_type)?;

// Create the metadata column family
let meta_cf = db.column();
Expand Down Expand Up @@ -282,7 +293,7 @@ impl Blockstore {
pub fn open_with_signal(
ledger_path: &Path,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
let mut blockstore = Self::open(ledger_path)?;
let mut blockstore = Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly)?;
let (signal_sender, signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
Expand Down Expand Up @@ -2176,6 +2187,10 @@ impl Blockstore {
pub fn storage_size(&self) -> Result<u64> {
self.db.storage_size()
}

pub fn is_primary_access(&self) -> bool {
self.db.is_primary_access()
}
}

fn update_slot_meta(
Expand Down Expand Up @@ -2608,12 +2623,13 @@ pub fn create_new_ledger(
ledger_path: &Path,
genesis_config: &GenesisConfig,
max_genesis_archive_unpacked_size: u64,
access_type: AccessType,
) -> Result<Hash> {
Blockstore::destroy(ledger_path)?;
genesis_config.write(&ledger_path)?;

// Fill slot 0 with ticks that link back to the genesis_config to bootstrap the ledger.
let blockstore = Blockstore::open(ledger_path)?;
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0);
let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash());
Expand Down Expand Up @@ -2744,7 +2760,11 @@ pub fn get_ledger_path_from_name(name: &str) -> PathBuf {
#[macro_export]
macro_rules! create_new_tmp_ledger {
($genesis_config:expr) => {
$crate::blockstore::create_new_ledger_from_name($crate::tmp_ledger_name!(), $genesis_config)
$crate::blockstore::create_new_ledger_from_name(
$crate::tmp_ledger_name!(),
$genesis_config,
$crate::blockstore_db::AccessType::PrimaryOnly,
)
};
}

Expand All @@ -2770,12 +2790,17 @@ pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> boo
//
// Note: like `create_new_ledger` the returned ledger will have slot 0 full of ticks (and only
// ticks)
pub fn create_new_ledger_from_name(name: &str, genesis_config: &GenesisConfig) -> (PathBuf, Hash) {
pub fn create_new_ledger_from_name(
name: &str,
genesis_config: &GenesisConfig,
access_type: AccessType,
) -> (PathBuf, Hash) {
let ledger_path = get_ledger_path_from_name(name);
let blockhash = create_new_ledger(
&ledger_path,
genesis_config,
MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
access_type,
)
.unwrap();
(ledger_path, blockhash)
Expand Down
Loading