Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replay slots from blocktree in new forks #2975

Merged
merged 1 commit into from
Mar 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@ impl BankForks {
working_bank,
}
}
/// Builds a slot -> bank map of every frozen bank reachable from `self.banks`:
/// the stored banks themselves plus everything returned by their `parents()`.
pub fn frozen_banks(&self) -> HashMap<u64, Arc<Bank>> {
    // Stored banks are visited first, then their ancestors; if two banks
    // report the same slot, the one visited later ends up in the map.
    let stored = self.banks.values().filter(|bank| bank.is_frozen()).cloned();
    let ancestors = self
        .banks
        .values()
        .flat_map(|bank| bank.parents())
        .filter(|parent| parent.is_frozen());
    stored
        .chain(ancestors)
        .map(|bank| (bank.slot(), bank))
        .collect()
}
/// Returns the ids (the keys of `self.banks`) of all banks currently stored,
/// in whatever order the underlying map iterates.
pub fn active_banks(&self) -> Vec<u64> {
    self.banks.keys().cloned().collect()
}
/// Looks up `bank_id` in `self.banks`.
///
/// Returns a reference to the stored `Arc<Bank>`, or `None` when no bank is
/// stored under that id.
pub fn get(&self, bank_id: u64) -> Option<&Arc<Bank>> {
self.banks.get(&bank_id)
}

pub fn new_from_banks(initial_banks: &[Arc<Bank>]) -> Self {
let mut banks = HashMap::new();
Expand Down Expand Up @@ -82,4 +99,26 @@ mod tests {
assert_eq!(bank_forks[1u64].tick_height(), 1);
assert_eq!(bank_forks.working_bank().tick_height(), 1);
}

#[test]
fn test_bank_forks_frozen_banks() {
    // Start from a single genesis bank stored under id 0.
    let (genesis_block, _) = GenesisBlock::new(10_000);
    let mut bank_forks = BankForks::new(0, Bank::new(&genesis_block));

    // Add a child of bank 0 under id 1.
    let child_bank = Bank::new_from_parent(&bank_forks[0u64], Pubkey::default(), 1);
    bank_forks.insert(1, child_bank);

    // Slot 0 must be reported as frozen; the freshly inserted slot 1 must not.
    let frozen = bank_forks.frozen_banks();
    assert!(frozen.contains_key(&0));
    assert!(!frozen.contains_key(&1));
}

#[test]
fn test_bank_forks_active_banks() {
    // Start from a single genesis bank stored under id 0.
    let (genesis_block, _) = GenesisBlock::new(10_000);
    let mut bank_forks = BankForks::new(0, Bank::new(&genesis_block));

    // Add a child of bank 0 under id 1.
    let child_bank = Bank::new_from_parent(&bank_forks[0u64], Pubkey::default(), 1);
    bank_forks.insert(1, child_bank);

    // Only the child's id is expected to be listed as active.
    let active = bank_forks.active_banks();
    assert_eq!(active, vec![1]);
}

}
1 change: 1 addition & 0 deletions src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ mod tests {
poh_service.close().unwrap();
}
#[test]
#[ignore] //flaky
fn test_banking_stage_entryfication() {
// In this attack we'll demonstrate that a verifier can interpret the ledger
// differently if either the server doesn't signal the ledger to add an
Expand Down
12 changes: 3 additions & 9 deletions src/blocktree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,24 +850,18 @@ impl Blocktree {
max_missing,
)
}

/// Returns the entry vector for the slot starting with `blob_start_index`
pub fn get_slot_entries(
&self,
slot_height: u64,
blob_start_index: u64,
max_entries: Option<u64>,
) -> Result<Vec<Entry>> {
// Find the next consecutive block of blobs.
let consecutive_blobs = self.get_slot_consecutive_blobs(
slot_height,
&HashMap::new(),
blob_start_index,
max_entries,
)?;
Ok(Self::deserialize_blobs(&consecutive_blobs))
self.get_slot_entries_with_blob_count(slot_height, blob_start_index, max_entries)
.map(|x| x.0)
}

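/// Returns a tuple whose first element is the entry vector for the slot starting with
/// `blob_start_index`; going by the name, the rest is presumably a blob count (TODO confirm)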
pub fn get_slot_entries_with_blob_count(
&self,
slot_height: u64,
Expand Down
91 changes: 29 additions & 62 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::entry::create_ticks;
use crate::entry::next_entry_mut;
use crate::entry::Entry;
use crate::gossip_service::GossipService;
use crate::leader_schedule_utils;
use crate::poh_recorder::PohRecorder;
use crate::poh_service::{PohService, PohServiceConfig};
use crate::rpc_pubsub_service::PubSubService;
Expand Down Expand Up @@ -59,14 +58,6 @@ impl NodeServices {
}
}

/// Role transition reported by `Fullnode::rotate`, derived from whether this
/// node was the leader before the rotation and whether it is the leader after.
#[derive(Debug, PartialEq, Eq)]
pub enum FullnodeReturnType {
/// The node was the leader and is rotating to the validator role.
LeaderToValidatorRotation,
/// The node was not the leader and is rotating to the leader role.
ValidatorToLeaderRotation,
/// The node was the leader and remains in the leader role.
LeaderToLeaderRotation,
/// The node was not the leader and remains in the validator role.
ValidatorToValidatorRotation,
}

pub struct FullnodeConfig {
pub sigverify_disabled: bool,
pub voting_disabled: bool,
Expand Down Expand Up @@ -106,6 +97,7 @@ pub struct Fullnode {
blocktree: Arc<Blocktree>,
poh_service: PohService,
poh_recorder: Arc<Mutex<PohRecorder>>,
bank_forks: Arc<RwLock<BankForks>>,
}

impl Fullnode {
Expand Down Expand Up @@ -262,35 +254,36 @@ impl Fullnode {
blocktree,
poh_service,
poh_recorder,
bank_forks,
}
}

fn rotate(&mut self, rotation_info: TvuRotationInfo) -> FullnodeReturnType {
fn rotate(&mut self, rotation_info: TvuRotationInfo) {
trace!(
"{:?}: rotate for slot={} to leader={:?}",
self.id,
rotation_info.slot,
rotation_info.leader_id,
);
let was_leader = leader_schedule_utils::slot_leader(&rotation_info.bank) == self.id;

if let Some(ref mut rpc_service) = self.rpc_service {
// TODO: This is not the correct bank. Instead TVU should pass along the
// frozen Bank for each completed block for RPC to use from its notion of the "best"
// available fork (until we want to surface multiple forks to RPC)
rpc_service.set_bank(&rotation_info.bank);
rpc_service.set_bank(&self.bank_forks.read().unwrap().working_bank());
}

if rotation_info.leader_id == self.id {
let transition = if was_leader {
debug!("{:?} remaining in leader role", self.id);
FullnodeReturnType::LeaderToLeaderRotation
} else {
debug!("{:?} rotating to leader role", self.id);
FullnodeReturnType::ValidatorToLeaderRotation
};
debug!("{:?} rotating to leader role", self.id);
let tpu_bank = self
.bank_forks
.read()
.unwrap()
.get(rotation_info.slot)
.unwrap()
.clone();
self.node_services.tpu.switch_to_leader(
&rotation_info.bank,
&tpu_bank,
&self.poh_recorder,
self.tpu_sockets
.iter()
Expand All @@ -303,31 +296,22 @@ impl Fullnode {
rotation_info.slot,
&self.blocktree,
);
transition
} else {
let transition = if was_leader {
debug!("{:?} rotating to validator role", self.id);
FullnodeReturnType::LeaderToValidatorRotation
} else {
debug!("{:?} remaining in validator role", self.id);
FullnodeReturnType::ValidatorToValidatorRotation
};
self.node_services.tpu.switch_to_forwarder(
rotation_info.leader_id,
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
.collect(),
);
transition
}
}

// Runs a thread to manage node role transitions. The returned closure can be used to signal the
// node to exit.
pub fn start(
mut self,
rotation_notifier: Option<Sender<(FullnodeReturnType, u64)>>,
rotation_notifier: Option<Sender<u64>>,
) -> (JoinHandle<()>, Arc<AtomicBool>, Receiver<bool>) {
let (sender, receiver) = channel();
let exit = self.exit.clone();
Expand All @@ -345,15 +329,19 @@ impl Fullnode {
trace!("{:?}: rotate at slot={}", self.id, rotation_info.slot);
//TODO: this will be called by the TVU every time it votes
//instead of here
self.poh_recorder.lock().unwrap().reset(
rotation_info.bank.tick_height(),
rotation_info.bank.last_id(),
info!(
"reset PoH... {} {}",
rotation_info.tick_height, rotation_info.last_id
);
self.poh_recorder
.lock()
.unwrap()
.reset(rotation_info.tick_height, rotation_info.last_id);
let slot = rotation_info.slot;
let transition = self.rotate(rotation_info);
debug!("role transition complete: {:?}", transition);
self.rotate(rotation_info);
debug!("role transition complete");
if let Some(ref rotation_notifier) = rotation_notifier {
rotation_notifier.send((transition, slot)).unwrap();
rotation_notifier.send(slot).unwrap();
}
}
Err(RecvTimeoutError::Timeout) => continue,
Expand All @@ -363,10 +351,7 @@ impl Fullnode {
(handle, exit, receiver)
}

pub fn run(
self,
rotation_notifier: Option<Sender<(FullnodeReturnType, u64)>>,
) -> impl FnOnce() {
pub fn run(self, rotation_notifier: Option<Sender<u64>>) -> impl FnOnce() {
let (_, exit, receiver) = self.start(rotation_notifier);
move || {
exit.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -592,10 +577,7 @@ mod tests {

// Wait for the bootstrap leader to transition. Since there are no other nodes in the
// cluster it will continue to be the leader
assert_eq!(
rotation_receiver.recv().unwrap(),
(FullnodeReturnType::LeaderToLeaderRotation, 1)
);
assert_eq!(rotation_receiver.recv().unwrap(), 1);
bootstrap_leader_exit();
}

Expand Down Expand Up @@ -638,13 +620,7 @@ mod tests {
);
let (rotation_sender, rotation_receiver) = channel();
let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender));
assert_eq!(
rotation_receiver.recv().unwrap(),
(
FullnodeReturnType::LeaderToValidatorRotation,
DEFAULT_SLOTS_PER_EPOCH
)
);
assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH));

// Test that a node knows to transition to a leader based on parsing the ledger
let validator = Fullnode::new(
Expand All @@ -658,13 +634,7 @@ mod tests {

let (rotation_sender, rotation_receiver) = channel();
let validator_exit = validator.run(Some(rotation_sender));
assert_eq!(
rotation_receiver.recv().unwrap(),
(
FullnodeReturnType::ValidatorToLeaderRotation,
DEFAULT_SLOTS_PER_EPOCH
)
);
assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH));

validator_exit();
bootstrap_leader_exit();
Expand Down Expand Up @@ -741,10 +711,7 @@ mod tests {
let (rotation_sender, rotation_receiver) = channel();
let validator_exit = validator.run(Some(rotation_sender));
let rotation = rotation_receiver.recv().unwrap();
assert_eq!(
rotation,
(FullnodeReturnType::ValidatorToLeaderRotation, blobs_to_send)
);
assert_eq!(rotation, blobs_to_send);

// Close the validator so that rocksdb has locks available
validator_exit();
Expand Down
6 changes: 4 additions & 2 deletions src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl PohRecorder {
}

/// Stores `working_bank` as the recorder's current working bank, replacing
/// whatever was set before, and emits a trace-level log line.
pub fn set_working_bank(&mut self, working_bank: WorkingBank) {
trace!("new working bank");
self.working_bank = Some(working_bank);
}

Expand Down Expand Up @@ -94,8 +95,9 @@ impl PohRecorder {
.take_while(|x| x.1 <= working_bank.max_tick_height)
.count();
let e = if cnt > 0 {
trace!(
"flush_cache: {} {} sending: {}",
debug!(
"flush_cache: bank_id: {} tick_height: {} max: {} sending: {}",
working_bank.bank.slot(),
working_bank.bank.tick_height(),
working_bank.max_tick_height,
cnt,
Expand Down
Loading