Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Skip leader slots until a vote lands #15607

Merged
merged 1 commit into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,8 @@ pub mod test {
None,
&mut self.heaviest_subtree_fork_choice,
&mut BTreeMap::new(),
&mut true,
&mut Vec::new(),
)
}

Expand Down
48 changes: 48 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use solana_sdk::{
genesis_config::ClusterType,
hash::Hash,
pubkey::Pubkey,
signature::Signature,
signature::{Keypair, Signer},
timing::timestamp,
transaction::Transaction,
Expand All @@ -63,6 +64,7 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;
const MAX_VOTE_SIGNATURES: usize = 200;

#[derive(PartialEq, Debug)]
pub(crate) enum HeaviestForkFailures {
Expand Down Expand Up @@ -111,6 +113,7 @@ pub struct ReplayStageConfig {
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_time_sender: Option<CacheBlockTimeSender>,
pub bank_notification_sender: Option<BankNotificationSender>,
pub wait_for_vote_to_start_leader: bool,
}

#[derive(Default)]
Expand Down Expand Up @@ -282,6 +285,7 @@ impl ReplayStage {
rewards_recorder_sender,
cache_block_time_sender,
bank_notification_sender,
wait_for_vote_to_start_leader,
} = config;

trace!("replay stage");
Expand Down Expand Up @@ -312,6 +316,8 @@ impl ReplayStage {
let mut skipped_slots_info = SkippedSlotsInfo::default();
let mut replay_timing = ReplayTiming::default();
let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = BTreeMap::new();
let mut voted_signatures = Vec::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut voted_signatures = Vec::new();
let mut voted_signatures = Vec::with_capacity(201);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 201?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose a 200 signature limit for how large this can be. I'm not sure we need to size it initially, most cases should not use the full 200 and the path isn't that performance sensitive.

let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
loop {
let allocated = thread_mem_usage::Allocatedp::default();

Expand Down Expand Up @@ -523,6 +529,8 @@ impl ReplayStage {
&cache_block_time_sender,
&bank_notification_sender,
&mut gossip_duplicate_confirmed_slots,
&mut voted_signatures,
&mut has_new_vote_been_rooted,
);
};
voting_time.stop();
Expand Down Expand Up @@ -614,6 +622,7 @@ impl ReplayStage {
&progress,
&retransmit_slots_sender,
&mut skipped_slots_info,
has_new_vote_been_rooted,
);

let poh_bank = poh_recorder.lock().unwrap().bank();
Expand Down Expand Up @@ -1020,7 +1029,12 @@ impl ReplayStage {
progress_map: &ProgressMap,
retransmit_slots_sender: &RetransmitSlotsSender,
skipped_slots_info: &mut SkippedSlotsInfo,
has_new_vote_been_rooted: bool,
) {
if !has_new_vote_been_rooted {
info!("Haven't landed a vote, so skipping my leader slot");
return;
}
// all the individual calls to poh_recorder.lock() are designed to
// increase granularity, decrease contention

Expand Down Expand Up @@ -1238,6 +1252,8 @@ impl ReplayStage {
cache_block_time_sender: &Option<CacheBlockTimeSender>,
bank_notification_sender: &Option<BankNotificationSender>,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: &mut bool,
) {
if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
Expand Down Expand Up @@ -1290,6 +1306,8 @@ impl ReplayStage {
highest_confirmed_root,
heaviest_subtree_fork_choice,
gossip_duplicate_confirmed_slots,
has_new_vote_been_rooted,
vote_signatures,
);
subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
Expand Down Expand Up @@ -1319,6 +1337,8 @@ impl ReplayStage {
last_vote,
&tower_slots,
switch_fork_decision,
vote_signatures,
*has_new_vote_been_rooted,
);
}

Expand All @@ -1330,6 +1350,8 @@ impl ReplayStage {
vote: Vote,
tower: &[Slot],
switch_fork_decision: &SwitchForkDecision,
vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: bool,
) {
if authorized_voter_keypairs.is_empty() {
return;
Expand Down Expand Up @@ -1399,6 +1421,14 @@ impl ReplayStage {

let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey()));

if !has_new_vote_been_rooted {
vote_signatures.push(vote_tx.signatures[0]);
if vote_signatures.len() > MAX_VOTE_SIGNATURES {
vote_signatures.remove(0);
}
} else {
vote_signatures.clear();
}
let blockhash = bank.last_blockhash();
vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash);
Expand Down Expand Up @@ -2125,13 +2155,27 @@ impl ReplayStage {
highest_confirmed_root: Option<Slot>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
has_new_vote_been_rooted: &mut bool,
voted_signatures: &mut Vec<Signature>,
) {
bank_forks.write().unwrap().set_root(
new_root,
accounts_background_request_sender,
highest_confirmed_root,
);
let r_bank_forks = bank_forks.read().unwrap();
let new_root_bank = &r_bank_forks[new_root];
if !*has_new_vote_been_rooted {
for signature in voted_signatures.iter() {
if new_root_bank.get_signature_status(signature).is_some() {
*has_new_vote_been_rooted = true;
break;
}
}
if *has_new_vote_been_rooted {
std::mem::take(voted_signatures);
}
}
progress.handle_new_root(&r_bank_forks);
heaviest_subtree_fork_choice.set_root(new_root);
let mut slots_ge_root = gossip_duplicate_confirmed_slots.split_off(&new_root);
Expand Down Expand Up @@ -2553,6 +2597,8 @@ pub(crate) mod tests {
None,
&mut heaviest_subtree_fork_choice,
&mut gossip_duplicate_confirmed_slots,
&mut true,
&mut Vec::new(),
);
assert_eq!(bank_forks.read().unwrap().root(), root);
assert_eq!(progress.len(), 1);
Expand Down Expand Up @@ -2609,6 +2655,8 @@ pub(crate) mod tests {
Some(confirmed_root),
&mut heaviest_subtree_fork_choice,
&mut BTreeMap::new(),
&mut true,
&mut Vec::new(),
);
assert_eq!(bank_forks.read().unwrap().root(), root);
assert!(bank_forks.read().unwrap().get(confirmed_root).is_some());
Expand Down
1 change: 1 addition & 0 deletions core/src/test_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ impl TestValidator {
warp_slot: config.warp_slot,
bpf_jit: !config.no_bpf_jit,
validator_exit: config.validator_exit.clone(),
no_wait_for_vote_to_start_leader: true,
..ValidatorConfig::default()
};

Expand Down
2 changes: 2 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct TvuConfig {
pub use_index_hash_calculation: bool,
pub rocksdb_compaction_interval: Option<u64>,
pub rocksdb_max_compaction_jitter: Option<u64>,
pub wait_for_vote_to_start_leader: bool,
}

impl Tvu {
Expand Down Expand Up @@ -259,6 +260,7 @@ impl Tvu {
rewards_recorder_sender,
cache_block_time_sender,
bank_notification_sender,
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
};

let replay_stage = ReplayStage::new(
Expand Down
77 changes: 52 additions & 25 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub struct ValidatorConfig {
pub accounts_db_use_index_hash_calculation: bool,
pub tpu_coalesce_ms: u64,
pub validator_exit: Arc<RwLock<ValidatorExit>>,
pub no_wait_for_vote_to_start_leader: bool,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -184,6 +185,7 @@ impl Default for ValidatorConfig {
accounts_db_use_index_hash_calculation: true,
tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS,
validator_exit: Arc::new(RwLock::new(ValidatorExit::default())),
no_wait_for_vote_to_start_leader: true,
}
}
}
Expand Down Expand Up @@ -629,15 +631,20 @@ impl Validator {
check_poh_speed(&genesis_config, None);
}

if wait_for_supermajority(
let waited_for_supermajority = if let Ok(waited) = wait_for_supermajority(
config,
&bank,
&cluster_info,
rpc_override_health_check,
&start_progress,
) {
waited
} else {
abort();
}
};

let wait_for_vote_to_start_leader =
!waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;

let poh_service = PohService::new(
poh_recorder.clone(),
Expand Down Expand Up @@ -725,6 +732,7 @@ impl Validator {
use_index_hash_calculation: config.accounts_db_use_index_hash_calculation,
rocksdb_compaction_interval: config.rocksdb_compaction_interval,
rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
wait_for_vote_to_start_leader,
},
&max_slots,
);
Expand Down Expand Up @@ -1292,30 +1300,41 @@ fn initialize_rpc_transaction_history_services(
}
}

// Return true on error, indicating the validator should exit.
#[derive(Debug, PartialEq)]
enum ValidatorError {
BadExpectedBankHash,
NotEnoughLedgerData,
}

// Returns whether the validator waited on other nodes to start. In this case
// it should not wait for one of its votes to land to produce blocks
// because if the whole network is waiting, then it will stall.
//
// Error indicates that a bad hash was encountered or another condition
// that is unrecoverable and the validator should exit.
fn wait_for_supermajority(
config: &ValidatorConfig,
bank: &Bank,
cluster_info: &ClusterInfo,
rpc_override_health_check: Arc<AtomicBool>,
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
) -> bool {
) -> Result<bool, ValidatorError> {
if let Some(wait_for_supermajority) = config.wait_for_supermajority {
match wait_for_supermajority.cmp(&bank.slot()) {
std::cmp::Ordering::Less => return false,
std::cmp::Ordering::Less => return Ok(false),
std::cmp::Ordering::Greater => {
error!(
"Ledger does not have enough data to wait for supermajority, \
please enable snapshot fetch. Has {} needs {}",
bank.slot(),
wait_for_supermajority
);
return true;
return Err(ValidatorError::NotEnoughLedgerData);
}
_ => {}
}
} else {
return false;
return Ok(false);
}

if let Some(expected_bank_hash) = config.expected_bank_hash {
Expand All @@ -1325,7 +1344,7 @@ fn wait_for_supermajority(
bank.hash(),
expected_bank_hash
);
return true;
return Err(ValidatorError::BadExpectedBankHash);
}
}

Expand All @@ -1350,7 +1369,7 @@ fn wait_for_supermajority(
sleep(Duration::new(1, 0));
}
rpc_override_health_check.store(false, Ordering::Relaxed);
false
Ok(true)
}

fn report_target_features() {
Expand Down Expand Up @@ -1641,17 +1660,21 @@ mod tests {
&cluster_info,
rpc_override_health_check.clone(),
&start_progress,
));
)
.unwrap());

// bank=0, wait=1, should fail
config.wait_for_supermajority = Some(1);
assert!(wait_for_supermajority(
&config,
&bank,
&cluster_info,
rpc_override_health_check.clone(),
&start_progress,
));
assert_eq!(
wait_for_supermajority(
&config,
&bank,
&cluster_info,
rpc_override_health_check.clone(),
&start_progress,
),
Err(ValidatorError::NotEnoughLedgerData)
);

// bank=1, wait=0, should pass, bank is past the wait slot
let bank = Bank::new_from_parent(&bank, &Pubkey::default(), 1);
Expand All @@ -1662,18 +1685,22 @@ mod tests {
&cluster_info,
rpc_override_health_check.clone(),
&start_progress,
));
)
.unwrap());

