Skip to content

Commit

Permalink
simple replay stage
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed Mar 2, 2019
1 parent 2782922 commit 6b2697f
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 441 deletions.
39 changes: 39 additions & 0 deletions src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@ impl BankForks {
working_bank,
}
}
pub fn frozen_banks(&self) -> HashMap<u64, Arc<Bank>> {
let mut frozen_banks: Vec<Arc<Bank>> = vec![];
frozen_banks.extend(self.banks.values().filter(|v| v.is_frozen()).cloned());
frozen_banks.extend(
self.banks
.iter()
.flat_map(|(_, v)| v.parents())
.filter(|v| v.is_frozen()),
);
frozen_banks.into_iter().map(|b| (b.slot(), b)).collect()
}
pub fn active_banks(&self) -> Vec<u64> {
self.banks.iter().map(|(k, _v)| *k).collect()
}
pub fn get(&self, bank_id: u64) -> Option<&Arc<Bank>> {
self.banks.get(&bank_id)
}

pub fn new_from_banks(initial_banks: &[Arc<Bank>]) -> Self {
let mut banks = HashMap::new();
Expand Down Expand Up @@ -82,4 +99,26 @@ mod tests {
assert_eq!(bank_forks[1u64].tick_height(), 1);
assert_eq!(bank_forks.working_bank().tick_height(), 1);
}

#[test]
fn test_bank_forks_frozen_banks() {
let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Bank::new(&genesis_block);
let mut bank_forks = BankForks::new(0, bank);
let child_bank = Bank::new_from_parent(&bank_forks[0u64], Pubkey::default(), 1);
bank_forks.insert(1, child_bank);
assert!(bank_forks.frozen_banks().get(&0).is_some());
assert!(bank_forks.frozen_banks().get(&1).is_none());
}

#[test]
fn test_bank_forks_active_banks() {
let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Bank::new(&genesis_block);
let mut bank_forks = BankForks::new(0, bank);
let child_bank = Bank::new_from_parent(&bank_forks[0u64], Pubkey::default(), 1);
bank_forks.insert(1, child_bank);
assert_eq!(bank_forks.active_banks(), vec![1]);
}

}
1 change: 1 addition & 0 deletions src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ mod tests {
poh_service.close().unwrap();
}
#[test]
#[ignore] //flaky
fn test_banking_stage_entryfication() {
// In this attack we'll demonstrate that a verifier can interpret the ledger
// differently if either the server doesn't signal the ledger to add an
Expand Down
12 changes: 3 additions & 9 deletions src/blocktree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,24 +850,18 @@ impl Blocktree {
max_missing,
)
}

/// Returns the entry vector for the slot starting with `blob_start_index`
pub fn get_slot_entries(
&self,
slot_height: u64,
blob_start_index: u64,
max_entries: Option<u64>,
) -> Result<Vec<Entry>> {
// Find the next consecutive block of blobs.
let consecutive_blobs = self.get_slot_consecutive_blobs(
slot_height,
&HashMap::new(),
blob_start_index,
max_entries,
)?;
Ok(Self::deserialize_blobs(&consecutive_blobs))
self.get_slot_entries_with_blob_count(slot_height, blob_start_index, max_entries)
.map(|x| x.0)
}

/// Returns the entry vector for the slot starting with `blob_start_index`
pub fn get_slot_entries_with_blob_count(
&self,
slot_height: u64,
Expand Down
91 changes: 29 additions & 62 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::entry::create_ticks;
use crate::entry::next_entry_mut;
use crate::entry::Entry;
use crate::gossip_service::GossipService;
use crate::leader_schedule_utils;
use crate::poh_recorder::PohRecorder;
use crate::poh_service::{PohService, PohServiceConfig};
use crate::rpc_pubsub_service::PubSubService;
Expand Down Expand Up @@ -59,14 +58,6 @@ impl NodeServices {
}
}

#[derive(Debug, PartialEq, Eq)]
pub enum FullnodeReturnType {
LeaderToValidatorRotation,
ValidatorToLeaderRotation,
LeaderToLeaderRotation,
ValidatorToValidatorRotation,
}

