Skip to content

Commit

Permalink
Merge pull request #125 from garious/fix-parallelized-ledger
Browse files Browse the repository at this point in the history
Tell verifiers when not to parallelize accounting
  • Loading branch information
garious authored Apr 13, 2018
2 parents 9b12a79 + 7fc42de commit 83c5b3b
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 6 deletions.
38 changes: 37 additions & 1 deletion src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,12 @@ impl Accountant {
pub fn process_verified_transactions(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> {
// Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically.
trs.into_par_iter()
let results: Vec<_> = trs.into_par_iter()
.map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
.collect(); // Calling collect() here forces all debits to complete before moving on.

results
.into_par_iter()
.map(|result| {
result.map(|tr| {
self.process_verified_transaction_credits(&tr);
Expand All @@ -166,6 +170,27 @@ impl Accountant {
.collect()
}

fn partition_events(events: Vec<Event>) -> (Vec<Transaction>, Vec<Event>) {
let mut trs = vec![];
let mut rest = vec![];
for event in events {
match event {
Event::Transaction(tr) => trs.push(tr),
_ => rest.push(event),
}
}
(trs, rest)
}

pub fn process_verified_events(&self, events: Vec<Event>) -> Result<()> {
let (trs, rest) = Self::partition_events(events);
self.process_verified_transactions(trs);
for event in rest {
self.process_verified_event(&event)?;
}
Ok(())
}

/// Process a Witness Signature that has already been verified.
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
Expand Down Expand Up @@ -410,6 +435,17 @@ mod tests {
// Assert we're no longer able to use the oldest entry ID.
assert!(!acc.reserve_signature_with_last_id(&sig, &alice.last_id()));
}

#[test]
fn test_debits_before_credits() {
let mint = Mint::new(2);
let acc = Accountant::new(&mint);
let alice = KeyPair::new();
let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let trs = vec![tr0, tr1];
assert!(acc.process_verified_transactions(trs)[1].is_err());
}
}

#[cfg(all(feature = "unstable", test))]
Expand Down
46 changes: 44 additions & 2 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
}

// Let validators know they should not attempt to process additional
// transactions in parallel.
self.historian.sender.send(Signal::Tick)?;

// Process the remaining requests serially.
let rsps = reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
Expand Down Expand Up @@ -321,12 +325,14 @@ mod tests {
use accountant::Accountant;
use accountant_skel::AccountantSkel;
use accountant_stub::AccountantStub;
use entry::Entry;
use historian::Historian;
use mint::Mint;
use plan::Plan;
use recorder::Signal;
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::net::UdpSocket;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::sleep;
Expand Down Expand Up @@ -359,6 +365,43 @@ mod tests {
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
}

#[test]
fn test_accounting_sequential_consistency() {
// In this attack we'll demonstrate that a verifier can interpret the ledger
// differently if either the server doesn't signal the ledger to add an
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let acc = Accountant::new(&mint);
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
let historian = Historian::new(&mint.last_id(), None);
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);

// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)];
assert!(skel.process_packets(req_vers).is_ok());

// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)];
assert!(skel.process_packets(req_vers).is_ok());

// Collect the ledger and feed it to a new accountant.
skel.historian.sender.send(Signal::Tick).unwrap();
drop(skel.historian.sender);
let entries: Vec<Entry> = skel.historian.receiver.iter().collect();

// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
// the account balance below zero before the credit is added.
let acc = Accountant::new(&mint);
for entry in entries {
acc.process_verified_events(entry.events).unwrap();
}
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1));
}

#[test]
fn test_accountant_bad_sig() {
let serve_port = 9002;
Expand Down Expand Up @@ -468,7 +511,6 @@ mod bench {
let tps = txs as f64 / sec;

// Ensure that all transactions were successfully logged.
skel.historian.sender.send(Signal::Tick).unwrap();
drop(skel.historian.sender);
let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
assert_eq!(entries.len(), 1);
Expand Down
4 changes: 1 addition & 3 deletions src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ fn main() {
let mut last_id = entry1.id;
for entry in entries {
last_id = entry.id;
for event in entry.events {
acc.process_verified_event(&event).unwrap();
}
acc.process_verified_events(entry.events).unwrap();
}

let historian = Historian::new(&last_id, Some(1000));
Expand Down

0 comments on commit 83c5b3b

Please sign in to comment.