Skip to content

Commit

Permalink
Skip leader slots until a vote lands (#15607)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge authored Mar 26, 2021
1 parent b041b55 commit b99ae8f
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 25 deletions.
2 changes: 2 additions & 0 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,8 @@ pub mod test {
None,
&mut self.heaviest_subtree_fork_choice,
&mut BTreeMap::new(),
&mut true,
&mut Vec::new(),
)
}

Expand Down
48 changes: 48 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use solana_sdk::{
genesis_config::ClusterType,
hash::Hash,
pubkey::Pubkey,
signature::Signature,
signature::{Keypair, Signer},
timing::timestamp,
transaction::Transaction,
Expand All @@ -63,6 +64,7 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;
const MAX_VOTE_SIGNATURES: usize = 200;

#[derive(PartialEq, Debug)]
pub(crate) enum HeaviestForkFailures {
Expand Down Expand Up @@ -111,6 +113,7 @@ pub struct ReplayStageConfig {
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_time_sender: Option<CacheBlockTimeSender>,
pub bank_notification_sender: Option<BankNotificationSender>,
pub wait_for_vote_to_start_leader: bool,
}

#[derive(Default)]
Expand Down Expand Up @@ -282,6 +285,7 @@ impl ReplayStage {
rewards_recorder_sender,
cache_block_time_sender,
bank_notification_sender,
wait_for_vote_to_start_leader,
} = config;

trace!("replay stage");
Expand Down Expand Up @@ -312,6 +316,8 @@ impl ReplayStage {
let mut skipped_slots_info = SkippedSlotsInfo::default();
let mut replay_timing = ReplayTiming::default();
let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = BTreeMap::new();
let mut voted_signatures = Vec::new();
let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
loop {
let allocated = thread_mem_usage::Allocatedp::default();

Expand Down Expand Up @@ -523,6 +529,8 @@ impl ReplayStage {
&cache_block_time_sender,
&bank_notification_sender,
&mut gossip_duplicate_confirmed_slots,
&mut voted_signatures,
&mut has_new_vote_been_rooted,
);
};
voting_time.stop();
Expand Down Expand Up @@ -614,6 +622,7 @@ impl ReplayStage {
&progress,
&retransmit_slots_sender,
&mut skipped_slots_info,
has_new_vote_been_rooted,
);

let poh_bank = poh_recorder.lock().unwrap().bank();
Expand Down Expand Up @@ -1020,7 +1029,12 @@ impl ReplayStage {
progress_map: &ProgressMap,
retransmit_slots_sender: &RetransmitSlotsSender,
skipped_slots_info: &mut SkippedSlotsInfo,
has_new_vote_been_rooted: bool,
) {
if !has_new_vote_been_rooted {
info!("Haven't landed a vote, so skipping my leader slot");
return;
}
// all the individual calls to poh_recorder.lock() are designed to
// increase granularity, decrease contention

Expand Down Expand Up @@ -1238,6 +1252,8 @@ impl ReplayStage {
cache_block_time_sender: &Option<CacheBlockTimeSender>,
bank_notification_sender: &Option<BankNotificationSender>,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: &mut bool,
) {
if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
Expand Down Expand Up @@ -1290,6 +1306,8 @@ impl ReplayStage {
highest_confirmed_root,
heaviest_subtree_fork_choice,
gossip_duplicate_confirmed_slots,
has_new_vote_been_rooted,
vote_signatures,
);
subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
Expand Down Expand Up @@ -1319,6 +1337,8 @@ impl ReplayStage {
last_vote,
&tower_slots,
switch_fork_decision,
vote_signatures,
*has_new_vote_been_rooted,
);
}

Expand All @@ -1330,6 +1350,8 @@ impl ReplayStage {
vote: Vote,
tower: &[Slot],
switch_fork_decision: &SwitchForkDecision,
vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: bool,
) {
if authorized_voter_keypairs.is_empty() {
return;
Expand Down Expand Up @@ -1399,6 +1421,14 @@ impl ReplayStage {

let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey()));

if !has_new_vote_been_rooted {
vote_signatures.push(vote_tx.signatures[0]);
if vote_signatures.len() > MAX_VOTE_SIGNATURES {
vote_signatures.remove(0);
}
} else {
vote_signatures.clear();
}
let blockhash = bank.last_blockhash();
vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash);
Expand Down Expand Up @@ -2125,13 +2155,27 @@ impl ReplayStage {
highest_confirmed_root: Option<Slot>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
has_new_vote_been_rooted: &mut bool,
voted_signatures: &mut Vec<Signature>,
) {
bank_forks.write().unwrap().set_root(
new_root,
accounts_background_request_sender,
highest_confirmed_root,
);
let r_bank_forks = bank_forks.read().unwrap();
let new_root_bank = &r_bank_forks[new_root];
if !*has_new_vote_been_rooted {
for signature in voted_signatures.iter() {
if new_root_bank.get_signature_status(signature).is_some() {
*has_new_vote_been_rooted = true;
break;
}
}
if *has_new_vote_been_rooted {
std::mem::take(voted_signatures);
}
}
progress.handle_new_root(&r_bank_forks);
heaviest_subtree_fork_choice.set_root(new_root);
let mut slots_ge_root = gossip_duplicate_confirmed_slots.split_off(&new_root);
Expand Down Expand Up @@ -2553,6 +2597,8 @@ pub(crate) mod tests {
None,
&mut heaviest_subtree_fork_choice,
&mut gossip_duplicate_confirmed_slots,
&mut true,
&mut Vec::new(),
);
assert_eq!(bank_forks.read().unwrap().root(), root);
assert_eq!(progress.len(), 1);
Expand Down Expand Up @@ -2609,6 +2655,8 @@ pub(crate) mod tests {
Some(confirmed_root),
&mut heaviest_subtree_fork_choice,
&mut BTreeMap::new(),
&mut true,
&mut Vec::new(),
);
assert_eq!(bank_forks.read().unwrap().root(), root);
assert!(bank_forks.read().unwrap().get(confirmed_root).is_some());
Expand Down
1 change: 1 addition & 0 deletions core/src/test_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ impl TestValidator {
warp_slot: config.warp_slot,
bpf_jit: !config.no_bpf_jit,
validator_exit: config.validator_exit.clone(),
no_wait_for_vote_to_start_leader: true,
..ValidatorConfig::default()
};

Expand Down
2 changes: 2 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct TvuConfig {
pub use_index_hash_calculation: bool,
pub rocksdb_compaction_interval: Option<u64>,
pub rocksdb_max_compaction_jitter: Option<u64>,
pub wait_for_vote_to_start_leader: bool,
}

impl Tvu {
Expand Down Expand Up @@ -259,6 +260,7 @@ impl Tvu {
rewards_recorder_sender,
cache_block_time_sender,
bank_notification_sender,
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
};

let replay_stage = ReplayStage::new(
Expand Down
77 changes: 52 additions & 25 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub struct ValidatorConfig {
pub accounts_db_use_index_hash_calculation: bool,
pub tpu_coalesce_ms: u64,
pub validator_exit: Arc<RwLock<ValidatorExit>>,
pub no_wait_for_vote_to_start_leader: bool,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -184,6 +185,7 @@ impl Default for ValidatorConfig {
accounts_db_use_index_hash_calculation: true,
tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS,
validator_exit: Arc::new(RwLock::new(ValidatorExit::default())),
no_wait_for_vote_to_start_leader: true,
}
}
}
Expand Down Expand Up @@ -629,15 +631,20 @@ impl Validator {
check_poh_speed(&genesis_config, None);
}

if wait_for_supermajority(
let waited_for_supermajority = if let Ok(waited) = wait_for_supermajority(
config,
&bank,
&cluster_info,
rpc_override_health_check,
&start_progress,
) {
waited
} else {
abort();
}
};

let wait_for_vote_to_start_leader =
!waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;

let poh_service = PohService::new(
poh_recorder.clone(),
Expand Down Expand Up @@ -725,6 +732,7 @@ impl Validator {
use_index_hash_calculation: config.accounts_db_use_index_hash_calculation,
rocksdb_compaction_interval: config.rocksdb_compaction_interval,
rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
wait_for_vote_to_start_leader,
},
&max_slots,
);
Expand Down Expand Up @@ -1292,30 +1300,41 @@ fn initialize_rpc_transaction_history_services(
}
}

// Return true on error, indicating the validator should exit.
#[derive(Debug, PartialEq)]
enum ValidatorError {
BadExpectedBankHash,
NotEnoughLedgerData,
}

// Return if the validator waited on other nodes to start. In this case
// it should not wait for one of it's votes to land to produce blocks
// because if the whole network is waiting, then it will stall.
//
// Error indicates that a bad hash was encountered or another condition
// that is unrecoverable and the validator should exit.
fn wait_for_supermajority(
config: &ValidatorConfig,
bank: &Bank,
cluster_info: &ClusterInfo,
rpc_override_health_check: Arc<AtomicBool>,
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
) -> bool {
) -> Result<bool, ValidatorError> {
if let Some(wait_for_supermajority) = config.wait_for_supermajority {
match wait_for_supermajority.cmp(&bank.slot()) {
std::cmp::Ordering::Less => return false,
std::cmp::Ordering::Less => return Ok(false),
std::cmp::Ordering::Greater => {
error!(
"Ledger does not have enough data to wait for supermajority, \
please enable snapshot fetch. Has {} needs {}",
bank.slot(),
wait_for_supermajority
);
return true;
return Err(ValidatorError::NotEnoughLedgerData);
}
_ => {}
}
} else {
return false;
return Ok(false);
}

if let Some(expected_bank_hash) = config.expected_bank_hash {
Expand All @@ -1325,7 +1344,7 @@ fn wait_for_supermajority(
bank.hash(),
expected_bank_hash
);
return true;
return Err(ValidatorError::BadExpectedBankHash);
}
}

Expand All @@ -1350,7 +1369,7 @@ fn wait_for_supermajority(
sleep(Duration::new(1, 0));
}
rpc_override_health_check.store(false, Ordering::Relaxed);
false
Ok(true)
}

fn report_target_features() {
Expand Down Expand Up @@ -1641,17 +1660,21 @@ mod tests {
&cluster_info,
rpc_override_health_check.clone(),
&start_progress,
));
)
.unwrap());

// bank=0, wait=1, should fail
config.wait_for_supermajority = Some(1);
assert!(wait_for_supermajority(
&config,
&bank,
&cluster_info,
rpc_override_health_check.clone(),
&start_progress,
));
assert_eq!(
wait_for_supermajority(
&config,
&bank,
&cluster_info,
rpc_override_health_check.clone(),
&start_progress,
),
Err(ValidatorError::NotEnoughLedgerData)
);

// bank=1, wait=0, should pass, bank is past the wait slot
let bank = Bank::new_from_parent(&bank, &Pubkey::default(), 1);
Expand All @@ -1662,18 +1685,22 @@ mod tests {
&cluster_info,
rpc_override_health_check.clone(),
&start_progress,
));
)
.unwrap());

