forking sched idea
aeyakovenko committed Jan 28, 2019
1 parent d45ef86 commit 2f4ee4b
Showing 4 changed files with 76 additions and 5 deletions.
18 changes: 14 additions & 4 deletions src/bank_state.rs
@@ -6,6 +6,7 @@ use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
use crate::leader_scheduler::TICKS_PER_BLOCK;
use crate::poh_recorder::PohRecorder;
use crate::runtime::{self, RuntimeError};
use crate::sched::Sched;
use crate::status_cache::StatusCache;
use log::Level;
use rayon::prelude::*;
@@ -30,6 +31,9 @@ pub struct BankCheckpoint {
status_cache: RwLock<StatusCache>,
finalized: AtomicBool,
fork_id: AtomicUsize,
/// current schedule and next schedule
/// These are rotated on BankCheckpoint::merge_into_root
sched: RwLock<(Sched, Sched)>,
}

impl std::fmt::Debug for BankCheckpoint {
@@ -211,12 +215,13 @@ impl BankCheckpoint {
/// consume the checkpoint into the root state
/// self becomes the new root and its fork_id is updated
pub fn merge_into_root(&self, other: Self) {
let (accounts, entry_q, status_cache, fork_id) = {
let prev_fork_id = self.fork_id();
let (accounts, entry_q, status_cache, new_fork_id) = {
(
other.accounts,
other.entry_q,
other.status_cache,
other.fork_id,
other.fork_id(),
)
};
self.accounts.merge_into_root(accounts);
@@ -228,8 +233,13 @@
.write()
.unwrap()
.merge_into_root(status_cache.into_inner().unwrap());
self.fork_id
.store(fork_id.load(Ordering::Relaxed), Ordering::Relaxed);
self.fork_id.store(new_fork_id as usize, Ordering::Relaxed);
if Sched::should_regenerate(prev_fork_id, new_fork_id) {
let next_sched = Sched::new_schedule(self);
// rotate: the pending schedule becomes current, the freshly built one becomes pending
let mut sched = self.sched.write().unwrap();
sched.0 = std::mem::replace(&mut sched.1, next_sched);
}
}
}
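For context on the rotation above: the trigger is an integer-division boundary check, and the swap promotes the pending schedule to current while installing the freshly built one as pending. A minimal standalone sketch of that behavior (the Schedule struct and the numbers are illustrative stand-ins, not the types from this commit):

const REFRESH_RATE: u64 = 1000;

#[derive(Debug, PartialEq)]
struct Schedule {
    min_slot: u64,
}

fn should_regenerate(prev_root: u64, new_root: u64) -> bool {
    // true only when the root crosses a multiple of REFRESH_RATE
    prev_root / REFRESH_RATE != new_root / REFRESH_RATE
}

fn main() {
    // 998 -> 999 stays inside the same window, so nothing rotates
    assert!(!should_regenerate(998, 999));
    // 999 -> 1001 crosses the 1000 boundary, so the schedules rotate
    assert!(should_regenerate(999, 1001));

    let mut scheds = (Schedule { min_slot: 1000 }, Schedule { min_slot: 2000 });
    let next = Schedule { min_slot: 3000 };
    // the pending schedule becomes current; the new one becomes pending
    scheds.0 = std::mem::replace(&mut scheds.1, next);
    assert_eq!(scheds.0, Schedule { min_slot: 2000 });
    assert_eq!(scheds.1, Schedule { min_slot: 3000 });
}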

2 changes: 1 addition & 1 deletion src/broadcast_service.rs
@@ -227,7 +227,7 @@ impl BroadcastService {
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
return BroadcastServiceReturnType::ChannelDisconnected
return BroadcastServiceReturnType::ChannelDisconnected;
}
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
1 change: 1 addition & 0 deletions src/lib.rs
@@ -59,6 +59,7 @@ pub mod rpc_mock;
pub mod rpc_pubsub;
pub mod rpc_request;
pub mod runtime;
pub mod sched;
pub mod service;
pub mod signature;
pub mod sigverify;
60 changes: 60 additions & 0 deletions src/sched.rs
@@ -0,0 +1,60 @@
use crate::bank_state::BankCheckpoint;
use fnv::FnvHasher;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::vote_program;
use std::hash::Hasher;

pub const REFRESH_RATE: u64 = 1000;

pub struct Sched {
/// first slot at which this schedule takes effect
min_slot: u64,
/// (leader, cumulative token weight) pairs used for stake-weighted selection
ranks: Vec<(Pubkey, u64)>,
}

impl Sched {
/// true when the root crosses a REFRESH_RATE boundary and the schedules should rotate
pub fn should_regenerate(prev_root: u64, new_root: u64) -> bool {
prev_root / REFRESH_RATE != new_root / REFRESH_RATE
}

/// build the next schedule: vote accounts ranked by cumulative token weight
pub fn new_schedule(root: &BankCheckpoint) -> Sched {
let accounts = root.accounts.accounts_db.read().unwrap();
let leaders: Vec<(Pubkey, u64)> = accounts
.accounts
.iter()
.filter_map(|(id, account)| {
if vote_program::check_id(&account.owner) {
return Some((*id, account.tokens));
}
None
})
.collect();
// running cumulative sum of token weights, so the last entry holds the total
let start = (Pubkey::default(), 0);
let ranks = leaders
.into_iter()
.scan(start, |z, x| {
z.1 += x.1;
Some((x.0, z.1))
})
.collect();
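// the new schedule only takes effect at the next REFRESH_RATE boundary,
// e.g. a root fork_id of 1234 yields min_slot 2000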
let min_slot = ((root.fork_id() + REFRESH_RATE) / REFRESH_RATE) * REFRESH_RATE;
Sched { ranks, min_slot }
}

/// deterministically pick the leader for `slot`, weighted by token rank; None if `slot` precedes this schedule
pub fn compute_node(&self, slot: u64) -> Option<Pubkey> {
if slot < self.min_slot {
return None;
}
let total = self.ranks.last()?.1;
if total == 0 {
return None;
}
// use a stable known hasher because this MUST be the same across the entire network
let mut hasher = FnvHasher::with_key(self.min_slot);
hasher.write(&slot.to_le_bytes());
let random = hasher.finish();
let val = random % total;
// first leader whose cumulative weight exceeds the sampled value
self.ranks.iter().find(|l| val < l.1).map(|l| l.0)
}
}
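To make the weighted selection in compute_node concrete, the following self-contained sketch applies the same idea: build cumulative stake weights, hash the slot with a fixed-key FNV hasher so every node derives the same value, reduce it modulo the total weight, and take the first entry whose cumulative weight exceeds it. The node ids and stakes are made up for illustration; only the fnv crate is assumed, as in sched.rs above.

use fnv::FnvHasher;
use std::hash::Hasher;

/// Pick a leader for `slot` in proportion to stake.
/// `ranks` holds (node id, cumulative stake): stakes A=3, B=2, C=5
/// become [('A', 3), ('B', 5), ('C', 10)].
fn pick(ranks: &[(char, u64)], key: u64, slot: u64) -> Option<char> {
    let total = ranks.last()?.1;
    if total == 0 {
        return None;
    }
    // deterministic across the cluster: same key + same slot => same value
    let mut hasher = FnvHasher::with_key(key);
    hasher.write(&slot.to_le_bytes());
    let val = hasher.finish() % total;
    // first entry whose cumulative weight exceeds the sampled value,
    // so over many slots A wins roughly 3/10 of them, B 2/10, C 5/10
    ranks.iter().find(|l| val < l.1).map(|l| l.0)
}

fn main() {
    let ranks = [('A', 3u64), ('B', 5), ('C', 10)];
    for slot in 1000..1010 {
        println!("slot {} -> {:?}", slot, pick(&ranks, 1000, slot));
    }
}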
