Skip to content

Commit

Permalink
very wip, simple replay stage
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed Feb 28, 2019
1 parent 271115a commit 25044d1
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 488 deletions.
29 changes: 19 additions & 10 deletions src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,32 @@ impl BankForks {
banks,
}
}

pub fn frozen_banks(&self) -> Vec<u64> {
self.banks
.iter()
.filter(|(_k, v)| v.is_frozen())
.map(|(k, _v)| *k)
.collect()
}
pub fn active_banks(&self) -> Vec<u64> {
self.banks
.iter()
.filter(|(_k, v)| !v.is_frozen())
.map(|(k, _v)| *k)
.collect()
}
pub fn working_bank(&self) -> Arc<Bank> {
self.banks[&self.working_bank_id].clone()
}

pub fn get(&self, bank_id: u64) -> Option<&Arc<Bank>> {
self.banks.get(&bank_id)
}

// TODO: use the bank's own ID instead of receiving a parameter
pub fn insert(&mut self, bank_id: u64, bank: Bank) {
let mut bank = Arc::new(bank);
let bank = Arc::new(bank);
self.banks.insert(bank_id, bank.clone());

// TODO: this really only needs to look at the first
// parent if we're always calling insert()
// when we construct a child bank
while let Some(parent) = bank.parent() {
self.banks.remove(&parent.id());
bank = parent;
}
}

pub fn set_working_bank_id(&mut self, bank_id: u64) {
Expand Down
14 changes: 13 additions & 1 deletion src/blocktree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,21 +817,33 @@ impl Blocktree {
max_missing,
)
}

/// Returns the entry vector for the slot starting with `blob_start_index`
pub fn get_slot_entries(
&self,
slot_height: u64,
blob_start_index: u64,
max_entries: Option<u64>,
) -> Result<Vec<Entry>> {
self.get_slot_entries_with_blob_count(slot_height, blob_start_index, max_entries)
.map(|x| x.0)
}

pub fn get_slot_entries_with_blob_count(
&self,
slot_height: u64,
blob_start_index: u64,
max_entries: Option<u64>,
) -> Result<(Vec<Entry>, usize)> {
// Find the next consecutive block of blobs.
let consecutive_blobs = self.get_slot_consecutive_blobs(
slot_height,
&HashMap::new(),
blob_start_index,
max_entries,
)?;
Ok(Self::deserialize_blobs(&consecutive_blobs))
let num = consecutive_blobs.len();
Ok((Self::deserialize_blobs(&consecutive_blobs), num))
}

// Returns slots connecting to any element of the list `slot_heights`.
Expand Down
19 changes: 14 additions & 5 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl Fullnode {
let bank_info = &bank_forks_info[0];
bank_forks.set_working_bank_id(bank_info.bank_id);
let bank = bank_forks.working_bank();
assert!(bank_forks.get(bank_info.bank_id).is_some());

info!(
"starting PoH... {} {}",
Expand Down Expand Up @@ -296,8 +297,16 @@ impl Fullnode {
debug!("{:?} rotating to leader role", self.id);
FullnodeReturnType::ValidatorToLeaderRotation
};
let tpu_bank = self
.bank_forks
.read()
.unwrap()
.get(rotation_info.slot)
.unwrap()
.clone();
trace!("switch to leader");
self.node_services.tpu.switch_to_leader(
&self.bank_forks.read().unwrap().working_bank(),
tpu_bank,
&self.poh_recorder,
self.tpu_sockets
.iter()
Expand Down Expand Up @@ -352,10 +361,10 @@ impl Fullnode {
trace!("{:?}: rotate at slot={}", self.id, rotation_info.slot);
//TODO: this will be called by the TVU every time it votes
//instead of here
self.poh_recorder.lock().unwrap().reset(
rotation_info.bank.tick_height(),
rotation_info.last_entry_id,
);
self.poh_recorder
.lock()
.unwrap()
.reset(rotation_info.tick_height, rotation_info.last_entry_id);
let slot = rotation_info.slot;
let transition = self.rotate(rotation_info);
debug!("role transition complete: {:?}", transition);
Expand Down
6 changes: 4 additions & 2 deletions src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl PohRecorder {
}

pub fn set_working_bank(&mut self, working_bank: WorkingBank) {
trace!("new working bank");
self.working_bank = Some(working_bank);
}

Expand Down Expand Up @@ -94,8 +95,9 @@ impl PohRecorder {
.take_while(|x| x.1 <= working_bank.max_tick_height)
.count();
let e = if cnt > 0 {
trace!(
"flush_cache: {} {} sending: {}",
debug!(
"flush_cache: bank_id: {} tick_height: {} max: {} sending: {}",
working_bank.bank.id(),
working_bank.bank.tick_height(),
working_bank.max_tick_height,
cnt,
Expand Down
Loading

0 comments on commit 25044d1

Please sign in to comment.