Skip to content

Commit

Permalink
v1.3: Backport block time updates (#12423)
Browse files Browse the repository at this point in the history
* Submit a vote timestamp every vote (#10630)

* Submit a timestamp for every vote

* Submit at most one vote timestamp per second

* Submit a timestamp for every new vote

Co-authored-by: Tyera Eulberg <[email protected]>

* Timestamp first vote (#11856)

* Cache block time in Blockstore (#11955)

* Add blockstore column to cache block times

* Add method to cache block time

* Add service to cache block time

* Update rpc getBlockTime to use new method, and refactor blockstore slightly

* Return block_time with confirmed block, if available

* Add measure and warning to cache-block-time

Co-authored-by: Michael Vines <[email protected]>
  • Loading branch information
CriesofCarrots and mvines authored Sep 23, 2020
1 parent 0f3a555 commit 35a1ab9
Show file tree
Hide file tree
Showing 12 changed files with 310 additions and 148 deletions.
76 changes: 76 additions & 0 deletions core/src/cache_block_time_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_runtime::bank::Bank;
use solana_sdk::timing::slot_duration_from_slots_per_year;
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};

pub type CacheBlockTimeReceiver = Receiver<Arc<Bank>>;
pub type CacheBlockTimeSender = Sender<Arc<Bank>>;

pub struct CacheBlockTimeService {
thread_hdl: JoinHandle<()>,
}

const CACHE_BLOCK_TIME_WARNING_MS: u64 = 150;

impl CacheBlockTimeService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
cache_block_time_receiver: CacheBlockTimeReceiver,
blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-cache-block-time".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let recv_result = cache_block_time_receiver.recv_timeout(Duration::from_secs(1));
match recv_result {
Err(RecvTimeoutError::Disconnected) => {
break;
}
Ok(bank) => {
let mut cache_block_time_timer = Measure::start("cache_block_time_timer");
Self::cache_block_time(bank, &blockstore);
cache_block_time_timer.stop();
if cache_block_time_timer.as_ms() > CACHE_BLOCK_TIME_WARNING_MS {
warn!(
"cache_block_time operation took: {}ms",
cache_block_time_timer.as_ms()
);
}
}
_ => {}
}
})
.unwrap();
Self { thread_hdl }
}

fn cache_block_time(bank: Arc<Bank>, blockstore: &Arc<Blockstore>) {
let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year());
let epoch = bank.epoch_schedule().get_epoch(bank.slot());
let stakes = HashMap::new();
let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);

if let Err(e) = blockstore.cache_block_time(bank.slot(), slot_duration, stakes) {
error!("cache_block_time failed: slot {:?} {:?}", bank.slot(), e);
}
}

pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
64 changes: 28 additions & 36 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use solana_sdk::{
};
use solana_vote_program::{
vote_instruction,
vote_state::{
BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY, TIMESTAMP_SLOT_INTERVAL,
},
vote_state::{BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY},
};
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -345,6 +343,22 @@ impl Tower {
last_vote
}

fn maybe_timestamp(&mut self, current_slot: Slot) -> Option<UnixTimestamp> {
if current_slot > self.last_timestamp.slot
|| self.last_timestamp.slot == 0 && current_slot == self.last_timestamp.slot
{
let timestamp = Utc::now().timestamp();
if timestamp >= self.last_timestamp.timestamp {
self.last_timestamp = BlockTimestamp {
slot: current_slot,
timestamp,
};
return Some(timestamp);
}
}
None
}

pub fn root(&self) -> Option<Slot> {
self.lockouts.root_slot
}
Expand Down Expand Up @@ -653,21 +667,6 @@ impl Tower {
);
}
}

fn maybe_timestamp(&mut self, current_slot: Slot) -> Option<UnixTimestamp> {
if self.last_timestamp.slot == 0
|| self.last_timestamp.slot < (current_slot - (current_slot % TIMESTAMP_SLOT_INTERVAL))
{
let timestamp = Utc::now().timestamp();
self.last_timestamp = BlockTimestamp {
slot: current_slot,
timestamp,
};
Some(timestamp)
} else {
None
}
}
}

