Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support opening an in-use rocksdb as secondary #10209

Merged
merged 12 commits into from
Jun 3, 2020
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions genesis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use solana_clap_utils::{
};
use solana_genesis::{genesis_accounts::add_genesis_accounts, Base64Account};
use solana_ledger::{
blockstore::create_new_ledger, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
poh::compute_hashes_per_tick,
blockstore::create_new_ledger, blockstore_db::AccessType,
hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, poh::compute_hashes_per_tick,
};
use solana_sdk::{
account::Account,
Expand Down Expand Up @@ -540,6 +540,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
&ledger_path,
&genesis_config,
max_genesis_archive_unpacked_size,
AccessType::OnlyPrimary,
)?;

println!("{}", genesis_config);
Expand Down
1 change: 1 addition & 0 deletions ledger-tool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bs58 = "0.3.1"
bytecount = "0.6.0"
clap = "2.33.1"
histogram = "*"
log = { version = "0.4.8" }
serde_json = "1.0.53"
serde_yaml = "0.8.12"
solana-clap-utils = { path = "../clap-utils", version = "1.3.0" }
Expand Down
96 changes: 70 additions & 26 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use solana_ledger::{
bank_forks::{BankForks, SnapshotConfig},
bank_forks_utils,
blockstore::Blockstore,
blockstore_db::{self, Column, Database},
blockstore_db::{self, AccessType, Column, Database},
blockstore_processor::ProcessOptions,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
rooted_slot_iterator::RootedSlotIterator,
Expand All @@ -31,6 +31,8 @@ use std::{
str::FromStr,
};

use log::*;

#[derive(PartialEq)]
enum LedgerOutputMethod {
Print,
Expand Down Expand Up @@ -519,8 +521,8 @@ fn analyze_storage(database: &Database) -> Result<(), String> {
Ok(())
}

fn open_blockstore(ledger_path: &Path) -> Blockstore {
match Blockstore::open(ledger_path) {
fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore {
match Blockstore::open_with_access_type(ledger_path, access_type) {
Ok(blockstore) => blockstore,
Err(err) => {
eprintln!("Failed to open ledger at {:?}: {:?}", ledger_path, err);
Expand All @@ -529,8 +531,8 @@ fn open_blockstore(ledger_path: &Path) -> Blockstore {
}
}

fn open_database(ledger_path: &Path) -> Database {
match Database::open(&ledger_path.join("rocksdb")) {
fn open_database(ledger_path: &Path, access_type: AccessType) -> Database {
match Database::open(&ledger_path.join("rocksdb"), access_type) {
Ok(database) => database,
Err(err) => {
eprintln!("Unable to read the Ledger rocksdb: {:?}", err);
Expand All @@ -553,6 +555,7 @@ fn load_bank_forks(
ledger_path: &PathBuf,
genesis_config: &GenesisConfig,
process_options: ProcessOptions,
access_type: AccessType,
) -> bank_forks_utils::LoadResult {
let snapshot_config = if arg_matches.is_present("no_snapshot") {
None
Expand All @@ -564,15 +567,28 @@ fn load_bank_forks(
compression: CompressionType::Bzip2,
})
};
let blockstore = open_blockstore(&ledger_path, access_type);
let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") {
if !blockstore.is_primary_access() {
// Be defenstive, when default account dir is explicitly specified, it's still possible
// to wipe the dir possibly shared by the running validator!
panic!("custom accounts path is not supported under secondary access");
ryoqun marked this conversation as resolved.
Show resolved Hide resolved
}
account_paths.split(',').map(PathBuf::from).collect()
} else {
} else if blockstore.is_primary_access() {
vec![ledger_path.join("accounts")]
} else {
let non_primary_accounts_path = ledger_path.join("accounts.ledger-tool");
warn!(
"Default accounts path is switched aligning with Blockstore's secondary access: {:?}",
non_primary_accounts_path
);
vec![non_primary_accounts_path]
};

bank_forks_utils::load(
&genesis_config,
&open_blockstore(&ledger_path),
&blockstore,
account_paths,
snapshot_config.as_ref(),
process_options,
Expand Down Expand Up @@ -862,7 +878,7 @@ fn main() {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
output_ledger(
open_blockstore(&ledger_path),
open_blockstore(&ledger_path, AccessType::AllowSecondary),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: carefully audit for the appropriate AccessType for all of subcommands.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, I've tested all the subcommands and they worked as intended.

@mvines FYI, I found graph to be very useful to create a graph right from a still running validator's ledger. :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totally, this is gonna be yuuuuge!

starting_slot,
allow_dead_slots,
LedgerOutputMethod::Print,
Expand All @@ -885,7 +901,13 @@ fn main() {
..ProcessOptions::default()
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
println!(
"{}",
Expand All @@ -904,7 +926,7 @@ fn main() {
("slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::AllowSecondary);
for slot in slots {
println!("Slot {}", slot);
if let Err(err) = output_slot(
Expand All @@ -921,15 +943,15 @@ fn main() {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
output_ledger(
open_blockstore(&ledger_path),
open_blockstore(&ledger_path, AccessType::AllowSecondary),
starting_slot,
allow_dead_slots,
LedgerOutputMethod::Json,
);
}
("set-dead-slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::OnlyPrimary);
for slot in slots {
match blockstore.set_dead_slot(slot) {
Ok(_) => println!("Slot {} dead", slot),
Expand All @@ -954,6 +976,7 @@ fn main() {
&ledger_path,
&open_genesis_config_by(&ledger_path, arg_matches),
process_options,
AccessType::AllowSecondary,
)
.unwrap_or_else(|err| {
eprintln!("Ledger verification failed: {:?}", err);
Expand All @@ -976,6 +999,7 @@ fn main() {
&ledger_path,
&open_genesis_config_by(&ledger_path, arg_matches),
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let dot = graph_forks(&bank_forks, arg_matches.is_present("include_all_votes"));
Expand Down Expand Up @@ -1012,7 +1036,13 @@ fn main() {
..ProcessOptions::default()
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
ryoqun marked this conversation as resolved.
Show resolved Hide resolved
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let bank = bank_forks.get(snapshot_slot).unwrap_or_else(|| {
eprintln!("Error: Slot {} is not available", snapshot_slot);
Expand Down Expand Up @@ -1079,7 +1109,13 @@ fn main() {
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
let include_sysvars = arg_matches.is_present("include_sysvars");
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| {
Expand Down Expand Up @@ -1121,7 +1157,13 @@ fn main() {
..ProcessOptions::default()
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| {
Expand Down Expand Up @@ -1185,12 +1227,12 @@ fn main() {
("purge", Some(arg_matches)) => {
let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot);
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::OnlyPrimary);
blockstore.purge_slots(start_slot, end_slot);
blockstore.purge_from_next_slots(start_slot, end_slot);
}
("list-roots", Some(arg_matches)) => {
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::AllowSecondary);
let max_height = if let Some(height) = arg_matches.value_of("max_height") {
usize::from_str(height).expect("Maximum height must be a number")
} else {
Expand Down Expand Up @@ -1243,7 +1285,7 @@ fn main() {
});
}
("bounds", Some(arg_matches)) => {
match open_blockstore(&ledger_path).slot_meta_iterator(0) {
match open_blockstore(&ledger_path, AccessType::AllowSecondary).slot_meta_iterator(0) {
Ok(metas) => {
let all = arg_matches.is_present("all");

Expand All @@ -1269,15 +1311,17 @@ fn main() {
}
}
}
("analyze-storage", _) => match analyze_storage(&open_database(&ledger_path)) {
Ok(()) => {
println!("Ok.");
}
Err(err) => {
eprintln!("Unable to read the Ledger: {:?}", err);
exit(1);
("analyze-storage", _) => {
match analyze_storage(&open_database(&ledger_path, AccessType::AllowSecondary)) {
Ok(()) => {
println!("Ok.");
}
Err(err) => {
eprintln!("Unable to read the Ledger: {:?}", err);
exit(1);
}
}
},
}
("", _) => {
eprintln!("{}", matches.usage());
exit(1);
Expand Down
53 changes: 46 additions & 7 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use crate::{
blockstore_db::{
columns as cf, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, Result,
WriteBatch,
columns as cf, AccessType, Column, Database, IteratorDirection, IteratorMode, LedgerColumn,
Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
Expand Down Expand Up @@ -177,6 +177,17 @@ impl Blockstore {

/// Opens a Ledger in directory, provides "infinite" window of shreds
pub fn open(ledger_path: &Path) -> Result<Blockstore> {
Self::do_open(ledger_path, AccessType::OnlyPrimary)
}

pub fn open_with_access_type(
ledger_path: &Path,
access_type: AccessType,
) -> Result<Blockstore> {
Self::do_open(ledger_path, access_type)
}

pub fn do_open(ledger_path: &Path, access_type: AccessType) -> Result<Blockstore> {
fs::create_dir_all(&ledger_path)?;
let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);

Expand All @@ -185,7 +196,7 @@ impl Blockstore {
// Open the database
let mut measure = Measure::start("open");
info!("Opening database at {:?}", blockstore_path);
let db = Database::open(&blockstore_path)?;
let db = Database::open(&blockstore_path, access_type)?;

// Create the metadata column family
let meta_cf = db.column();
Expand Down Expand Up @@ -267,7 +278,21 @@ impl Blockstore {
pub fn open_with_signal(
ledger_path: &Path,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
let mut blockstore = Self::open(ledger_path)?;
Self::do_open_with_signal(ledger_path, AccessType::OnlyPrimary)
}

pub fn open_with_signal_with_access_type(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn open_with_signal_with_access_type(
pub fn open_with_signal_and_access_type(

ledger_path: &Path,
access_type: AccessType,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
Self::do_open_with_signal(ledger_path, access_type)
}

pub fn do_open_with_signal(
ledger_path: &Path,
access_type: AccessType,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
let mut blockstore = Self::open_with_access_type(ledger_path, access_type)?;
let (signal_sender, signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
Expand Down Expand Up @@ -2355,6 +2380,10 @@ impl Blockstore {
pub fn storage_size(&self) -> Result<u64> {
self.db.storage_size()
}

pub fn is_primary_access(&self) -> bool {
self.db.is_primary_access()
}
}

fn update_slot_meta(
Expand Down Expand Up @@ -2787,12 +2816,13 @@ pub fn create_new_ledger(
ledger_path: &Path,
genesis_config: &GenesisConfig,
max_genesis_archive_unpacked_size: u64,
access_type: AccessType,
) -> Result<Hash> {
Blockstore::destroy(ledger_path)?;
genesis_config.write(&ledger_path)?;

// Fill slot 0 with ticks that link back to the genesis_config to bootstrap the ledger.
let blockstore = Blockstore::open(ledger_path)?;
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0);
let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash());
Expand Down Expand Up @@ -2923,7 +2953,11 @@ pub fn get_ledger_path_from_name(name: &str) -> PathBuf {
#[macro_export]
macro_rules! create_new_tmp_ledger {
($genesis_config:expr) => {
$crate::blockstore::create_new_ledger_from_name($crate::tmp_ledger_name!(), $genesis_config)
$crate::blockstore::create_new_ledger_from_name(
$crate::tmp_ledger_name!(),
$genesis_config,
$crate::blockstore_db::AccessType::OnlyPrimary,
)
};
}

Expand All @@ -2949,12 +2983,17 @@ pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> boo
//
// Note: like `create_new_ledger` the returned ledger will have slot 0 full of ticks (and only
// ticks)
pub fn create_new_ledger_from_name(name: &str, genesis_config: &GenesisConfig) -> (PathBuf, Hash) {
pub fn create_new_ledger_from_name(
name: &str,
genesis_config: &GenesisConfig,
access_type: AccessType,
) -> (PathBuf, Hash) {
let ledger_path = get_ledger_path_from_name(name);
let blockhash = create_new_ledger(
&ledger_path,
genesis_config,
MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
access_type,
)
.unwrap();
(ledger_path, blockhash)
Expand Down
Loading