Skip to content

Commit

Permalink
lastidnotfound step 2: (#1300)
Browse files Browse the repository at this point in the history
lastidnotfound step 2:
  * move "record stage", aka poh_service into banking stage
  * remove Entry.has_more, is incompatible with leader rotation
  * rewrite entry_next_hash in terms of Poh
  * simplify and unify transaction hashing (no embedded nulls)
  * register_last_entry from banking stage, fixes #1171 (w00t!)
  * new PoH doesn't generate empty ledger entries, so some fixes are necessary in
         multinode tests that rely on that (e.g. giving validators airdrops)
  * make window repair less patient, if we've been waiting for an answer, 
          don't be shy about most recent blobs
   * delete recorder and record stage
   * make thin_client error reporting more verbose
   * more tracing in window (sigh)
  • Loading branch information
rob-solana authored Sep 22, 2018
1 parent 54b407b commit be31da3
Show file tree
Hide file tree
Showing 20 changed files with 345 additions and 561 deletions.
46 changes: 2 additions & 44 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,7 @@ impl Bank {
result?;
}
}
if !entry.has_more {
self.register_entry_id(&entry.id);
}
self.register_entry_id(&entry.id);
Ok(())
}

Expand Down Expand Up @@ -680,11 +678,9 @@ mod tests {
use hash::hash;
use ledger;
use logger;
use packet::BLOB_DATA_SIZE;
use signature::{GenKeys, KeypairUtil};
use std;
use std::io::{BufReader, Cursor, Seek, SeekFrom};
use std::mem::size_of;

#[test]
fn test_bank_new() {
Expand Down Expand Up @@ -901,25 +897,6 @@ mod tests {
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}

/// Builds `length` system transactions from the mint to one fresh keypair
/// (amounts 0..length), packs them into ledger entries via
/// `ledger::next_entries`, and yields those entries.
fn create_sample_block_with_next_entries(
    mint: &Mint,
    length: usize,
) -> impl Iterator<Item = Entry> {
    let recipient = Keypair::new();
    let last_id = mint.last_id();

    // One transfer per amount in 0..length, all against the same last_id.
    let txs: Vec<_> = (0..length)
        .map(|amount| {
            Transaction::system_new(&mint.keypair(), recipient.pubkey(), amount as i64, last_id)
        }).collect();

    ledger::next_entries(&last_id, 0, txs).into_iter()
}

fn create_sample_block_with_next_entries_using_keypairs(
mint: &Mint,
keypairs: &[Keypair],
Expand All @@ -940,21 +917,12 @@ mod tests {
for _ in 0..length {
let keypair = Keypair::new();
let tx = Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, hash);
let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx], false);
let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx]);
entries.push(entry);
}
entries.into_iter()
}

/// Creates a sample ledger: the mint's genesis entries followed by a block of
/// `length` sample transactions. Returns the entry iterator and the mint's
/// pubkey. The mint is seeded with length^2 tokens so it can fund every
/// transfer in the block.
fn create_sample_ledger_with_next_entries(
    length: usize,
) -> (impl Iterator<Item = Entry>, Pubkey) {
    let mint = Mint::new((length * length) as i64);
    let owner = mint.pubkey();

    let ledger = mint
        .create_entries()
        .into_iter()
        .chain(create_sample_block_with_next_entries(&mint, length));

    (ledger, owner)
}

