Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Parallelize accountant #106

Merged
merged 6 commits into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 47 additions & 42 deletions src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet};
use std::result;
use std::sync::RwLock;
use transaction::Transaction;

#[derive(Debug, PartialEq, Eq)]
Expand All @@ -28,11 +29,11 @@ fn apply_payment(balances: &mut HashMap<PublicKey, i64>, payment: &Payment) {
}

pub struct Accountant {
balances: HashMap<PublicKey, i64>,
pending: HashMap<Signature, Plan>,
signatures: HashSet<Signature>,
time_sources: HashSet<PublicKey>,
last_time: DateTime<Utc>,
balances: RwLock<HashMap<PublicKey, i64>>,
pending: RwLock<HashMap<Signature, Plan>>,
signatures: RwLock<HashSet<Signature>>,
time_sources: RwLock<HashSet<PublicKey>>,
last_time: RwLock<DateTime<Utc>>,
}

impl Accountant {
Expand All @@ -41,11 +42,11 @@ impl Accountant {
let mut balances = HashMap::new();
apply_payment(&mut balances, deposit);
Accountant {
balances,
pending: HashMap::new(),
signatures: HashSet::new(),
time_sources: HashSet::new(),
last_time: Utc.timestamp(0, 0),
balances: RwLock::new(balances),
pending: RwLock::new(HashMap::new()),
signatures: RwLock::new(HashSet::new()),
time_sources: RwLock::new(HashSet::new()),
last_time: RwLock::new(Utc.timestamp(0, 0)),
}
}

Expand All @@ -58,16 +59,16 @@ impl Accountant {
Self::new_from_deposit(&deposit)
}

fn reserve_signature(&mut self, sig: &Signature) -> bool {
if self.signatures.contains(sig) {
fn reserve_signature(&self, sig: &Signature) -> bool {
if self.signatures.read().unwrap().contains(sig) {
return false;
}
self.signatures.insert(*sig);
self.signatures.write().unwrap().insert(*sig);
true
}

/// Process a Transaction that has already been verified.
pub fn process_verified_transaction(&mut self, tr: &Transaction) -> Result<()> {
pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> {
if self.get_balance(&tr.from).unwrap_or(0) < tr.tokens {
return Err(AccountingError::InsufficientFunds);
}
Expand All @@ -76,28 +77,28 @@ impl Accountant {
return Err(AccountingError::InvalidTransferSignature);
}

if let Some(x) = self.balances.get_mut(&tr.from) {
if let Some(x) = self.balances.write().unwrap().get_mut(&tr.from) {
*x -= tr.tokens;
}

let mut plan = tr.plan.clone();
plan.apply_witness(&Witness::Timestamp(self.last_time));
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));

if let Some(ref payment) = plan.final_payment() {
apply_payment(&mut self.balances, payment);
apply_payment(&mut self.balances.write().unwrap(), payment);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will it try to get write lock for balances twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in the case the transaction requires no witnesses, it'll lock balances for the from cell and again for the to cell. Once we switch to per cell locks, it'll be two separate locks anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, a subtlety of Rust, by not putting balances into variable, it's only in scope while the righthand side of that expression is evaluated. If I put the first instance into a variable, then the second would immediately cause a panic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok cool. yea it's not super clear.

} else {
self.pending.insert(tr.sig, plan);
self.pending.write().unwrap().insert(tr.sig, plan);
}

Ok(())
}

/// Process a Witness Signature that has already been verified.
fn process_verified_sig(&mut self, from: PublicKey, tx_sig: Signature) -> Result<()> {
if let Occupied(mut e) = self.pending.entry(tx_sig) {
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
e.get_mut().apply_witness(&Witness::Signature(from));
if let Some(ref payment) = e.get().final_payment() {
apply_payment(&mut self.balances, payment);
apply_payment(&mut self.balances.write().unwrap(), payment);
e.remove_entry();
}
};
Expand All @@ -106,40 +107,44 @@ impl Accountant {
}

/// Process a Witness Timestamp that has already been verified.
fn process_verified_timestamp(&mut self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
// If this is the first timestamp we've seen, it probably came from the genesis block,
// so we'll trust it.
if self.last_time == Utc.timestamp(0, 0) {
self.time_sources.insert(from);
if *self.last_time.read().unwrap() == Utc.timestamp(0, 0) {
self.time_sources.write().unwrap().insert(from);
}

if self.time_sources.contains(&from) {
if dt > self.last_time {
self.last_time = dt;
if self.time_sources.read().unwrap().contains(&from) {
if dt > *self.last_time.read().unwrap() {
*self.last_time.write().unwrap() = dt;
}
} else {
return Ok(());
}

// Check to see if any timelocked transactions can be completed.
let mut completed = vec![];
for (key, plan) in &mut self.pending {
plan.apply_witness(&Witness::Timestamp(self.last_time));

// Hold 'pending' write lock until the end of this function. Otherwise another thread can
// double-spend if it enters before the modified plan is removed from 'pending'.
let mut pending = self.pending.write().unwrap();
for (key, plan) in pending.iter_mut() {
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));
if let Some(ref payment) = plan.final_payment() {
apply_payment(&mut self.balances, payment);
apply_payment(&mut self.balances.write().unwrap(), payment);
completed.push(key.clone());
}
}

for key in completed {
self.pending.remove(&key);
pending.remove(&key);
}

Ok(())
}

/// Process an Transaction or Witness that has already been verified.
pub fn process_verified_event(&mut self, event: &Event) -> Result<()> {
pub fn process_verified_event(&self, event: &Event) -> Result<()> {
match *event {
Event::Transaction(ref tr) => self.process_verified_transaction(tr),
Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig),
Expand All @@ -150,7 +155,7 @@ impl Accountant {
/// Create, sign, and process a Transaction from `keypair` to `to` of
/// `n` tokens where `last_id` is the last Entry ID observed by the client.
pub fn transfer(
&mut self,
&self,
n: i64,
keypair: &KeyPair,
to: PublicKey,
Expand All @@ -165,7 +170,7 @@ impl Accountant {
/// to `to` of `n` tokens on `dt` where `last_id` is the last Entry ID
/// observed by the client.
pub fn transfer_on_date(
&mut self,
&self,
n: i64,
keypair: &KeyPair,
to: PublicKey,
Expand All @@ -178,7 +183,7 @@ impl Accountant {
}

pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
self.balances.get(pubkey).cloned()
self.balances.read().unwrap().get(pubkey).cloned()
}
}

Expand All @@ -191,7 +196,7 @@ mod tests {
fn test_accountant() {
let alice = Mint::new(10_000);
let bob_pubkey = KeyPair::new().pubkey();
let mut acc = Accountant::new(&alice);
let acc = Accountant::new(&alice);
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
Expand All @@ -204,7 +209,7 @@ mod tests {
#[test]
fn test_invalid_transfer() {
let alice = Mint::new(11_000);
let mut acc = Accountant::new(&alice);
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
.unwrap();
Expand All @@ -221,7 +226,7 @@ mod tests {
#[test]
fn test_transfer_to_newb() {
let alice = Mint::new(10_000);
let mut acc = Accountant::new(&alice);
let acc = Accountant::new(&alice);
let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(500, &alice_keypair, bob_pubkey, alice.last_id())
Expand All @@ -232,7 +237,7 @@ mod tests {
#[test]
fn test_transfer_on_date() {
let alice = Mint::new(1);
let mut acc = Accountant::new(&alice);
let acc = Accountant::new(&alice);
let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now();
Expand All @@ -258,7 +263,7 @@ mod tests {
#[test]
fn test_transfer_after_date() {
let alice = Mint::new(1);
let mut acc = Accountant::new(&alice);
let acc = Accountant::new(&alice);
let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now();
Expand All @@ -275,7 +280,7 @@ mod tests {
#[test]
fn test_cancel_transfer() {
let alice = Mint::new(1);
let mut acc = Accountant::new(&alice);
let acc = Accountant::new(&alice);
let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now();
Expand All @@ -301,7 +306,7 @@ mod tests {
#[test]
fn test_duplicate_event_signature() {
let alice = Mint::new(1);
let mut acc = Accountant::new(&alice);
let acc = Accountant::new(&alice);
let sig = Signature::default();
assert!(acc.reserve_signature(&sig));
assert!(!acc.reserve_signature(&sig));
Expand Down
2 changes: 1 addition & 1 deletion src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn main() {
None
};

let mut acc = Accountant::new_from_deposit(&deposit.unwrap());
let acc = Accountant::new_from_deposit(&deposit.unwrap());

let mut last_id = entry1.id;
for entry in entries {
Expand Down