// bank=1, wait=1, equal, but bad hash provided
config.wait_for_supermajority = Some(1);
config.expected_bank_hash = Some(hash(&[1]));
assert!(wait_for_supermajority(
&config,
&bank,
&cluster_info,
rpc_override_health_check,
&start_progress,
));
assert_eq!(
wait_for_supermajority(
&config,
&bank,
&cluster_info,
rpc_override_health_check,
&start_progress,
),
Err(ValidatorError::BadExpectedBankHash)
);
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
tpu_coalesce_ms: config.tpu_coalesce_ms,
validator_exit: Arc::new(RwLock::new(ValidatorExit::default())),
poh_hashes_per_batch: config.poh_hashes_per_batch,
no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader,
}
}

Expand Down
1 change: 1 addition & 0 deletions multinode-demo/bootstrap-validator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ args+=(
--vote-account "$vote_account"
--rpc-faucet-address 127.0.0.1:9900
--no-poh-speed-test
--no-wait-for-vote-to-start-leader
)
default_arg --gossip-port 8001
default_arg --log -
Expand Down
1 change: 1 addition & 0 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ args=(
--init-complete-file "$dataDir"/init-completed
--snapshot-compression none
--require-tower
--no-wait-for-vote-to-start-leader
)
# shellcheck disable=SC2086
solana-validator "${args[@]}" $SOLANA_RUN_SH_VALIDATOR_ARGS &
Expand Down
Loading