Skip to content

Commit

Permalink
improved error handling and atomic transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
rlkelly committed May 1, 2018
1 parent 77dd1bd commit 00cc982
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 16 deletions.
49 changes: 36 additions & 13 deletions src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use rayon::prelude::*;
use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicIsize, Ordering};
use std::result;
use std::sync::RwLock;
use transaction::Transaction;
Expand All @@ -23,25 +24,26 @@ pub const MAX_ENTRY_IDS: usize = 1024 * 4;
#[derive(Debug, PartialEq, Eq)]
pub enum AccountingError {
AccountNotFound,
BalanceUpdatedBeforeTransactionCompleted,
InsufficientFunds,
InvalidTransferSignature,
}

pub type Result<T> = result::Result<T, AccountingError>;

/// Commit funds to the 'to' party.
fn apply_payment(balances: &RwLock<HashMap<PublicKey, RwLock<i64>>>, payment: &Payment) {
fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) {
if balances.read().unwrap().contains_key(&payment.to) {
let bals = balances.read().unwrap();
*bals[&payment.to].write().unwrap() += payment.tokens;
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
} else {
let mut bals = balances.write().unwrap();
bals.insert(payment.to, RwLock::new(payment.tokens));
bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize));
}
}

pub struct Accountant {
balances: RwLock<HashMap<PublicKey, RwLock<i64>>>,
balances: RwLock<HashMap<PublicKey, AtomicIsize>>,
pending: RwLock<HashMap<Signature, Plan>>,
last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>,
time_sources: RwLock<HashSet<PublicKey>>,
Expand Down Expand Up @@ -131,23 +133,34 @@ impl Accountant {
// Hold a write lock before the condition check, so that a debit can't occur
// between checking the balance and the withdraw.
let option = bals.get(&tr.from);

if option.is_none() {
return Err(AccountingError::AccountNotFound);
}
let mut bal = option.unwrap().write().unwrap();

if !self.reserve_signature_with_last_id(&tr.sig, &tr.data.last_id) {
return Err(AccountingError::InvalidTransferSignature);
}

if *bal < tr.data.tokens {
let bal = option.unwrap();
let current = bal.load(Ordering::Relaxed) as i64;

if current < tr.data.tokens {
self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id);
return Err(AccountingError::InsufficientFunds);
}

*bal -= tr.data.tokens;
let result = bal.compare_exchange(
current as isize,
(current - tr.data.tokens) as isize,
Ordering::Relaxed,
Ordering::Relaxed,
);

Ok(())
return match result {
Ok(_) => Ok(()),
Err(_) => Err(AccountingError::BalanceUpdatedBeforeTransactionCompleted),
}
}

pub fn process_verified_transaction_credits(&self, tr: &Transaction) {
Expand All @@ -164,17 +177,27 @@ impl Accountant {

/// Process a Transaction that has already been verified.
pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> {
self.process_verified_transaction_debits(tr)?;
self.process_verified_transaction_credits(tr);
Ok(())
return match self.process_verified_transaction_debits(tr) {
Ok(_) => {
self.process_verified_transaction_credits(tr);
Ok(())
},
Err(err) => {
Err(err)
}
};
}

/// Process a batch of verified transactions.
pub fn process_verified_transactions(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> {
// Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically.
let results: Vec<_> = trs.into_par_iter()
.map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
.filter_map(|tr| match self.process_verified_transaction_debits(&tr) {
Ok(_x) => Some(Ok(tr)),
Err(_e) => None,
})
// .flat_map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
.collect(); // Calling collect() here forces all debits to complete before moving on.

results
Expand Down Expand Up @@ -300,7 +323,7 @@ impl Accountant {

pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
let bals = self.balances.read().unwrap();
bals.get(pubkey).map(|x| *x.read().unwrap())
bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
}
}

Expand Down
13 changes: 10 additions & 3 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ mod bench {
// Create transactions between unrelated parties.
let txs = 100_000;
let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
let errors: Mutex<usize> = Mutex::new(0);
let transactions: Vec<_> = (0..txs)
.into_par_iter()
.map(|i| {
Expand All @@ -774,11 +775,17 @@ mod bench {
// Seed the 'from' account.
let rando0 = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
acc.process_verified_transaction(&tr).unwrap();
// some of these will fail because balance updates before transaction completes
match acc.process_verified_transaction(&tr) {
Ok(_) => (),
Err(_) => *errors.lock().unwrap() += 1,
};

let rando1 = KeyPair::new();
let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
acc.process_verified_transaction(&tr).unwrap();
// these will fail if the prior transaction does not go through
// but won't typically fail otherwise since the addresses are randomly generated
let _ = acc.process_verified_transaction(&tr);

// Finally, return a transaction that's unique
Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
Expand All @@ -803,7 +810,7 @@ mod bench {
drop(skel.historian.sender);
let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);
assert_eq!(entries[0].events.len() + *errors.lock().unwrap(), txs as usize);

println!("{} tps", tps);
}
Expand Down

0 comments on commit 00cc982

Please sign in to comment.