Enable parallelized accountant #117

Merged: 2 commits, Apr 12, 2018

src/accountant.rs (17 changes: 11 additions & 6 deletions)
@@ -18,7 +18,7 @@ use std::result;
 use std::sync::RwLock;
 use transaction::Transaction;
 
-const MAX_ENTRY_IDS: usize = 1024 * 4;
+pub const MAX_ENTRY_IDS: usize = 1024 * 4;
 
 #[derive(Debug, PartialEq, Eq)]
 pub enum AccountingError {
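
The constant becomes pub so the new benchmark added to src/accountant_skel.rs below can import it when seeding entry ids.
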
@@ -152,12 +152,17 @@ impl Accountant {
     }
 
     /// Process a batch of verified transactions.
-    pub fn process_verified_transactions(&self, trs: &[Transaction]) -> Vec<Result<()>> {
+    pub fn process_verified_transactions(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> {
         // Run all debits first to filter out any transactions that can't be processed
         // in parallel deterministically.
-        trs.par_iter()
-            .map(|tr| self.process_verified_transaction_debits(tr).map(|_| tr))
-            .map(|result| result.map(|tr| self.process_verified_transaction_credits(tr)))
+        trs.into_par_iter()
+            .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
+            .map(|result| {
+                result.map(|tr| {
+                    self.process_verified_transaction_credits(&tr);
+                    tr
+                })
+            })
             .collect()
     }
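
For readers new to rayon, the following is a minimal standalone sketch (not part of this PR; the names and values are illustrative) of the pattern used above: into_par_iter takes ownership of each item, the first map keeps only items whose fallible first phase succeeds, and the second map applies the follow-up phase before handing the owned item back to the caller.

extern crate rayon;
use rayon::prelude::*;

fn main() {
    let inputs: Vec<i32> = (0..8).collect();
    let results: Vec<Result<i32, String>> = inputs
        .into_par_iter()
        // Fallible first phase: keep only even inputs.
        .map(|x| if x % 2 == 0 { Ok(x) } else { Err(format!("odd input: {}", x)) })
        // Second phase runs only on the survivors; the value is handed back to the caller.
        .map(|result| result.map(|x| x * 10))
        .collect();
    assert_eq!(results.iter().filter(|r| r.is_ok()).count(), 4);
}
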

@@ -448,7 +453,7 @@ mod bench {
         }
 
         assert!(
-            acc.process_verified_transactions(&transactions)
+            acc.process_verified_transactions(transactions.clone())
                 .iter()
                 .all(|x| x.is_ok())
         );
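
Because process_verified_transactions now takes the Vec<Transaction> by value instead of a slice, the benchmark clones the batch before handing it over.
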
src/accountant_skel.rs (154 changes: 124 additions & 30 deletions)
@@ -21,7 +21,7 @@ use std::collections::VecDeque;
 use std::io::Write;
 use std::net::{SocketAddr, UdpSocket};
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::{channel, Receiver, SendError, Sender};
+use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::{Arc, Mutex};
 use std::thread::{spawn, JoinHandle};
 use std::time::Duration;
@@ -82,28 +82,18 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
     }
 
     /// Process Request items sent by clients.
-    pub fn log_verified_request(&mut self, msg: Request, verify: u8) -> Option<Response> {
+    pub fn process_request(
+        &mut self,
+        msg: Request,
+        rsp_addr: SocketAddr,
+    ) -> Option<(Response, SocketAddr)> {
         match msg {
-            Request::Transaction(_) if verify == 0 => {
-                trace!("Transaction failed sigverify");
-                None
-            }
-            Request::Transaction(tr) => {
-                if let Err(err) = self.acc.process_verified_transaction(&tr) {
-                    trace!("Transaction error: {:?}", err);
-                } else if let Err(SendError(_)) = self.historian
-                    .sender
-                    .send(Signal::Event(Event::Transaction(tr.clone())))
-                {
-                    error!("Channel send error");
-                }
-                None
-            }
             Request::GetBalance { key } => {
                 let val = self.acc.get_balance(&key);
-                Some(Response::Balance { key, val })
+                Some((Response::Balance { key, val }, rsp_addr))
             }
-            Request::GetLastId => Some(Response::LastId { id: self.sync() }),
+            Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)),
+            Request::Transaction(_) => unreachable!(),
         }
     }
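
The Transaction arm is now unreachable!() because the partition_requests step introduced below pulls transaction requests out of the batch before process_request is ever called; only GetBalance and GetLastId queries reach this match.
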

@@ -155,18 +145,46 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
             .collect()
     }
 
+    /// Split Request list into verified transactions and the rest
+    fn partition_requests(
+        req_vers: Vec<(Request, SocketAddr, u8)>,
+    ) -> (Vec<Transaction>, Vec<(Request, SocketAddr)>) {
+        let mut trs = vec![];
+        let mut reqs = vec![];
+        for (msg, rsp_addr, verify) in req_vers {
+            match msg {
+                Request::Transaction(tr) => {
+                    if verify != 0 {
+                        trs.push(tr);
+                    }
+                }
+                _ => reqs.push((msg, rsp_addr)),
+            }
+        }
+        (trs, reqs)
+    }
+
     fn process_packets(
-        obj: &Arc<Mutex<AccountantSkel<W>>>,
+        &mut self,
         req_vers: Vec<(Request, SocketAddr, u8)>,
-    ) -> Vec<(Response, SocketAddr)> {
-        req_vers
-            .into_iter()
-            .filter_map(|(req, rsp_addr, v)| {
-                let mut skel = obj.lock().unwrap();
-                skel.log_verified_request(req, v)
-                    .map(|resp| (resp, rsp_addr))
-            })
-            .collect()
+    ) -> Result<Vec<(Response, SocketAddr)>> {
+        let (trs, reqs) = Self::partition_requests(req_vers);
+
+        // Process the transactions in parallel and then log the successful ones.
+        for result in self.acc.process_verified_transactions(trs) {
+            if let Ok(tr) = result {
+                self.historian
+                    .sender
+                    .send(Signal::Event(Event::Transaction(tr)))?;
+            }
+        }
+
+        // Process the remaining requests serially.
+        let rsps = reqs.into_iter()
+            .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
+            .collect();
+
+        Ok(rsps)
     }
 
     fn serialize_response(
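
The `?` on the channel send above only compiles if the crate's Result/Error type (not shown in this diff) can be built from std::sync::mpsc::SendError. A minimal sketch of that kind of conversion, with hypothetical names, assuming the crate defines something similar:

use std::sync::mpsc::{channel, SendError};

// DemoError is a stand-in for the crate's own error type, which this diff does not show.
#[derive(Debug)]
enum DemoError {
    ChannelSend,
}

impl<T> From<SendError<T>> for DemoError {
    fn from(_: SendError<T>) -> DemoError {
        DemoError::ChannelSend
    }
}

fn forward(msg: u64) -> Result<(), DemoError> {
    let (sender, receiver) = channel();
    drop(receiver); // No receiver left, so the send fails and `?` converts the SendError.
    sender.send(msg)?;
    Ok(())
}

fn main() {
    assert!(forward(42).is_err());
}
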
@@ -213,7 +231,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
                 .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
                 .filter(|x| x.0.verify())
                 .collect();
-            let rsps = Self::process_packets(obj, req_vers);
+            let rsps = obj.lock().unwrap().process_packets(req_vers)?;
            let blobs = Self::serialize_responses(rsps, blob_recycler)?;
            if !blobs.is_empty() {
                //don't wake up the other side if there is nothing
@@ -286,3 +304,79 @@ mod tests {
         assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
     }
 }
+
+#[cfg(all(feature = "unstable", test))]
+mod bench {
+    extern crate test;
+    use self::test::Bencher;
+    use accountant_skel::*;
+    use accountant::{Accountant, MAX_ENTRY_IDS};
+    use signature::{KeyPair, KeyPairUtil};
+    use mint::Mint;
+    use transaction::Transaction;
+    use std::collections::HashSet;
+    use std::io::sink;
+    use std::time::Instant;
+    use bincode::serialize;
+    use hash::hash;
+
+    #[bench]
+    fn process_packets_bench(_bencher: &mut Bencher) {
+        let mint = Mint::new(100_000_000);
+        let acc = Accountant::new(&mint);
+        let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
+        // Create transactions between unrelated parties.
+        let txs = 100_000;
+        let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
+        let transactions: Vec<_> = (0..txs)
+            .into_par_iter()
+            .map(|i| {
+                // Seed the 'to' account and a cell for its signature.
+                let dummy_id = i % (MAX_ENTRY_IDS as i32);
+                let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
+                {
+                    let mut last_ids = last_ids.lock().unwrap();
+                    if !last_ids.contains(&last_id) {
+                        last_ids.insert(last_id);
+                        acc.register_entry_id(&last_id);
+                    }
+                }
+
+                // Seed the 'from' account.
+                let rando0 = KeyPair::new();
+                let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
+                acc.process_verified_transaction(&tr).unwrap();
+
+                let rando1 = KeyPair::new();
+                let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
+                acc.process_verified_transaction(&tr).unwrap();
+
+                // Finally, return a transaction that's unique
+                Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
+            })
+            .collect();
+
+        let req_vers = transactions
+            .into_iter()
+            .map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8))
+            .collect();
+
+        let historian = Historian::new(&mint.last_id(), None);
+        let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);
+
+        let now = Instant::now();
+        assert!(skel.process_packets(req_vers).is_ok());
+        let duration = now.elapsed();
+        let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
+        let tps = txs as f64 / sec;
+
+        // Ensure that all transactions were successfully logged.
+        skel.historian.sender.send(Signal::Tick).unwrap();
+        drop(skel.historian.sender);
+        let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
+        assert_eq!(entries.len(), 1);
+        assert_eq!(entries[0].events.len(), txs as usize);
+
+        println!("{} tps", tps);
+    }
+}
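
Because the module is gated on #[cfg(all(feature = "unstable", test))] and uses test::Bencher, it presumably requires a nightly toolchain, invoked with something like cargo +nightly bench --features unstable; the repository's exact invocation is not part of this diff. The closing assertions check that the historian logged every transaction: after the final Tick signal there is a single entry whose events list holds all 100,000 transactions.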