#[cfg(test)]
Expand All @@ -694,12 +693,7 @@ pub mod test {
vote_state::{Vote, VoteStateVersions, MAX_LOCKOUT_HISTORY},
vote_transaction,
};
use std::{
collections::HashMap,
rc::Rc,
sync::RwLock,
{thread::sleep, time::Duration},
};
use std::{collections::HashMap, rc::Rc, sync::RwLock};
use trees::{tr, Tree, TreeWalk};

pub(crate) struct VoteSimulator {
Expand Down Expand Up @@ -1832,17 +1826,15 @@ pub mod test {
#[test]
fn test_maybe_timestamp() {
let mut tower = Tower::default();
assert!(tower.maybe_timestamp(TIMESTAMP_SLOT_INTERVAL).is_some());
let BlockTimestamp { slot, timestamp } = tower.last_timestamp;

assert_eq!(tower.maybe_timestamp(1), None);
assert_eq!(tower.maybe_timestamp(slot), None);
assert_eq!(tower.maybe_timestamp(slot + 1), None);

sleep(Duration::from_secs(1));
assert!(tower
.maybe_timestamp(slot + TIMESTAMP_SLOT_INTERVAL + 1)
.is_some());
assert!(tower.last_timestamp.timestamp > timestamp);
assert!(tower.maybe_timestamp(0).is_some());
assert!(tower.maybe_timestamp(1).is_some());
assert!(tower.maybe_timestamp(0).is_none()); // Refuse to timestamp an older slot
assert!(tower.maybe_timestamp(1).is_none()); // Refuse to timestamp the same slot twice

tower.last_timestamp.timestamp -= 1; // Move last_timestamp into the past
assert!(tower.maybe_timestamp(2).is_some()); // slot 2 gets a timestamp

tower.last_timestamp.timestamp += 1_000_000; // Move last_timestamp well into the future
assert!(tower.maybe_timestamp(3).is_none()); // slot 3 gets no timestamp
}
}
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod accounts_hash_verifier;
pub mod banking_stage;
pub mod bigtable_upload_service;
pub mod broadcast_stage;
pub mod cache_block_time_service;
pub mod cluster_info_vote_listener;
pub mod commitment_service;
pub mod completed_data_sets_service;
Expand Down
41 changes: 41 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::{
bank_weight_fork_choice::BankWeightForkChoice,
broadcast_stage::RetransmitSlotsSender,
cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
Expand Down Expand Up @@ -106,6 +107,7 @@ pub struct ReplayStageConfig {
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_time_sender: Option<CacheBlockTimeSender>,
}

#[derive(Default)]
Expand Down Expand Up @@ -235,6 +237,7 @@ impl ReplayStage {
block_commitment_cache,
transaction_status_sender,
rewards_recorder_sender,
cache_block_time_sender,
} = config;

trace!("replay stage");
Expand Down Expand Up @@ -494,6 +497,7 @@ impl ReplayStage {
&subscriptions,
&block_commitment_cache,
&mut heaviest_subtree_fork_choice,
&cache_block_time_sender,
)?;
};
voting_time.stop();
Expand Down Expand Up @@ -1004,6 +1008,7 @@ impl ReplayStage {
subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
cache_block_time_sender: &Option<CacheBlockTimeSender>,
) -> Result<()> {
if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
Expand All @@ -1029,6 +1034,12 @@ impl ReplayStage {
blockstore
.set_roots(&rooted_slots)
.expect("Ledger set roots failed");
Self::cache_block_times(
blockstore,
bank_forks,
&rooted_slots,
cache_block_time_sender,
);
let highest_confirmed_root = Some(
block_commitment_cache
.read()
Expand Down Expand Up @@ -1855,6 +1866,36 @@ impl ReplayStage {
}
}

fn cache_block_times(
blockstore: &Arc<Blockstore>,
bank_forks: &Arc<RwLock<BankForks>>,
rooted_slots: &[Slot],
cache_block_time_sender: &Option<CacheBlockTimeSender>,
) {
if let Some(cache_block_time_sender) = cache_block_time_sender {
for slot in rooted_slots {
if blockstore
.get_block_time(*slot)
.unwrap_or_default()
.is_none()
{
if let Some(rooted_bank) = bank_forks.read().unwrap().get(*slot) {
cache_block_time_sender
.send(rooted_bank.clone())
.unwrap_or_else(|err| {
warn!("cache_block_time_sender failed: {:?}", err)
});
} else {
error!(
"rooted_bank {:?} not available in BankForks; block time not cached",
slot
);
}
}
}
}
}

pub fn get_unlock_switch_vote_slot(cluster_type: ClusterType) -> Slot {
match cluster_type {
ClusterType::Development => 0,
Expand Down
22 changes: 8 additions & 14 deletions core/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ use solana_sdk::{
stake_history::StakeHistory,
system_instruction,
sysvar::{stake_history, Sysvar},
timing::slot_duration_from_slots_per_year,
transaction::{self, Transaction},
};
use solana_stake_program::stake_state::StakeState;
Expand Down Expand Up @@ -697,18 +696,7 @@ impl JsonRpcRequestProcessor {
.unwrap()
.highest_confirmed_root()
{
// This calculation currently assumes that bank.slots_per_year will remain unchanged after
// genesis (ie. that this bank's slot_per_year will be applicable to any rooted slot being
// queried). If these values will be variable in the future, those timing parameters will
// need to be stored persistently, and the slot_duration calculation will likely need to be
// moved upstream into blockstore. Also, an explicit commitment level will need to be set.
let bank = self.bank(None);
let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year());
let epoch = bank.epoch_schedule().get_epoch(slot);
let stakes = HashMap::new();
let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);

let result = self.blockstore.get_block_time(slot, slot_duration, stakes);
let result = self.blockstore.get_block_time(slot);
self.check_slot_cleaned_up(&result, slot)?;
Ok(result.ok().unwrap_or(None))
} else {
Expand Down Expand Up @@ -2577,6 +2565,7 @@ pub mod tests {
nonce, rpc_port,
signature::{Keypair, Signer},
system_program, system_transaction,
timing::slot_duration_from_slots_per_year,
transaction::{self, TransactionError},
};
use solana_transaction_status::{EncodedTransaction, TransactionWithStatusMeta, UiMessage};
Expand All @@ -2589,7 +2578,7 @@ pub mod tests {
state::AccountState as TokenAccountState,
state::Mint,
};
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

const TEST_MINT_LAMPORTS: u64 = 1_000_000;
const TEST_SLOTS_PER_EPOCH: u64 = DELINQUENT_VALIDATOR_SLOT_DISTANCE + 1;
Expand Down Expand Up @@ -2695,6 +2684,11 @@ pub mod tests {

for root in roots.iter() {
bank_forks.write().unwrap().set_root(*root, &None, Some(0));
let mut stakes = HashMap::new();
stakes.insert(leader_vote_keypair.pubkey(), (1, Account::default()));
blockstore
.cache_block_time(*root, Duration::from_millis(400), &stakes)
.unwrap();
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/rpc_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl From<RpcCustomError> for Error {
},
RpcCustomError::BlockNotAvailable { slot } => Self {
code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_4),
message: format!("Block not available for slot {}", slot,),
message: format!("Block not available for slot {}", slot),
data: None,
},
RpcCustomError::RpcNodeUnhealthy => Self {
Expand Down
4 changes: 4 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
accounts_background_service::AccountsBackgroundService,
accounts_hash_verifier::AccountsHashVerifier,
broadcast_stage::RetransmitSlotsSender,
cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo,
cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker},
cluster_slots::ClusterSlots,
Expand Down Expand Up @@ -96,6 +97,7 @@ impl Tvu {
cfg: Option<Arc<AtomicBool>>,
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_time_sender: Option<CacheBlockTimeSender>,
snapshot_package_sender: Option<AccountsPackageSender>,
vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender,
Expand Down Expand Up @@ -191,6 +193,7 @@ impl Tvu {
block_commitment_cache,
transaction_status_sender,
rewards_recorder_sender,
cache_block_time_sender,
};

let replay_stage = ReplayStage::new(
Expand Down Expand Up @@ -327,6 +330,7 @@ pub mod tests {
None,
None,
None,
None,
Arc::new(VoteTracker::new(&bank)),
retransmit_slots_sender,
verified_vote_receiver,
Expand Down
Loading

0 comments on commit 35a1ab9

Please sign in to comment.