Skip to content

Commit

Permalink
Par process entries (#1499)
Browse files Browse the repository at this point in the history
* Parallel entry processor.
  • Loading branch information
aeyakovenko authored Oct 16, 2018
1 parent d09889b commit 2bd8775
Showing 1 changed file with 178 additions and 1 deletion.
179 changes: 178 additions & 1 deletion src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use log::Level;
use mint::Mint;
use payment_plan::Payment;
use poh_recorder::PohRecorder;
use rayon::prelude::*;
use rpc::RpcSignatureStatus;
use signature::Keypair;
use signature::Signature;
Expand Down Expand Up @@ -931,9 +932,62 @@ impl Bank {

/// Process an ordered list of entries.
pub fn process_entries(&self, entries: &[Entry]) -> Result<()> {
self.par_process_entries(entries)
}

pub fn first_err(results: &[Result<()>]) -> Result<()> {
for r in results {
r.clone()?;
}
Ok(())
}
pub fn par_execute_entries(&self, entries: &[(&Entry, Vec<Result<()>>)]) -> Result<()> {
inc_new_counter_info!("bank-par_execute_entries-count", entries.len());
let results: Vec<Result<()>> = entries
.into_par_iter()
.map(|(e, locks)| {
let results = self.execute_and_commit_transactions(
&e.transactions,
locks.to_vec(),
MAX_ENTRY_IDS,
);
self.unlock_accounts(&e.transactions, &results);
Self::first_err(&results)
}).collect();
Self::first_err(&results)
}

/// process entries in parallel
/// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
/// 2. Process the locked group in parallel
/// 3. Register the `Tick` if it's available, goto 1
pub fn par_process_entries(&self, entries: &[Entry]) -> Result<()> {
// accumulator for entries that can be processed in parallel
let mut mt_group = vec![];
for entry in entries {
self.process_entry(&entry)?;
if entry.transactions.is_empty() {
// if its a tick, execute the group and register the tick
self.par_execute_entries(&mt_group)?;
self.register_entry_id(&entry.id);
mt_group = vec![];
continue;
}
// try to lock the accounts
let locked = self.lock_accounts(&entry.transactions);
// if any of the locks error out
// execute the current group
if Self::first_err(&locked).is_err() {
self.par_execute_entries(&mt_group)?;
mt_group = vec![];
//reset the lock and push the entry
let locked = self.lock_accounts(&entry.transactions);
mt_group.push((entry, locked));
} else {
// push the entry to the mt_group
mt_group.push((entry, locked));
}
}
self.par_execute_entries(&mt_group)?;
Ok(())
}

Expand Down Expand Up @@ -1906,4 +1960,127 @@ mod tests {
// hash3 is not in the q
assert_eq!(Bank::check_entry_id_age(&q, hash3, 3), false);
}
#[test]
fn test_first_err() {
assert_eq!(Bank::first_err(&[Ok(())]), Ok(()));
assert_eq!(
Bank::first_err(&[Ok(()), Err(BankError::DuplicateSignature)]),
Err(BankError::DuplicateSignature)
);
assert_eq!(
Bank::first_err(&[
Ok(()),
Err(BankError::DuplicateSignature),
Err(BankError::AccountInUse)
]),
Err(BankError::DuplicateSignature)
);
assert_eq!(
Bank::first_err(&[
Ok(()),
Err(BankError::AccountInUse),
Err(BankError::DuplicateSignature)
]),
Err(BankError::AccountInUse)
);
assert_eq!(
Bank::first_err(&[
Err(BankError::AccountInUse),
Ok(()),
Err(BankError::DuplicateSignature)
]),
Err(BankError::AccountInUse)
);
}
#[test]
fn test_par_process_entries_tick() {
let mint = Mint::new(1000);
let bank = Bank::new(&mint);

// ensure bank can process a tick
let tick = next_entry(&mint.last_id(), 1, vec![]);
assert_eq!(bank.par_process_entries(&[tick.clone()]), Ok(()));
assert_eq!(bank.last_id(), tick.id);
}
#[test]
fn test_par_process_entries_2_entries_collision() {
let mint = Mint::new(1000);
let bank = Bank::new(&mint);
let keypair1 = Keypair::new();
let keypair2 = Keypair::new();

let last_id = bank.last_id();

// ensure bank can process 2 entries that have a common account and no tick is registered
let tx = Transaction::system_new(&mint.keypair(), keypair1.pubkey(), 2, bank.last_id());
let entry_1 = next_entry(&last_id, 1, vec![tx]);
let tx = Transaction::system_new(&mint.keypair(), keypair2.pubkey(), 2, bank.last_id());
let entry_2 = next_entry(&entry_1.id, 1, vec![tx]);
assert_eq!(bank.par_process_entries(&[entry_1, entry_2]), Ok(()));
assert_eq!(bank.get_balance(&keypair1.pubkey()), 2);
assert_eq!(bank.get_balance(&keypair2.pubkey()), 2);
assert_eq!(bank.last_id(), last_id);
}
#[test]
fn test_par_process_entries_2_entries_par() {
let mint = Mint::new(1000);
let bank = Bank::new(&mint);
let keypair1 = Keypair::new();
let keypair2 = Keypair::new();
let keypair3 = Keypair::new();
let keypair4 = Keypair::new();

//load accounts
let tx = Transaction::system_new(&mint.keypair(), keypair1.pubkey(), 1, bank.last_id());
assert_eq!(bank.process_transaction(&tx), Ok(()));
let tx = Transaction::system_new(&mint.keypair(), keypair2.pubkey(), 1, bank.last_id());
assert_eq!(bank.process_transaction(&tx), Ok(()));

// ensure bank can process 2 entries that do not have a common account and no tick is registered
let last_id = bank.last_id();
let tx = Transaction::system_new(&keypair1, keypair3.pubkey(), 1, bank.last_id());
let entry_1 = next_entry(&last_id, 1, vec![tx]);
let tx = Transaction::system_new(&keypair2, keypair4.pubkey(), 1, bank.last_id());
let entry_2 = next_entry(&entry_1.id, 1, vec![tx]);
assert_eq!(bank.par_process_entries(&[entry_1, entry_2]), Ok(()));
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
assert_eq!(bank.get_balance(&keypair4.pubkey()), 1);
assert_eq!(bank.last_id(), last_id);
}
#[test]
fn test_par_process_entries_2_entries_tick() {
let mint = Mint::new(1000);
let bank = Bank::new(&mint);
let keypair1 = Keypair::new();
let keypair2 = Keypair::new();
let keypair3 = Keypair::new();
let keypair4 = Keypair::new();

//load accounts
let tx = Transaction::system_new(&mint.keypair(), keypair1.pubkey(), 1, bank.last_id());
assert_eq!(bank.process_transaction(&tx), Ok(()));
let tx = Transaction::system_new(&mint.keypair(), keypair2.pubkey(), 1, bank.last_id());
assert_eq!(bank.process_transaction(&tx), Ok(()));

let last_id = bank.last_id();

// ensure bank can process 2 entries that do not have a common account and tick is registered
let tx = Transaction::system_new(&keypair2, keypair3.pubkey(), 1, bank.last_id());
let entry_1 = next_entry(&last_id, 1, vec![tx]);
let new_tick = next_entry(&entry_1.id, 1, vec![]);
let tx = Transaction::system_new(&keypair1, keypair4.pubkey(), 1, new_tick.id);
let entry_2 = next_entry(&new_tick.id, 1, vec![tx]);
assert_eq!(
bank.par_process_entries(&[entry_1.clone(), new_tick.clone(), entry_2]),
Ok(())
);
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
assert_eq!(bank.get_balance(&keypair4.pubkey()), 1);
assert_eq!(bank.last_id(), new_tick.id);
// ensure that errors are returned
assert_eq!(
bank.par_process_entries(&[entry_1]),
Err(BankError::AccountNotFound)
);
}
}

0 comments on commit 2bd8775

Please sign in to comment.