Skip to content

Commit

Permalink
works!
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed Feb 28, 2019
1 parent 007ff57 commit 81148a4
Showing 1 changed file with 54 additions and 27 deletions.
81 changes: 54 additions & 27 deletions src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::service::Service;
use crate::tvu::{TvuRotationInfo, TvuRotationSender};
use solana_metrics::counter::Counter;
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::KeypairUtil;
use solana_sdk::timing::duration_as_ms;
Expand Down Expand Up @@ -82,9 +83,11 @@ impl ReplayStage {
let bank = bank_forks.read().unwrap().get(bank_id).unwrap().clone();
if Self::is_tpu(&bank, my_id) {
trace!(
"sending rotate signal to bootstrap bank {} {}",
"sending rotate signal to bootstrap bank {} {} {} forks_info_id: {}",
bank.id(),
bank.tick_height()
bank.tick_height(),
bank.last_id(),
bank_forks_info[0].last_entry_id,
);
// RPC can be made aware of last slot's bank
to_leader_sender
Expand All @@ -98,9 +101,9 @@ impl ReplayStage {
break;
}
}
let mut progress: HashMap<u64, usize> = bank_forks_info
let mut progress: HashMap<u64, (Hash, usize)> = bank_forks_info
.iter()
.map(|x| (x.bank_id, x.next_blob_index as usize))
.map(|x| (x.bank_id, (x.last_entry_id, x.next_blob_index as usize)))
.collect();

// Start the replay stage loop
Expand Down Expand Up @@ -129,28 +132,13 @@ impl ReplayStage {
for bank_id in live_bank_ids {
let bank = bank_forks.read().unwrap().get(bank_id).unwrap().clone();
if !Self::is_tpu(&bank, my_id) {
let blob_ix = *progress.get(&bank_id).unwrap_or(&0);
trace!("replay fork {} {}", bank_id, blob_ix);
let result = blocktree.get_slot_entries_with_blob_count(
bank_id,
blob_ix as u64,
None,
Self::replay_entries_into_forks(
&bank_forks,
&my_id,
&mut progress,
&bank,
&blocktree,
);
if let Ok((entries, num)) = result {
progress.insert(bank_id, blob_ix + num);
bank_forks.write().unwrap().set_working_bank_id(bank_id);
let err = Self::verify_and_process_entries(&bank, &entries);
if err.is_err() {
//TODO: mark this fork as failed
trace!("{} verify_and_process_entries failed {:?}", my_id, err);
inc_new_counter_info!(
"replicate-stage_failed_process_entries",
entries.len()
);
} else {
trace!("{} verify_and_process_entries passed", my_id);
}
}
} else {
trace!("checking tpu fork {}", bank_id);
}
Expand Down Expand Up @@ -240,6 +228,41 @@ impl ReplayStage {
(Self { t_replay, exit })
}

pub fn replay_entries_into_forks(
bank_forks: &Arc<RwLock<BankForks>>,
my_id: &Pubkey,
progress: &mut HashMap<u64, (Hash, usize)>,
bank: &Bank,
blocktree: &Blocktree,
) {
let bank_id = bank.id();
let bank_progress = &mut progress.entry(bank_id).or_insert((bank.last_id(), 0));
trace!("replay fork {} {}", bank_id, bank_progress.1);
let result =
blocktree.get_slot_entries_with_blob_count(bank_id, bank_progress.1 as u64, None);
if let Ok((entries, num)) = result {
trace!(
"replay fork {} {} num new blobs: {}",
bank_id,
bank_progress.1,
num
);
bank_forks.write().unwrap().set_working_bank_id(bank_id);
let err = Self::verify_and_process_entries(&bank, &entries, &bank_progress.0);
if err.is_err() {
//TODO: mark this fork as failed
trace!("{} verify_and_process_entries failed {:?}", my_id, err);
inc_new_counter_info!("replicate-stage_failed_process_entries", entries.len());
} else if num > 0 {
trace!("{} verify_and_process_entries passed", my_id);
}
bank_progress.1 += num;
if let Some(last_entry) = entries.last() {
bank_progress.0 = last_entry.id;
}
}
}

pub fn is_tpu(bank: &Bank, my_id: Pubkey) -> bool {
my_id == LeaderScheduler::default().slot_leader_at(bank.id(), &bank)
}
Expand All @@ -253,8 +276,12 @@ impl ReplayStage {
self.exit.store(true, Ordering::Relaxed);
}

pub fn verify_and_process_entries(bank: &Bank, entries: &[Entry]) -> result::Result<()> {
if !entries.verify(&bank.last_id()) {
pub fn verify_and_process_entries(
bank: &Bank,
entries: &[Entry],
last_entry: &Hash,
) -> result::Result<()> {
if !entries.verify(last_entry) {
return Err(result::Error::BlobError(BlobError::VerificationFailed));
}
blocktree_processor::process_entries(bank, entries)?;
Expand Down

0 comments on commit 81148a4

Please sign in to comment.