fn create_sample_ledger(length: usize) -> (impl Iterator<Item = Entry>, Pubkey) {
let mint = Mint::new(1 + length as i64);
let genesis = mint.create_entries();
Expand Down Expand Up @@ -1038,16 +1006,6 @@ mod tests {
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}

#[test]
fn test_process_ledger_has_more_cross_block() {
    // A serialized Transaction is large; budgeting 2 * VERIFY_BLOCK_SIZE
    // blobs' worth of them guarantees enough transactions to cross a
    // verify-block boundary with has_more set.
    let num_txs = 2 * VERIFY_BLOCK_SIZE * BLOB_DATA_SIZE / size_of::<Transaction>();
    let (ledger, _pubkey) = create_sample_ledger_with_next_entries(num_txs);
    assert!(Bank::default().process_ledger(ledger).is_ok());
}
#[test]
fn test_new_default() {
let def_bank = Bank::default();
Expand Down
140 changes: 120 additions & 20 deletions src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
use bank::Bank;
use bincode::deserialize;
use counter::Counter;
use entry::Entry;
use hash::{Hash, Hasher};
use log::Level;
use packet::{Packets, SharedPackets};
use poh::PohEntry;
use poh_service::PohService;
use rayon::prelude::*;
use record_stage::Signal;
use result::{Error, Result};
use service::Service;
use std::net::SocketAddr;
Expand All @@ -34,21 +37,35 @@ impl BankingStage {
pub fn new(
bank: Arc<Bank>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
) -> (Self, Receiver<Signal>) {
let (signal_sender, signal_receiver) = channel();
tick_duration: Option<Duration>,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-banking-stage".to_string())
.spawn(move || loop {
if let Err(e) = Self::process_packets(&bank, &verified_receiver, &signal_sender) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => break,
_ => error!("{:?}", e),
.spawn(move || {
let (hash_sender, hash_receiver) = channel();
let (poh_service, poh_receiver) =
PohService::new(bank.last_id(), hash_receiver, tick_duration);
loop {
if let Err(e) = Self::process_packets(
&bank,
&hash_sender,
&poh_receiver,
&verified_receiver,
&entry_sender,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => break,
_ => error!("{:?}", e),
}
}
}
drop(hash_sender);
poh_service.join().unwrap();
}).unwrap();
(BankingStage { thread_hdl }, signal_receiver)
(BankingStage { thread_hdl }, entry_receiver)
}

/// Convert the transactions from a blob of binary data to a vector of transactions and
Expand All @@ -63,16 +80,97 @@ impl BankingStage {
}).collect()
}

/// Executes `transactions` against the bank in PoH-entry-sized chunks and
/// converts the results, together with any pending PoH ticks, into `Entry`s
/// that are sent down `entry_sender` as one batch.
///
/// Per chunk:
///   * `Entry::num_will_fit` selects the largest prefix that fits in one entry
///   * the bank processes that prefix; failed transactions are logged and dropped
///   * a hash over the successful transactions' signatures is mixed into the
///     PoH stream via `hash_sender`, and this function drains `poh_receiver`
///     until the entry carrying that mixin comes back
///
/// Returns `Err` only if a channel send fails (`?` on `send`).
fn process_transactions(
    bank: &Arc<Bank>,
    transactions: Vec<Transaction>,
    hash_sender: &Sender<Hash>,
    poh_receiver: &Receiver<PohEntry>,
    entry_sender: &Sender<Vec<Entry>>,
) -> Result<()> {
    let mut entries = Vec::new();

    debug!("transactions: {}", transactions.len());

    let mut chunk_start = 0;
    while chunk_start != transactions.len() {
        // Largest prefix of the remaining transactions that fits in one entry.
        let chunk_end = chunk_start + Entry::num_will_fit(transactions[chunk_start..].to_vec());

        let results = bank.process_transactions(transactions[chunk_start..chunk_end].to_vec());

        debug!("results: {}", results.len());

        chunk_start = chunk_end;

        let mut hasher = Hasher::default();

        // Keep only the transactions the bank accepted, folding each
        // successful signature into the hash that will be mixed into PoH.
        let processed_transactions: Vec<_> = results
            .into_iter()
            .filter_map(|x| match x {
                Ok(x) => {
                    hasher.hash(&x.signature.as_ref());
                    Some(x)
                }
                Err(e) => {
                    debug!("process transaction failed {:?}", e);
                    None
                }
            }).collect();

        debug!("processed ok: {}", processed_transactions.len());

        let hash = hasher.result();

        if processed_transactions.len() != 0 {
            // Ask the PoH service to mix our transaction hash into the stream.
            hash_sender.send(hash)?;

            // Drain PoH entries until the one carrying our mixin arrives;
            // plain ticks seen along the way become empty entries.
            // NOTE(review): this busy-waits on try_iter() — presumably the
            // PoH service answers promptly enough that spinning is acceptable;
            // confirm against PohService.
            let mut answered = false;
            while !answered {
                entries.extend(poh_receiver.try_iter().map(|poh| {
                    if let Some(mixin) = poh.mixin {
                        answered = true;
                        // The only outstanding mixin request is ours.
                        assert_eq!(mixin, hash);
                        // Register the new entry id so it is a valid last_id
                        // for subsequent transactions.
                        bank.register_entry_id(&poh.id);
                        Entry {
                            num_hashes: poh.num_hashes,
                            id: poh.id,
                            transactions: processed_transactions.clone(),
                        }
                    } else {
                        // A tick with no mixin: record it as an empty entry.
                        Entry {
                            num_hashes: poh.num_hashes,
                            id: poh.id,
                            transactions: vec![],
                        }
                    }
                }));
            }
        } else {
            // Nothing in this chunk succeeded; just drain any pending ticks
            // into empty entries without requesting a mixin.
            entries.extend(poh_receiver.try_iter().map(|poh| Entry {
                num_hashes: poh.num_hashes,
                id: poh.id,
                transactions: vec![],
            }));
        }
    }

    debug!("done process_transactions, {} entries", entries.len());

    Ok(entry_sender.send(entries)?)
}

/// Process the incoming packets and send output `Signal` messages to `signal_sender`.
/// Discard packets via `packet_recycler`.
pub fn process_packets(
bank: &Arc<Bank>,
hash_sender: &Sender<Hash>,
poh_receiver: &Receiver<PohEntry>,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
signal_sender: &Sender<Signal>,
entry_sender: &Sender<Vec<Entry>>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let recv_start = Instant::now();
let mms = verified_receiver.recv_timeout(timer)?;
debug!("verified_recevier {:?}", verified_receiver);
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
Expand All @@ -87,7 +185,8 @@ impl BankingStage {
for (msgs, vers) in mms {
let transactions = Self::deserialize_transactions(&msgs.read());
reqs_len += transactions.len();
let transactions = transactions

let transactions: Vec<_> = transactions
.into_iter()
.zip(vers)
.filter_map(|(tx, ver)| match tx {
Expand All @@ -99,14 +198,15 @@ impl BankingStage {
},
}).collect();

debug!("process_transactions");
let results = bank.process_transactions(transactions);
let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
if let Err(_) = signal_sender.send(Signal::Transactions(transactions)) {
return Err(Error::SendError);
}
debug!("done process_transactions");
Self::process_transactions(
bank,
transactions,
hash_sender,
poh_receiver,
entry_sender,
)?;
}

let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(
Expand Down
13 changes: 7 additions & 6 deletions src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ mod tests {
use broadcast_stage::{BroadcastStage, BroadcastStageReturnType};
use crdt::{Crdt, Node};
use entry::Entry;
use ledger::next_entries_mut;
use mint::Mint;
use recorder::Recorder;
use service::Service;
use signature::{Keypair, KeypairUtil, Pubkey};
use std::cmp;
Expand Down Expand Up @@ -361,19 +361,19 @@ mod tests {
}

let genesis_len = broadcast_info.entries.len() as u64;
let last_entry_hash = broadcast_info
let mut last_id = broadcast_info
.entries
.last()
.expect("Ledger should not be empty")
.id;
let mut num_hashes = 0;

// Input enough entries to make exactly leader_rotation_interval entries, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
let mut recorder = Recorder::new(last_entry_hash);

for _ in genesis_len..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);

broadcast_info.entry_sender.send(new_entry).unwrap();
}

Expand All @@ -388,7 +388,8 @@ mod tests {
// past the point of the leader rotation. The write_stage will see that
// it's no longer the leader after checking the crdt, and exit
for _ in 0..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);

match broadcast_info.entry_sender.send(new_entry) {
// We disconnected, break out of loop and check the results
Err(_) => break,
Expand Down
Loading

0 comments on commit be31da3

Please sign in to comment.