Make sure to root local slots even with hard fork
ryoqun committed Sep 23, 2021
1 parent 7b365c5 commit f616d28
Showing 6 changed files with 411 additions and 40 deletions.
67 changes: 49 additions & 18 deletions core/src/consensus.rs
@@ -969,7 +969,7 @@ impl Tower {
if let Some(last_voted_slot) = self.last_voted_slot() {
if tower_root <= replayed_root {
// Normally, we go into this clause with the possible help of
// reconcile_blockstore_roots_with_tower()
// reconcile_blockstore_roots_with_external_source()
if slot_history.check(last_voted_slot) == Check::TooOld {
// We could try hard to anchor with other older votes, but opt to simplify the
// following logic
@@ -1221,45 +1221,61 @@ impl TowerError {
}
}

#[derive(Debug)]
pub enum ExternalRootSource {
Tower(Slot),
HardFork(Slot),
}

impl ExternalRootSource {
fn root(&self) -> Slot {
match self {
ExternalRootSource::Tower(slot) => *slot,
ExternalRootSource::HardFork(slot) => *slot,
}
}
}

// Given an untimely crash, tower may have roots that are not reflected in blockstore,
// or the reverse of this.
// That's because we don't impose any ordering guarantee or any kind of write barriers
// between tower (plain old POSIX fs calls) and blockstore (through RocksDB), when
// `ReplayState::handle_votable_bank()` saves tower before setting blockstore roots.
pub fn reconcile_blockstore_roots_with_tower(
tower: &Tower,
pub fn reconcile_blockstore_roots_with_external_source(
external_source: ExternalRootSource,
blockstore: &Blockstore,
last_blockstore_root: &mut Slot,
) -> blockstore_db::Result<()> {
let tower_root = tower.root();
let last_blockstore_root = blockstore.last_root();
if last_blockstore_root < tower_root {
// Ensure tower_root itself to exist and be marked as rooted in the blockstore
let external_root = external_source.root();
if *last_blockstore_root < external_root {
// Ensure that external_root itself exists and is marked as rooted in the blockstore
// in addition to its ancestors.
let new_roots: Vec<_> = AncestorIterator::new_inclusive(tower_root, blockstore)
.take_while(|current| match current.cmp(&last_blockstore_root) {
let new_roots: Vec<_> = AncestorIterator::new_inclusive(external_root, blockstore)
.take_while(|current| match current.cmp(last_blockstore_root) {
Ordering::Greater => true,
Ordering::Equal => false,
Ordering::Less => panic!(
"couldn't find a last_blockstore_root upwards from: {}!?",
tower_root
external_root
),
})
.collect();
if !new_roots.is_empty() {
info!(
"Reconciling slots as root based on tower root: {:?} ({}..{}) ",
new_roots, tower_root, last_blockstore_root
"Reconciling slots as root based on external root: {:?} ({}..{}) ",
new_roots, external_root, last_blockstore_root
);
blockstore.set_roots(new_roots.iter())?;
*last_blockstore_root = blockstore.last_root();
} else {
// This indicates we're in bad state; but still don't panic here.
// That's because we might have a chance of recovering properly with
// newer snapshot.
warn!(
"Couldn't find any ancestor slots from tower root ({}) \
"Couldn't find any ancestor slots from external source ({:?}) \
towards blockstore root ({}); blockstore pruned or only \
tower moved into new ledger?",
tower_root, last_blockstore_root,
tower moved into new ledger or just hard fork?",
external_source, last_blockstore_root,
);
}
}
@@ -2737,7 +2753,12 @@ pub mod test {

let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();

assert!(!blockstore.is_root(0));
assert!(blockstore.is_root(1));
@@ -2769,7 +2790,12 @@ pub mod test {

let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
@@ -2792,7 +2818,12 @@
let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
assert_eq!(blockstore.last_root(), 0);
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
assert_eq!(blockstore.last_root(), 0);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
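For orientation, here is a minimal, self-contained sketch (not code from this commit; slot numbers are hypothetical) of the root-collection rule that reconcile_blockstore_roots_with_external_source() applies: walk the ancestors of the external root, inclusive, and keep every slot strictly newer than the current blockstore root.

    use std::cmp::Ordering;

    type Slot = u64;

    // Keep external_root and its ancestors until the existing blockstore root is reached.
    fn collect_new_roots(
        ancestors_inclusive: impl Iterator<Item = Slot>, // external root first, then its ancestors
        last_blockstore_root: Slot,
    ) -> Vec<Slot> {
        ancestors_inclusive
            .take_while(|current| match current.cmp(&last_blockstore_root) {
                Ordering::Greater => true,
                Ordering::Equal => false,
                Ordering::Less => panic!("couldn't find last_blockstore_root upwards from here"),
            })
            .collect()
    }

    fn main() {
        // Hypothetical chain: the blockstore root is 1 and a hard fork sets an external root of 4.
        let new_roots = collect_new_roots([4, 3, 2, 1].into_iter(), 1);
        assert_eq!(new_roots, vec![4, 3, 2]); // 4, 3 and 2 get rooted; 1 already was.
    }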
43 changes: 38 additions & 5 deletions core/src/validator.rs
@@ -6,7 +6,7 @@ use {
cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService},
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
consensus::{reconcile_blockstore_roots_with_tower, Tower},
consensus::{reconcile_blockstore_roots_with_external_source, ExternalRootSource, Tower},
cost_model::CostModel,
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
sample_performance_service::SamplePerformanceService,
@@ -158,6 +158,7 @@ pub struct ValidatorConfig {
pub validator_exit: Arc<RwLock<Exit>>,
pub no_wait_for_vote_to_start_leader: bool,
pub accounts_shrink_ratio: AccountShrinkThreshold,
pub no_hard_fork_blockstore_root_reconcilation_for_local_cluster_test: bool,
}

impl Default for ValidatorConfig {
@@ -217,6 +218,7 @@ impl Default for ValidatorConfig {
no_wait_for_vote_to_start_leader: true,
accounts_shrink_ratio: AccountShrinkThreshold::default(),
accounts_db_config: None,
no_hard_fork_blockstore_root_reconcilation_for_local_cluster_test: false,
}
}
}
@@ -1005,14 +1007,16 @@ fn post_process_restored_tower(
.and_then(|tower| {
let root_bank = bank_forks.root_bank();
let slot_history = root_bank.get_slot_history();
// make sure tower isn't corrupted first before the following hard fork check
let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);

// detect cluster-wide restart (hard fork) indirectly via wait_for_supermajority...
if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
if root_bank.slot() == wait_slot_for_supermajority {
if wait_slot_for_supermajority == root_bank.slot() {
// intentionally fail to restore tower; we're supposedly in a new hard fork; past
// out-of-chain vote state doesn't make sense at all
// what if --wait-for-supermajority again if the validator restarted?
let message = format!("Hardfork is detected; discarding tower restoration result: {:?}", tower);
let message = format!("Hard fork is detected; discarding tower restoration result: {:?}", tower);
datapoint_error!(
"tower_error",
(
@@ -1143,11 +1147,22 @@ fn new_banks_from_ledger(
)
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);
// The following boot sequence (esp. BankForks) could set the root, so stash the original
// blockstore root value here as soon as possible.
let mut last_blockstore_root = blockstore.last_root();

let restored_tower = Tower::restore(config.tower_storage.as_ref(), validator_identity);
if let Ok(tower) = &restored_tower {
reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
error!("Failed to reconcile blockstore with tower: {:?}", err);
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut last_blockstore_root,
)
.unwrap_or_else(|err| {
error!(
"Failed to reconcile blockstore according to tower: {:?}",
err
);
abort()
});
}
@@ -1266,6 +1281,24 @@
);
}

if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
if wait_slot_for_supermajority == bank_forks.root_bank().slot()
&& !config.no_hard_fork_blockstore_root_reconcilation_for_local_cluster_test
{
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::HardFork(wait_slot_for_supermajority),
&blockstore,
&mut last_blockstore_root,
)
.unwrap_or_else(|err| {
error!(
"Failed to reconcile blockstore according to hard fork: {:?}",
err
);
abort()
});
}
}
let tower = post_process_restored_tower(
restored_tower,
validator_identity,
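As a side note, a small self-contained sketch (illustrative, not code from this commit) of the detection rule both validator.rs call sites rely on: a cluster-wide restart (hard fork) is inferred when --wait-for-supermajority equals the slot of the restored root bank.

    type Slot = u64;

    // Hard fork restart detection as used (indirectly) in post_process_restored_tower and
    // new_banks_from_ledger: the configured wait slot must equal the restored root slot.
    fn is_hard_fork_restart(wait_for_supermajority: Option<Slot>, root_slot: Slot) -> bool {
        wait_for_supermajority == Some(root_slot)
    }

    fn main() {
        assert!(is_hard_fork_restart(Some(12_345), 12_345)); // restarted exactly at the hard fork slot
        assert!(!is_hard_fork_restart(Some(12_345), 12_000)); // root still behind the announced fork slot
        assert!(!is_hard_fork_restart(None, 12_345)); // normal boot; no hard fork expected
    }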
19 changes: 17 additions & 2 deletions ledger/src/blockstore.rs
@@ -576,13 +576,28 @@ impl Blockstore {
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}

pub fn rooted_slot_iterator(&self, slot: Slot) -> Result<impl Iterator<Item = u64> + '_> {
fn prepare_rooted_slot_iterator(
&self,
slot: Slot,
direction: IteratorDirection,
) -> Result<impl Iterator<Item = Slot> + '_> {
let slot_iterator = self
.db
.iter::<cf::Root>(IteratorMode::From(slot, IteratorDirection::Forward))?;
.iter::<cf::Root>(IteratorMode::From(slot, direction))?;
Ok(slot_iterator.map(move |(rooted_slot, _)| rooted_slot))
}

pub fn rooted_slot_iterator(&self, slot: Slot) -> Result<impl Iterator<Item = Slot> + '_> {
self.prepare_rooted_slot_iterator(slot, IteratorDirection::Forward)
}

pub fn reversed_rooted_slot_iterator(
&self,
slot: Slot,
) -> Result<impl Iterator<Item = Slot> + '_> {
self.prepare_rooted_slot_iterator(slot, IteratorDirection::Reverse)
}

fn get_recovery_data_shreds(
index: &mut Index,
set_index: u64,
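A hedged usage sketch of the new reversed_rooted_slot_iterator(); the import paths are assumed from the crate layout and the expect() is for brevity only.

    use solana_ledger::blockstore::Blockstore;
    use solana_sdk::clock::Slot;

    // Collect rooted slots newest-first, starting from the blockstore's last root.
    fn roots_newest_first(blockstore: &Blockstore) -> Vec<Slot> {
        let last_root = blockstore.last_root();
        blockstore
            .reversed_rooted_slot_iterator(last_root)
            .expect("iterate the Root column in reverse")
            .collect()
    }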
60 changes: 47 additions & 13 deletions local-cluster/src/local_cluster.rs
@@ -17,15 +17,18 @@ use {
gossip_service::discover_cluster,
},
solana_ledger::create_new_tmp_ledger,
solana_runtime::genesis_utils::{
create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo,
ValidatorVoteKeypairs,
solana_runtime::{
genesis_utils::{
create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo,
ValidatorVoteKeypairs,
},
snapshot_config::SnapshotConfig,
},
solana_sdk::{
account::Account,
account::AccountSharedData,
client::SyncClient,
clock::{DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
commitment_config::CommitmentConfig,
epoch_schedule::EpochSchedule,
genesis_config::{ClusterType, GenesisConfig},
@@ -50,10 +53,13 @@
collections::HashMap,
io::{Error, ErrorKind, Result},
iter,
path::{Path, PathBuf},
sync::{Arc, RwLock},
},
};

const DUMMY_SNAPSHOT_CONFIG_PATH_MARKER: &str = "dummy";

pub struct ClusterConfig {
/// The validator config that should be applied to every node in the cluster
pub validator_configs: Vec<ValidatorConfig>,
@@ -128,6 +134,23 @@ impl LocalCluster {
Self::new(&mut config, socket_addr_space)
}

fn sync_ledger_path_across_nested_config_fields(
config: &mut ValidatorConfig,
ledger_path: &Path,
) {
config.account_paths = vec![ledger_path.join("accounts")];
config.tower_storage = Arc::new(FileTowerStorage::new(ledger_path.to_path_buf()));
if let Some(snapshot_config) = &mut config.snapshot_config {
let dummy: PathBuf = DUMMY_SNAPSHOT_CONFIG_PATH_MARKER.into();
if snapshot_config.snapshot_archives_dir == dummy {
snapshot_config.snapshot_archives_dir = ledger_path.to_path_buf();
}
if snapshot_config.bank_snapshots_dir == dummy {
snapshot_config.bank_snapshots_dir = ledger_path.join("snapshot");
}
}
}

pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self {
assert_eq!(config.validator_configs.len(), config.node_stakes.len());
let mut validator_keys = {
@@ -215,8 +238,7 @@
let leader_contact_info = leader_node.info.clone();
let mut leader_config = safe_clone_config(&config.validator_configs[0]);
leader_config.rpc_addrs = Some((leader_node.info.rpc, leader_node.info.rpc_pubsub));
leader_config.account_paths = vec![leader_ledger_path.join("accounts")];
leader_config.tower_storage = Arc::new(FileTowerStorage::new(leader_ledger_path.clone()));
Self::sync_ledger_path_across_nested_config_fields(&mut leader_config, &leader_ledger_path);
let leader_keypair = Arc::new(Keypair::from_bytes(&leader_keypair.to_bytes()).unwrap());
let leader_vote_keypair =
Arc::new(Keypair::from_bytes(&leader_vote_keypair.to_bytes()).unwrap());
@@ -376,8 +398,7 @@

let mut config = safe_clone_config(validator_config);
config.rpc_addrs = Some((validator_node.info.rpc, validator_node.info.rpc_pubsub));
config.account_paths = vec![ledger_path.join("accounts")];
config.tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone()));
Self::sync_ledger_path_across_nested_config_fields(&mut config, &ledger_path);
let voting_keypair = voting_keypair.unwrap();
let validator_server = Validator::new(
validator_node,
@@ -408,7 +429,7 @@
validator_pubkey
}

pub fn ledger_path(&self, validator_pubkey: &Pubkey) -> std::path::PathBuf {
pub fn ledger_path(&self, validator_pubkey: &Pubkey) -> PathBuf {
self.validators
.get(validator_pubkey)
.unwrap()
@@ -639,6 +660,19 @@
)),
}
}

pub fn create_dummy_load_only_snapshot_config() -> SnapshotConfig {
// DUMMY_SNAPSHOT_CONFIG_PATH_MARKER will be replaced with a real value as part of the
// cluster node lifecycle.
// There must be some placeholder for now...
SnapshotConfig {
full_snapshot_archive_interval_slots: Slot::MAX,
incremental_snapshot_archive_interval_slots: Slot::MAX,
snapshot_archives_dir: DUMMY_SNAPSHOT_CONFIG_PATH_MARKER.into(),
bank_snapshots_dir: DUMMY_SNAPSHOT_CONFIG_PATH_MARKER.into(),
..SnapshotConfig::default()
}
}
}

impl Cluster for LocalCluster {
@@ -713,10 +747,10 @@
) -> ClusterValidatorInfo {
// Restart the node
let validator_info = &cluster_validator_info.info;
cluster_validator_info.config.account_paths =
vec![validator_info.ledger_path.join("accounts")];
cluster_validator_info.config.tower_storage =
Arc::new(FileTowerStorage::new(validator_info.ledger_path.clone()));
LocalCluster::sync_ledger_path_across_nested_config_fields(
&mut cluster_validator_info.config,
&validator_info.ledger_path,
);
let restarted_node = Validator::new(
node,
validator_info.keypair.clone(),
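A hedged sketch of how a local-cluster test could opt into snapshot loading with the new helper; the dummy paths are rewritten to the per-node ledger path by sync_ledger_path_across_nested_config_fields() when the node starts (import paths are assumptions).

    use solana_core::validator::ValidatorConfig;
    use solana_local_cluster::local_cluster::LocalCluster;

    // Build a validator config whose snapshot paths are placeholders until the node's
    // ledger path is known; LocalCluster fills them in during startup/restart.
    fn validator_config_with_load_only_snapshots() -> ValidatorConfig {
        ValidatorConfig {
            snapshot_config: Some(LocalCluster::create_dummy_load_only_snapshot_config()),
            ..ValidatorConfig::default()
        }
    }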
2 changes: 2 additions & 0 deletions local-cluster/src/validator_configs.rs
@@ -59,6 +59,8 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader,
accounts_shrink_ratio: config.accounts_shrink_ratio,
accounts_db_config: config.accounts_db_config.clone(),
no_hard_fork_blockstore_root_reconcilation_for_local_cluster_test: config
.no_hard_fork_blockstore_root_reconcilation_for_local_cluster_test,
}
}

