diff --git a/src/bank_state.rs b/src/bank_state.rs index eab853fb6cca0b..ccb85278b6eab4 100644 --- a/src/bank_state.rs +++ b/src/bank_state.rs @@ -6,6 +6,7 @@ use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS}; use crate::leader_scheduler::TICKS_PER_BLOCK; use crate::poh_recorder::PohRecorder; use crate::runtime::{self, RuntimeError}; +use crate::sched::Sched; use crate::status_cache::StatusCache; use log::Level; use rayon::prelude::*; @@ -30,6 +31,9 @@ pub struct BankCheckpoint { status_cache: RwLock, finalized: AtomicBool, fork_id: AtomicUsize, + /// current schedule and next schedule + /// These are rotated on BankCheckpoint::merge_into_root + sched: RwLock<(Sched, Sched)>, } impl std::fmt::Debug for BankCheckpoint { @@ -211,12 +215,13 @@ impl BankCheckpoint { /// consume the checkpoint into the root state /// self becomes the new root and its fork_id is updated pub fn merge_into_root(&self, other: Self) { - let (accounts, entry_q, status_cache, fork_id) = { + let prev_fork_id = self.fork_id(); + let (accounts, entry_q, status_cache, new_fork_id) = { ( other.accounts, other.entry_q, other.status_cache, - other.fork_id, + other.fork_id(), ) }; self.accounts.merge_into_root(accounts); @@ -228,8 +233,13 @@ impl BankCheckpoint { .write() .unwrap() .merge_into_root(status_cache.into_inner().unwrap()); - self.fork_id - .store(fork_id.load(Ordering::Relaxed), Ordering::Relaxed); + self.fork_id.store(new_fork_id); + if Sched::should_regenerate(prev_fork_id, new_fork_id) { + let next_sched = Sched::new_schedule(&self); + let sched = self.sched.write().unwrap(); + *sched.0 = sched.1; + *sched.1 = next_sched; + } } } diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index d7bb9f7ada8dcf..9777c589f88908 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -227,7 +227,7 @@ impl BroadcastService { ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { - return BroadcastServiceReturnType::ChannelDisconnected + return BroadcastServiceReturnType::ChannelDisconnected; } Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? diff --git a/src/lib.rs b/src/lib.rs index 192471c9de8f44..e9dbab0bb230a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,6 +59,7 @@ pub mod rpc_mock; pub mod rpc_pubsub; pub mod rpc_request; pub mod runtime; +pub mod sched; pub mod service; pub mod signature; pub mod sigverify; diff --git a/src/sched.rs b/src/sched.rs new file mode 100644 index 00000000000000..c8a5cfbae22c1f --- /dev/null +++ b/src/sched.rs @@ -0,0 +1,60 @@ +use crate::bank_state::BankCheckpoint; +use fnv::FnvHasher; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::vote_program; +use std::hash::Hasher; + +pub const REFRESH_RATE: u64 = 1000; + +struct Sched { + min_slot: u64, + ranks: Vec<(Pubkey, u64)>, +} + +impl Sched { + fn should_regenerate(prev_root: u64, new_root: u64) -> bool { + prev_root / REFRESH_RATE != new_root / REFRESH_RATE + } + + /// ranked leaders + fn new_schedule(root: &BankCheckpoint) -> Sched { + let accounts = root.accounts.accounts_db.read().unwrap(); + let leaders: Vec<(Pubkey, u64)> = accounts + .accounts + .iter() + .filter_map(|(id, account)| { + if vote_program::check_id(&account.owner) { + return Some((id, account.tokens)); + } + None + }) + .collect(); + let start = (Pubkey::default(), 0); + let ranks = leaders + .into_iter() + .scan(start, |z, x| Some((x.0, z.1 + x.1))) + .collect(); + let min_slot = ((root.fork_id() + REFRESH_RATE) / REFRESH_RATE) * REFRESH_RATE; + Sched { ranks, min_slot } + } + + fn compute_node(&self, slot: u64) -> Option { + if slot < self.min_slot { + return None; + } + let total = self.ranks.last().unwrap().1; + // use a stable known hasher because this MUST be the same across the entire network + let mut hasher = FnvHasher::with_key(self.min_slot); + hasher.write(&slot.to_le_bytes()); + let random = hasher.finish(); + let val = random % total; + Some( + self.ranks + .iter() + .skip_while(|l| val < l.1) + .nth(0) + .unwrap() + .0, + ) + } +}