pub struct FullnodeConfig {
pub sigverify_disabled: bool,
pub voting_disabled: bool,
Expand Down Expand Up @@ -106,6 +97,7 @@ pub struct Fullnode {
blocktree: Arc<Blocktree>,
poh_service: PohService,
poh_recorder: Arc<Mutex<PohRecorder>>,
bank_forks: Arc<RwLock<BankForks>>,
}

impl Fullnode {
Expand Down Expand Up @@ -262,35 +254,36 @@ impl Fullnode {
blocktree,
poh_service,
poh_recorder,
bank_forks,
}
}

fn rotate(&mut self, rotation_info: TvuRotationInfo) -> FullnodeReturnType {
fn rotate(&mut self, rotation_info: TvuRotationInfo) {
trace!(
"{:?}: rotate for slot={} to leader={:?}",
self.id,
rotation_info.slot,
rotation_info.leader_id,
);
let was_leader = leader_schedule_utils::slot_leader(&rotation_info.bank) == self.id;

if let Some(ref mut rpc_service) = self.rpc_service {
// TODO: This is not the correct bank. Instead TVU should pass along the
// frozen Bank for each completed block for RPC to use from it's notion of the "best"
// available fork (until we want to surface multiple forks to RPC)
rpc_service.set_bank(&rotation_info.bank);
rpc_service.set_bank(&self.bank_forks.read().unwrap().working_bank());
}

if rotation_info.leader_id == self.id {
let transition = if was_leader {
debug!("{:?} remaining in leader role", self.id);
FullnodeReturnType::LeaderToLeaderRotation
} else {
debug!("{:?} rotating to leader role", self.id);
FullnodeReturnType::ValidatorToLeaderRotation
};
debug!("{:?} rotating to leader role", self.id);
let tpu_bank = self
.bank_forks
.read()
.unwrap()
.get(rotation_info.slot)
.unwrap()
.clone();
self.node_services.tpu.switch_to_leader(
&rotation_info.bank,
&tpu_bank,
&self.poh_recorder,
self.tpu_sockets
.iter()
Expand All @@ -303,31 +296,22 @@ impl Fullnode {
rotation_info.slot,
&self.blocktree,
);
transition
} else {
let transition = if was_leader {
debug!("{:?} rotating to validator role", self.id);
FullnodeReturnType::LeaderToValidatorRotation
} else {
debug!("{:?} remaining in validator role", self.id);
FullnodeReturnType::ValidatorToValidatorRotation
};
self.node_services.tpu.switch_to_forwarder(
rotation_info.leader_id,
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
.collect(),
);
transition
}
}

// Runs a thread to manage node role transitions. The returned closure can be used to signal the
// node to exit.
pub fn start(
mut self,
rotation_notifier: Option<Sender<(FullnodeReturnType, u64)>>,
rotation_notifier: Option<Sender<u64>>,
) -> (JoinHandle<()>, Arc<AtomicBool>, Receiver<bool>) {
let (sender, receiver) = channel();
let exit = self.exit.clone();
Expand All @@ -345,15 +329,19 @@ impl Fullnode {
trace!("{:?}: rotate at slot={}", self.id, rotation_info.slot);
//TODO: this will be called by the TVU every time it votes
//instead of here
self.poh_recorder.lock().unwrap().reset(
rotation_info.bank.tick_height(),
rotation_info.bank.last_id(),
info!(
"reset PoH... {} {}",
rotation_info.tick_height, rotation_info.last_id
);
self.poh_recorder
.lock()
.unwrap()
.reset(rotation_info.tick_height, rotation_info.last_id);
let slot = rotation_info.slot;
let transition = self.rotate(rotation_info);
debug!("role transition complete: {:?}", transition);
self.rotate(rotation_info);
debug!("role transition complete");
if let Some(ref rotation_notifier) = rotation_notifier {
rotation_notifier.send((transition, slot)).unwrap();
rotation_notifier.send(slot).unwrap();
}
}
Err(RecvTimeoutError::Timeout) => continue,
Expand All @@ -363,10 +351,7 @@ impl Fullnode {
(handle, exit, receiver)
}

pub fn run(
self,
rotation_notifier: Option<Sender<(FullnodeReturnType, u64)>>,
) -> impl FnOnce() {
pub fn run(self, rotation_notifier: Option<Sender<u64>>) -> impl FnOnce() {
let (_, exit, receiver) = self.start(rotation_notifier);
move || {
exit.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -592,10 +577,7 @@ mod tests {

// Wait for the bootstrap leader to transition. Since there are no other nodes in the
// cluster it will continue to be the leader
assert_eq!(
rotation_receiver.recv().unwrap(),
(FullnodeReturnType::LeaderToLeaderRotation, 1)
);
assert_eq!(rotation_receiver.recv().unwrap(), 1);
bootstrap_leader_exit();
}

Expand Down Expand Up @@ -638,13 +620,7 @@ mod tests {
);
let (rotation_sender, rotation_receiver) = channel();
let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender));
assert_eq!(
rotation_receiver.recv().unwrap(),
(
FullnodeReturnType::LeaderToValidatorRotation,
DEFAULT_SLOTS_PER_EPOCH
)
);
assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH));

// Test that a node knows to transition to a leader based on parsing the ledger
let validator = Fullnode::new(
Expand All @@ -658,13 +634,7 @@ mod tests {

let (rotation_sender, rotation_receiver) = channel();
let validator_exit = validator.run(Some(rotation_sender));
assert_eq!(
rotation_receiver.recv().unwrap(),
(
FullnodeReturnType::ValidatorToLeaderRotation,
DEFAULT_SLOTS_PER_EPOCH
)
);
assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH));

validator_exit();
bootstrap_leader_exit();
Expand Down Expand Up @@ -741,10 +711,7 @@ mod tests {
let (rotation_sender, rotation_receiver) = channel();
let validator_exit = validator.run(Some(rotation_sender));
let rotation = rotation_receiver.recv().unwrap();
assert_eq!(
rotation,
(FullnodeReturnType::ValidatorToLeaderRotation, blobs_to_send)
);
assert_eq!(rotation, blobs_to_send);

// Close the validator so that rocksdb has locks available
validator_exit();
Expand Down
6 changes: 4 additions & 2 deletions src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl PohRecorder {
}

pub fn set_working_bank(&mut self, working_bank: WorkingBank) {
trace!("new working bank");
self.working_bank = Some(working_bank);
}

Expand Down Expand Up @@ -94,8 +95,9 @@ impl PohRecorder {
.take_while(|x| x.1 <= working_bank.max_tick_height)
.count();
let e = if cnt > 0 {
trace!(
"flush_cache: {} {} sending: {}",
debug!(
"flush_cache: bank_id: {} tick_height: {} max: {} sending: {}",
working_bank.bank.slot(),
working_bank.bank.tick_height(),
working_bank.max_tick_height,
cnt,
Expand Down
Loading

0 comments on commit 6b2697f

Please sign in to comment.