diff --git a/src/bank.rs b/src/bank.rs index f3c9a7b4de22e3..60d3d794f42359 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -19,6 +19,7 @@ use log::Level; use mint::Mint; use payment_plan::Payment; use poh_recorder::PohRecorder; +use rayon::prelude::*; use rpc::RpcSignatureStatus; use signature::Keypair; use signature::Signature; @@ -931,9 +932,62 @@ impl Bank { /// Process an ordered list of entries. pub fn process_entries(&self, entries: &[Entry]) -> Result<()> { + self.par_process_entries(entries) + } + + pub fn first_err(results: &[Result<()>]) -> Result<()> { + for r in results { + r.clone()?; + } + Ok(()) + } + pub fn par_execute_entries(&self, entries: &[(&Entry, Vec>)]) -> Result<()> { + inc_new_counter_info!("bank-par_execute_entries-count", entries.len()); + let results: Vec> = entries + .into_par_iter() + .map(|(e, locks)| { + let results = self.execute_and_commit_transactions( + &e.transactions, + locks.to_vec(), + MAX_ENTRY_IDS, + ); + self.unlock_accounts(&e.transactions, &results); + Self::first_err(&results) + }).collect(); + Self::first_err(&results) + } + + /// process entries in parallel + /// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry + /// 2. Process the locked group in parallel + /// 3. Register the `Tick` if it's available, goto 1 + pub fn par_process_entries(&self, entries: &[Entry]) -> Result<()> { + // accumulator for entries that can be processed in parallel + let mut mt_group = vec![]; for entry in entries { - self.process_entry(&entry)?; + if entry.transactions.is_empty() { + // if its a tick, execute the group and register the tick + self.par_execute_entries(&mt_group)?; + self.register_entry_id(&entry.id); + mt_group = vec![]; + continue; + } + // try to lock the accounts + let locked = self.lock_accounts(&entry.transactions); + // if any of the locks error out + // execute the current group + if Self::first_err(&locked).is_err() { + self.par_execute_entries(&mt_group)?; + mt_group = vec![]; + //reset the lock and push the entry + let locked = self.lock_accounts(&entry.transactions); + mt_group.push((entry, locked)); + } else { + // push the entry to the mt_group + mt_group.push((entry, locked)); + } } + self.par_execute_entries(&mt_group)?; Ok(()) } @@ -1906,4 +1960,127 @@ mod tests { // hash3 is not in the q assert_eq!(Bank::check_entry_id_age(&q, hash3, 3), false); } + #[test] + fn test_first_err() { + assert_eq!(Bank::first_err(&[Ok(())]), Ok(())); + assert_eq!( + Bank::first_err(&[Ok(()), Err(BankError::DuplicateSignature)]), + Err(BankError::DuplicateSignature) + ); + assert_eq!( + Bank::first_err(&[ + Ok(()), + Err(BankError::DuplicateSignature), + Err(BankError::AccountInUse) + ]), + Err(BankError::DuplicateSignature) + ); + assert_eq!( + Bank::first_err(&[ + Ok(()), + Err(BankError::AccountInUse), + Err(BankError::DuplicateSignature) + ]), + Err(BankError::AccountInUse) + ); + assert_eq!( + Bank::first_err(&[ + Err(BankError::AccountInUse), + Ok(()), + Err(BankError::DuplicateSignature) + ]), + Err(BankError::AccountInUse) + ); + } + #[test] + fn test_par_process_entries_tick() { + let mint = Mint::new(1000); + let bank = Bank::new(&mint); + + // ensure bank can process a tick + let tick = next_entry(&mint.last_id(), 1, vec![]); + assert_eq!(bank.par_process_entries(&[tick.clone()]), Ok(())); + assert_eq!(bank.last_id(), tick.id); + } + #[test] + fn test_par_process_entries_2_entries_collision() { + let mint = Mint::new(1000); + let bank = Bank::new(&mint); + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + + let last_id = bank.last_id(); + + // ensure bank can process 2 entries that have a common account and no tick is registered + let tx = Transaction::system_new(&mint.keypair(), keypair1.pubkey(), 2, bank.last_id()); + let entry_1 = next_entry(&last_id, 1, vec![tx]); + let tx = Transaction::system_new(&mint.keypair(), keypair2.pubkey(), 2, bank.last_id()); + let entry_2 = next_entry(&entry_1.id, 1, vec![tx]); + assert_eq!(bank.par_process_entries(&[entry_1, entry_2]), Ok(())); + assert_eq!(bank.get_balance(&keypair1.pubkey()), 2); + assert_eq!(bank.get_balance(&keypair2.pubkey()), 2); + assert_eq!(bank.last_id(), last_id); + } + #[test] + fn test_par_process_entries_2_entries_par() { + let mint = Mint::new(1000); + let bank = Bank::new(&mint); + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let keypair3 = Keypair::new(); + let keypair4 = Keypair::new(); + + //load accounts + let tx = Transaction::system_new(&mint.keypair(), keypair1.pubkey(), 1, bank.last_id()); + assert_eq!(bank.process_transaction(&tx), Ok(())); + let tx = Transaction::system_new(&mint.keypair(), keypair2.pubkey(), 1, bank.last_id()); + assert_eq!(bank.process_transaction(&tx), Ok(())); + + // ensure bank can process 2 entries that do not have a common account and no tick is registered + let last_id = bank.last_id(); + let tx = Transaction::system_new(&keypair1, keypair3.pubkey(), 1, bank.last_id()); + let entry_1 = next_entry(&last_id, 1, vec![tx]); + let tx = Transaction::system_new(&keypair2, keypair4.pubkey(), 1, bank.last_id()); + let entry_2 = next_entry(&entry_1.id, 1, vec![tx]); + assert_eq!(bank.par_process_entries(&[entry_1, entry_2]), Ok(())); + assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); + assert_eq!(bank.get_balance(&keypair4.pubkey()), 1); + assert_eq!(bank.last_id(), last_id); + } + #[test] + fn test_par_process_entries_2_entries_tick() { + let mint = Mint::new(1000); + let bank = Bank::new(&mint); + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let keypair3 = Keypair::new(); + let keypair4 = Keypair::new(); + + //load accounts + let tx = Transaction::system_new(&mint.keypair(), keypair1.pubkey(), 1, bank.last_id()); + assert_eq!(bank.process_transaction(&tx), Ok(())); + let tx = Transaction::system_new(&mint.keypair(), keypair2.pubkey(), 1, bank.last_id()); + assert_eq!(bank.process_transaction(&tx), Ok(())); + + let last_id = bank.last_id(); + + // ensure bank can process 2 entries that do not have a common account and tick is registered + let tx = Transaction::system_new(&keypair2, keypair3.pubkey(), 1, bank.last_id()); + let entry_1 = next_entry(&last_id, 1, vec![tx]); + let new_tick = next_entry(&entry_1.id, 1, vec![]); + let tx = Transaction::system_new(&keypair1, keypair4.pubkey(), 1, new_tick.id); + let entry_2 = next_entry(&new_tick.id, 1, vec![tx]); + assert_eq!( + bank.par_process_entries(&[entry_1.clone(), new_tick.clone(), entry_2]), + Ok(()) + ); + assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); + assert_eq!(bank.get_balance(&keypair4.pubkey()), 1); + assert_eq!(bank.last_id(), new_tick.id); + // ensure that errors are returned + assert_eq!( + bank.par_process_entries(&[entry_1]), + Err(BankError::AccountNotFound) + ); + } }