// bank=1, wait=1, equal, but bad hash provided
config.wait_for_supermajority = Some(1);
config.expected_bank_hash = Some(hash(&[1]));
assert!(wait_for_supermajority(
&config,
&bank,
&cluster_info,
rpc_override_health_check,
&start_progress,
));
assert_eq!(
wait_for_supermajority(
&config,
&bank,
&cluster_info,
rpc_override_health_check,
&start_progress,
),
Err(ValidatorError::BadExpectedBankHash)
);
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
tpu_coalesce_ms: config.tpu_coalesce_ms,
validator_exit: Arc::new(RwLock::new(ValidatorExit::default())),
poh_hashes_per_batch: config.poh_hashes_per_batch,
no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader,
}
}

Expand Down
1 change: 1 addition & 0 deletions multinode-demo/bootstrap-validator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ args+=(
--vote-account "$vote_account"
--rpc-faucet-address 127.0.0.1:9900
--no-poh-speed-test
--no-wait-for-vote-to-start-leader
)
default_arg --gossip-port 8001
default_arg --log -
Expand Down
1 change: 1 addition & 0 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ args=(
--init-complete-file "$dataDir"/init-completed
--snapshot-compression none
--require-tower
--no-wait-for-vote-to-start-leader
)
# shellcheck disable=SC2086
solana-validator "${args[@]}" $SOLANA_RUN_SH_VALIDATOR_ARGS &
Expand Down
Loading

0 comments on commit b99ae8f

Please sign in to comment.