Skip to content

Commit

Permalink
Support opening an in-use rocksdb as secondary
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed May 24, 2020
1 parent 7373163 commit 3be691a
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 59 deletions.
4 changes: 3 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use solana_ledger::{
bank_forks::{BankForks, SnapshotConfig},
bank_forks_utils,
blockstore::{Blockstore, CompletedSlotsReceiver},
blockstore_db::AccessType,
blockstore_processor, create_new_tmp_ledger,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
leader_schedule::FixedSchedule,
Expand Down Expand Up @@ -581,7 +582,8 @@ fn new_banks_from_blockstore(
}

let (mut blockstore, ledger_signal_receiver, completed_slots_receiver) =
Blockstore::open_with_signal(blockstore_path).expect("Failed to open ledger database");
Blockstore::open_with_signal(blockstore_path, AccessType::OnlyPrimary)
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);

let process_options = blockstore_processor::ProcessOptions {
Expand Down
5 changes: 3 additions & 2 deletions genesis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use solana_clap_utils::{
};
use solana_genesis::{genesis_accounts::add_genesis_accounts, Base64Account};
use solana_ledger::{
blockstore::create_new_ledger, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
poh::compute_hashes_per_tick,
blockstore::create_new_ledger, blockstore_db::AccessType,
hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, poh::compute_hashes_per_tick,
};
use solana_sdk::{
account::Account,
Expand Down Expand Up @@ -540,6 +540,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
&ledger_path,
&genesis_config,
max_genesis_archive_unpacked_size,
AccessType::OnlyPrimary,
)?;

println!("{}", genesis_config);
Expand Down
90 changes: 63 additions & 27 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use solana_ledger::{
bank_forks::{BankForks, SnapshotConfig},
bank_forks_utils,
blockstore::Blockstore,
blockstore_db::{self, Column, Database},
blockstore_db::{self, AccessType, Column, Database},
blockstore_processor::ProcessOptions,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
rooted_slot_iterator::RootedSlotIterator,
Expand Down Expand Up @@ -519,8 +519,8 @@ fn analyze_storage(database: &Database) -> Result<(), String> {
Ok(())
}

fn open_blockstore(ledger_path: &Path) -> Blockstore {
match Blockstore::open(ledger_path) {
fn open_blockstore(ledger_path: &Path, access_type: AccessType) -> Blockstore {
match Blockstore::open(ledger_path, access_type) {
Ok(blockstore) => blockstore,
Err(err) => {
eprintln!("Failed to open ledger at {:?}: {:?}", ledger_path, err);
Expand All @@ -529,8 +529,8 @@ fn open_blockstore(ledger_path: &Path) -> Blockstore {
}
}

fn open_database(ledger_path: &Path) -> Database {
match Database::open(&ledger_path.join("rocksdb")) {
fn open_database(ledger_path: &Path, access_type: AccessType) -> Database {
match Database::open(&ledger_path.join("rocksdb"), access_type) {
Ok(database) => database,
Err(err) => {
eprintln!("Unable to read the Ledger rocksdb: {:?}", err);
Expand All @@ -553,6 +553,7 @@ fn load_bank_forks(
ledger_path: &PathBuf,
genesis_config: &GenesisConfig,
process_options: ProcessOptions,
access_type: AccessType,
) -> bank_forks_utils::LoadResult {
let snapshot_config = if arg_matches.is_present("no_snapshot") {
None
Expand All @@ -564,15 +565,22 @@ fn load_bank_forks(
compression: CompressionType::Bzip2,
})
};
let blockstore = open_blockstore(&ledger_path, access_type);
let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") {
account_paths.split(',').map(PathBuf::from).collect()
} else {
vec![ledger_path.join("accounts")]
if blockstore.is_primary_access() {
vec![ledger_path.join("accounts")]
} else {
let non_primary_accounts_path = ledger_path.join("accounts.ledger-tool");
eprintln!("Default accounts path is switched aligning with Blockstore's non-primary access: {:?}", non_primary_accounts_path);
vec![non_primary_accounts_path]
}
};

bank_forks_utils::load(
&genesis_config,
&open_blockstore(&ledger_path),
&blockstore,
account_paths,
snapshot_config.as_ref(),
process_options,
Expand Down Expand Up @@ -873,7 +881,7 @@ fn main() {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
output_ledger(
open_blockstore(&ledger_path),
open_blockstore(&ledger_path, AccessType::AllowSecondary),
starting_slot,
allow_dead_slots,
LedgerOutputMethod::Print,
Expand All @@ -896,7 +904,13 @@ fn main() {
..ProcessOptions::default()
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
println!(
"{}",
Expand All @@ -915,7 +929,7 @@ fn main() {
("slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::AllowSecondary);
for slot in slots {
println!("Slot {}", slot);
if let Err(err) = output_slot(
Expand All @@ -932,15 +946,15 @@ fn main() {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
output_ledger(
open_blockstore(&ledger_path),
open_blockstore(&ledger_path, AccessType::AllowSecondary),
starting_slot,
allow_dead_slots,
LedgerOutputMethod::Json,
);
}
("set-dead-slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::OnlyPrimary);
for slot in slots {
match blockstore.set_dead_slot(slot) {
Ok(_) => println!("Slot {} dead", slot),
Expand All @@ -965,6 +979,7 @@ fn main() {
&ledger_path,
&open_genesis_config_by(&ledger_path, arg_matches),
process_options,
AccessType::AllowSecondary,
)
.unwrap_or_else(|err| {
eprintln!("Ledger verification failed: {:?}", err);
Expand All @@ -987,6 +1002,7 @@ fn main() {
&ledger_path,
&open_genesis_config_by(&ledger_path, arg_matches),
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let dot = graph_forks(&bank_forks, arg_matches.is_present("include_all_votes"));
Expand Down Expand Up @@ -1023,7 +1039,13 @@ fn main() {
..ProcessOptions::default()
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let bank = bank_forks.get(snapshot_slot).unwrap_or_else(|| {
eprintln!("Error: Slot {} is not available", snapshot_slot);
Expand Down Expand Up @@ -1088,7 +1110,13 @@ fn main() {
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
let include_sysvars = arg_matches.is_present("include_sysvars");
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| {
Expand Down Expand Up @@ -1130,7 +1158,13 @@ fn main() {
..ProcessOptions::default()
};
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
match load_bank_forks(arg_matches, &ledger_path, &genesis_config, process_options) {
match load_bank_forks(
arg_matches,
&ledger_path,
&genesis_config,
process_options,
AccessType::AllowSecondary,
) {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| {
Expand Down Expand Up @@ -1195,12 +1229,12 @@ fn main() {
let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
let end_slot = value_t!(arg_matches, "end_slot", Slot);
let end_slot = end_slot.map_or(None, Some);
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::OnlyPrimary);
blockstore.purge_slots(start_slot, end_slot);
}
("prune", Some(arg_matches)) => {
if let Some(prune_file_path) = arg_matches.value_of("slot_list") {
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::OnlyPrimary);
let prune_file = File::open(prune_file_path.to_string()).unwrap();
let slot_hashes: BTreeMap<u64, String> =
serde_yaml::from_reader(prune_file).unwrap();
Expand Down Expand Up @@ -1236,7 +1270,7 @@ fn main() {
}
}
("list-roots", Some(arg_matches)) => {
let blockstore = open_blockstore(&ledger_path);
let blockstore = open_blockstore(&ledger_path, AccessType::AllowSecondary);
let max_height = if let Some(height) = arg_matches.value_of("max_height") {
usize::from_str(height).expect("Maximum height must be a number")
} else {
Expand Down Expand Up @@ -1289,7 +1323,7 @@ fn main() {
});
}
("bounds", Some(arg_matches)) => {
match open_blockstore(&ledger_path).slot_meta_iterator(0) {
match open_blockstore(&ledger_path, AccessType::AllowSecondary).slot_meta_iterator(0) {
Ok(metas) => {
let all = arg_matches.is_present("all");

Expand All @@ -1315,15 +1349,17 @@ fn main() {
}
}
}
("analyze-storage", _) => match analyze_storage(&open_database(&ledger_path)) {
Ok(()) => {
println!("Ok.");
}
Err(err) => {
eprintln!("Unable to read the Ledger: {:?}", err);
exit(1);
("analyze-storage", _) => {
match analyze_storage(&open_database(&ledger_path, AccessType::AllowSecondary)) {
Ok(()) => {
println!("Ok.");
}
Err(err) => {
eprintln!("Unable to read the Ledger: {:?}", err);
exit(1);
}
}
},
}
("", _) => {
eprintln!("{}", matches.usage());
exit(1);
Expand Down
31 changes: 23 additions & 8 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use crate::{
blockstore_db::{
columns as cf, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, Result,
WriteBatch,
columns as cf, AccessType, Column, Database, IteratorDirection, IteratorMode, LedgerColumn,
Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
Expand Down Expand Up @@ -176,7 +176,7 @@ impl Blockstore {
}

/// Opens a Ledger in directory, provides "infinite" window of shreds
pub fn open(ledger_path: &Path) -> Result<Blockstore> {
pub fn open(ledger_path: &Path, access_type: AccessType) -> Result<Blockstore> {
fs::create_dir_all(&ledger_path)?;
let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);

Expand All @@ -185,7 +185,7 @@ impl Blockstore {
// Open the database
let mut measure = Measure::start("open");
info!("Opening database at {:?}", blockstore_path);
let db = Database::open(&blockstore_path)?;
let db = Database::open(&blockstore_path, access_type)?;

// Create the metadata column family
let meta_cf = db.column();
Expand Down Expand Up @@ -266,8 +266,9 @@ impl Blockstore {

pub fn open_with_signal(
ledger_path: &Path,
access_type: AccessType,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
let mut blockstore = Self::open(ledger_path)?;
let mut blockstore = Self::open(ledger_path, access_type)?;
let (signal_sender, signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
Expand Down Expand Up @@ -2316,6 +2317,10 @@ impl Blockstore {
pub fn storage_size(&self) -> Result<u64> {
self.db.storage_size()
}

pub fn is_primary_access(&self) -> bool {
self.db.is_primary_access()
}
}

fn update_slot_meta(
Expand Down Expand Up @@ -2748,12 +2753,13 @@ pub fn create_new_ledger(
ledger_path: &Path,
genesis_config: &GenesisConfig,
max_genesis_archive_unpacked_size: u64,
access_type: AccessType,
) -> Result<Hash> {
Blockstore::destroy(ledger_path)?;
genesis_config.write(&ledger_path)?;

// Fill slot 0 with ticks that link back to the genesis_config to bootstrap the ledger.
let blockstore = Blockstore::open(ledger_path)?;
let blockstore = Blockstore::open(ledger_path, access_type)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0);
let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash());
Expand Down Expand Up @@ -2884,7 +2890,11 @@ pub fn get_ledger_path_from_name(name: &str) -> PathBuf {
#[macro_export]
macro_rules! create_new_tmp_ledger {
($genesis_config:expr) => {
$crate::blockstore::create_new_ledger_from_name($crate::tmp_ledger_name!(), $genesis_config)
$crate::blockstore::create_new_ledger_from_name(
$crate::tmp_ledger_name!(),
$genesis_config,
AccessType::OnlyPrimary,
)
};
}

Expand All @@ -2910,12 +2920,17 @@ pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> boo
//
// Note: like `create_new_ledger` the returned ledger will have slot 0 full of ticks (and only
// ticks)
pub fn create_new_ledger_from_name(name: &str, genesis_config: &GenesisConfig) -> (PathBuf, Hash) {
pub fn create_new_ledger_from_name(
name: &str,
genesis_config: &GenesisConfig,
access_type: AccessType,
) -> (PathBuf, Hash) {
let ledger_path = get_ledger_path_from_name(name);
let blockhash = create_new_ledger(
&ledger_path,
genesis_config,
MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
access_type,
)
.unwrap();
(ledger_path, blockhash)
Expand Down
Loading

0 comments on commit 3be691a

Please sign in to comment.