Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed Feb 28, 2019
1 parent 81148a4 commit 15e82ad
Showing 1 changed file with 4 additions and 37 deletions.
41 changes: 4 additions & 37 deletions src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,12 @@ impl ReplayStage {
for bank_id in live_bank_ids {
let bank = bank_forks.read().unwrap().get(bank_id).unwrap().clone();
if !Self::is_tpu(&bank, my_id) {
Self::replay_entries_into_forks(
&bank_forks,
&my_id,
&mut progress,
&bank,
&blocktree,
);
} else {
trace!("checking tpu fork {}", bank_id);
bank_forks.write().unwrap().set_working_bank_id(bank_id);
Self::replay_blocktree_into_bank(&bank, &blocktree, &mut progress);
}
let max_tick_height = (bank_id + 1) * bank.ticks_per_slot() - 1;
trace!(
"{} bank {} tick: {} max: {}",
my_id,
bank_id,
bank.tick_height(),
max_tick_height
);
if bank.tick_height() == max_tick_height {
trace!("{} frozen bank: {}", my_id, bank_id);
bank.freeze();
assert!(bank_forks.read().unwrap().frozen_banks().len() > 0);
votable.push(bank_id);
progress.remove(&bank_id);
}
Expand All @@ -163,7 +147,6 @@ impl ReplayStage {
votable.sort();

if let Some(latest_slot_vote) = votable.last() {
trace!("{} latest voteable bank: {}", my_id, latest_slot_vote);
let bank = bank_forks
.read()
.unwrap()
Expand All @@ -173,7 +156,6 @@ impl ReplayStage {
let next_slot = *latest_slot_vote + 1;
let next_leader =
LeaderScheduler::default().slot_leader_at(next_slot, &bank);
trace!("{} next_leader {}", my_id, next_leader);
cluster_info.write().unwrap().set_leader(next_leader);

subscriptions.notify_subscribers(&bank);
Expand All @@ -189,13 +171,11 @@ impl ReplayStage {
cluster_info.write().unwrap().push_vote(vote);
}
if next_leader == my_id {
trace!("{} I am next_leader", my_id);
let mut wforks = bank_forks.write().unwrap();
let has_bank = wforks.get(next_slot).is_some();
if !has_bank {
let tpu_bank =
Bank::new_from_parent_and_id(&bank, my_id, next_slot);
trace!("{} I am next_leader, new_bank {}", my_id, tpu_bank.id());
wforks.insert(next_slot, tpu_bank);
wforks.set_working_bank_id(next_slot);
}
Expand Down Expand Up @@ -228,33 +208,20 @@ impl ReplayStage {
(Self { t_replay, exit })
}

pub fn replay_entries_into_forks(
bank_forks: &Arc<RwLock<BankForks>>,
my_id: &Pubkey,
progress: &mut HashMap<u64, (Hash, usize)>,
pub fn replay_blocktree_into_bank(
bank: &Bank,
blocktree: &Blocktree,
progress: &mut HashMap<u64, (Hash, usize)>,
) {
let bank_id = bank.id();
let bank_progress = &mut progress.entry(bank_id).or_insert((bank.last_id(), 0));
trace!("replay fork {} {}", bank_id, bank_progress.1);
let result =
blocktree.get_slot_entries_with_blob_count(bank_id, bank_progress.1 as u64, None);
if let Ok((entries, num)) = result {
trace!(
"replay fork {} {} num new blobs: {}",
bank_id,
bank_progress.1,
num
);
bank_forks.write().unwrap().set_working_bank_id(bank_id);
let err = Self::verify_and_process_entries(&bank, &entries, &bank_progress.0);
if err.is_err() {
//TODO: mark this fork as failed
trace!("{} verify_and_process_entries failed {:?}", my_id, err);
inc_new_counter_info!("replicate-stage_failed_process_entries", entries.len());
} else if num > 0 {
trace!("{} verify_and_process_entries passed", my_id);
}
bank_progress.1 += num;
if let Some(last_entry) = entries.last() {
Expand Down

0 comments on commit 15e82ad

Please sign in to comment.