Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Merge pull request #200 from jackson-sandland/153-panic-cleanup
Browse files Browse the repository at this point in the history
issue #153 - panic cleanup
  • Loading branch information
garious authored May 11, 2018
2 parents 7b2eb7c + e779496 commit 7f46aef
Show file tree
Hide file tree
Showing 17 changed files with 219 additions and 99 deletions.
93 changes: 68 additions & 25 deletions src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet, VecDeque};
use std::result;
use std::sync::RwLock;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::RwLock;
use transaction::Transaction;

pub const MAX_ENTRY_IDS: usize = 1024 * 4;
Expand All @@ -34,13 +34,17 @@ pub type Result<T> = result::Result<T, AccountingError>;
/// Commit funds to the 'to' party.
fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) {
// First we check balances with a read lock to maximize potential parallelization.
if balances.read().unwrap().contains_key(&payment.to) {
let bals = balances.read().unwrap();
if balances
.read()
.expect("'balances' read lock in apply_payment")
.contains_key(&payment.to)
{
let bals = balances.read().expect("'balances' read lock");
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
} else {
// Now we know the key wasn't present a nanosecond ago, but it might be there
// by the time we aquire a write lock, so we'll have to check again.
let mut bals = balances.write().unwrap();
let mut bals = balances.write().expect("'balances' write lock");
if bals.contains_key(&payment.to) {
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
} else {
Expand Down Expand Up @@ -84,27 +88,37 @@ impl Accountant {

/// Return the last entry ID registered
pub fn last_id(&self) -> Hash {
let last_ids = self.last_ids.read().unwrap();
let last_item = last_ids.iter().last().expect("empty last_ids list");
let last_ids = self.last_ids.read().expect("'last_ids' read lock");
let last_item = last_ids.iter().last().expect("empty 'last_ids' list");
last_item.0
}

fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
if signatures.read().unwrap().contains(sig) {
if signatures
.read()
.expect("'signatures' read lock")
.contains(sig)
{
return false;
}
signatures.write().unwrap().insert(*sig);
signatures
.write()
.expect("'signatures' write lock")
.insert(*sig);
true
}

fn forget_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
signatures.write().unwrap().remove(sig)
signatures
.write()
.expect("'signatures' write lock in forget_signature")
.remove(sig)
}

fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool {
if let Some(entry) = self.last_ids
.read()
.unwrap()
.expect("'last_ids' read lock in forget_signature_with_last_id")
.iter()
.rev()
.find(|x| x.0 == *last_id)
Expand All @@ -117,7 +131,7 @@ impl Accountant {
fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool {
if let Some(entry) = self.last_ids
.read()
.unwrap()
.expect("'last_ids' read lock in reserve_signature_with_last_id")
.iter()
.rev()
.find(|x| x.0 == *last_id)
Expand All @@ -132,7 +146,9 @@ impl Accountant {
/// the oldest ones once its internal cache is full. Once boot, the
/// accountant will reject transactions using that `last_id`.
pub fn register_entry_id(&self, last_id: &Hash) {
let mut last_ids = self.last_ids.write().unwrap();
let mut last_ids = self.last_ids
.write()
.expect("'last_ids' write lock in register_entry_id");
if last_ids.len() >= MAX_ENTRY_IDS {
last_ids.pop_front();
}
Expand All @@ -142,7 +158,9 @@ impl Accountant {
/// Deduct tokens from the 'from' address the account has sufficient
/// funds and isn't a duplicate.
pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> {
let bals = self.balances.read().unwrap();
let bals = self.balances
.read()
.expect("'balances' read lock in process_verified_transaction_debits");
let option = bals.get(&tr.from);

if option.is_none() {
Expand All @@ -154,7 +172,7 @@ impl Accountant {
}

loop {
let bal = option.unwrap();
let bal = option.expect("assignment of option to bal");
let current = bal.load(Ordering::Relaxed) as i64;

if current < tr.data.tokens {
Expand All @@ -178,12 +196,16 @@ impl Accountant {

pub fn process_verified_transaction_credits(&self, tr: &Transaction) {
let mut plan = tr.data.plan.clone();
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));
plan.apply_witness(&Witness::Timestamp(*self.last_time
.read()
.expect("timestamp creation in process_verified_transaction_credits")));

if let Some(ref payment) = plan.final_payment() {
apply_payment(&self.balances, payment);
} else {
let mut pending = self.pending.write().unwrap();
let mut pending = self.pending
.write()
.expect("'pending' write lock in process_verified_transaction_credits");
pending.insert(tr.sig, plan);
}
}
Expand Down Expand Up @@ -252,7 +274,11 @@ impl Accountant {

/// Process a Witness Signature that has already been verified.
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
if let Occupied(mut e) = self.pending
.write()
.expect("write() in process_verified_sig")
.entry(tx_sig)
{
e.get_mut().apply_witness(&Witness::Signature(from));
if let Some(ref payment) = e.get().final_payment() {
apply_payment(&self.balances, payment);
Expand All @@ -267,13 +293,24 @@ impl Accountant {
fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
// If this is the first timestamp we've seen, it probably came from the genesis block,
// so we'll trust it.
if *self.last_time.read().unwrap() == Utc.timestamp(0, 0) {
self.time_sources.write().unwrap().insert(from);
if *self.last_time
.read()
.expect("'last_time' read lock on first timestamp check")
== Utc.timestamp(0, 0)
{
self.time_sources
.write()
.expect("'time_sources' write lock on first timestamp")
.insert(from);
}

if self.time_sources.read().unwrap().contains(&from) {
if dt > *self.last_time.read().unwrap() {
*self.last_time.write().unwrap() = dt;
if self.time_sources
.read()
.expect("'time_sources' read lock")
.contains(&from)
{
if dt > *self.last_time.read().expect("'last_time' read lock") {
*self.last_time.write().expect("'last_time' write lock") = dt;
}
} else {
return Ok(());
Expand All @@ -284,9 +321,13 @@ impl Accountant {

// Hold 'pending' write lock until the end of this function. Otherwise another thread can
// double-spend if it enters before the modified plan is removed from 'pending'.
let mut pending = self.pending.write().unwrap();
let mut pending = self.pending
.write()
.expect("'pending' write lock in process_verified_timestamp");
for (key, plan) in pending.iter_mut() {
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));
plan.apply_witness(&Witness::Timestamp(*self.last_time
.read()
.expect("'last_time' read lock when creating timestamp")));
if let Some(ref payment) = plan.final_payment() {
apply_payment(&self.balances, payment);
completed.push(key.clone());
Expand Down Expand Up @@ -341,7 +382,9 @@ impl Accountant {
}

pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
let bals = self.balances.read().unwrap();
let bals = self.balances
.read()
.expect("'balances' read lock in get_balance");
bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::env;
use std::io::{stdin, stdout, Read};
use std::net::UdpSocket;
use std::process::exit;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
Expand Down
58 changes: 41 additions & 17 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl Crdt {
) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
// copy to avoid locking durring IO
let robj = obj.read().unwrap();
let robj = obj.read().expect("'obj' read lock in pub fn broadcast");
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
(robj.table[&robj.me].clone(), cloned_table)
};
Expand All @@ -194,10 +194,10 @@ impl Crdt {
.map(|((i, v), b)| {
// only leader should be broadcasting
assert!(me.current_leader_id != v.id);
let mut blob = b.write().unwrap();
blob.set_id(me.id).expect("set_id");
let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
blob.set_id(me.id).expect("set_id in pub fn broadcast");
blob.set_index(*transmit_index + i as u64)
.expect("set_index");
.expect("set_index in pub fn broadcast");
//TODO profile this, may need multiple sockets for par_iter
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
})
Expand All @@ -219,10 +219,10 @@ impl Crdt {
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
// copy to avoid locking durring IO
let s = obj.read().unwrap();
let s = obj.read().expect("'obj' read lock in pub fn retransmit");
(s.table[&s.me].clone(), s.table.values().cloned().collect())
};
let rblob = blob.read().unwrap();
let rblob = blob.read().expect("'blob' read lock in pub fn retransmit");
let daddr = "0.0.0.0:0".parse().unwrap();
let orders: Vec<_> = table
.iter()
Expand Down Expand Up @@ -261,9 +261,10 @@ impl Crdt {
fn random() -> u64 {
let rnd = SystemRandom::new();
let mut buf = [0u8; 8];
rnd.fill(&mut buf).unwrap();
rnd.fill(&mut buf).expect("rnd.fill in pub fn random");
let mut rdr = Cursor::new(&buf);
rdr.read_u64::<LittleEndian>().unwrap()
rdr.read_u64::<LittleEndian>()
.expect("rdr.read_u64 in fn random")
}
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
//trace!("get updates since {}", v);
Expand All @@ -287,10 +288,19 @@ impl Crdt {
return Err(Error::GeneralError);
}
let mut n = (Self::random() as usize) % self.table.len();
while self.table.values().nth(n).unwrap().id == self.me {
while self.table
.values()
.nth(n)
.expect("'values().nth(n)' while loop in fn gossip_request")
.id == self.me
{
n = (Self::random() as usize) % self.table.len();
}
let v = self.table.values().nth(n).unwrap().clone();
let v = self.table
.values()
.nth(n)
.expect("'values().nth(n)' in fn gossip_request")
.clone();
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
Ok((v.gossip_addr, req))
Expand All @@ -303,7 +313,9 @@ impl Crdt {

// Lock the object only to do this operation and not for any longer
// especially not when doing the `sock.send_to`
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?;
let (remote_gossip_addr, req) = obj.read()
.expect("'obj' read lock in fn run_gossip")
.gossip_request()?;
let sock = UdpSocket::bind("0.0.0.0:0")?;
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
Expand Down Expand Up @@ -335,7 +347,11 @@ impl Crdt {
return;
}
//TODO this should be a tuned parameter
sleep(obj.read().unwrap().timeout);
sleep(
obj.read()
.expect("'obj' read lock in pub fn gossip")
.timeout,
);
})
}

Expand All @@ -353,18 +369,25 @@ impl Crdt {
trace!("RequestUpdates {}", v);
let addr = reqdata.gossip_addr;
// only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj.read().unwrap().get_updates_since(v);
let (from, ups, data) = obj.read()
.expect("'obj' read lock in RequestUpdates")
.get_updates_since(v);
trace!("get updates since response {} {}", v, data.len());
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
trace!("send_to {}", addr);
//TODO verify reqdata belongs to sender
obj.write().unwrap().insert(reqdata);
sock.send_to(&rsp, addr).unwrap();
obj.write()
.expect("'obj' write lock in RequestUpdates")
.insert(reqdata);
sock.send_to(&rsp, addr)
.expect("'sock.send_to' in RequestUpdates");
trace!("send_to done!");
}
Protocol::ReceiveUpdates(from, ups, data) => {
trace!("ReceivedUpdates");
obj.write().unwrap().apply_updates(from, ups, &data);
obj.write()
.expect("'obj' write lock in ReceiveUpdates")
.apply_updates(from, ups, &data);
}
}
Ok(())
Expand All @@ -374,7 +397,8 @@ impl Crdt {
sock: UdpSocket,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
sock.set_read_timeout(Some(Duration::new(2, 0))).unwrap();
sock.set_read_timeout(Some(Duration::new(2, 0)))
.expect("'sock.set_read_timeout' in crdt.rs");
spawn(move || loop {
let _ = Self::run_listen(&obj, &sock);
if exit.load(Ordering::Relaxed) {
Expand Down
10 changes: 7 additions & 3 deletions src/ecdsa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
.into_par_iter()
.map(|p| {
p.read()
.unwrap()
.expect("'p' read lock in ed25519_verify")
.packets
.par_iter()
.map(verify_packet)
Expand All @@ -78,7 +78,11 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
let mut rvs = Vec::new();

for packets in batches {
locks.push(packets.read().unwrap());
locks.push(
packets
.read()
.expect("'packets' read lock in pub fn ed25519_verify"),
);
}
let mut num = 0;
for p in locks {
Expand Down Expand Up @@ -135,8 +139,8 @@ mod tests {
use packet::{Packet, Packets, SharedPackets};
use std::sync::RwLock;
use thin_client_service::Request;
use transaction::Transaction;
use transaction::test_tx;
use transaction::Transaction;

fn make_packet_from_transaction(tr: Transaction) -> Packet {
let tx = serialize(&Request::Transaction(tr)).unwrap();
Expand Down
Loading

0 comments on commit 7f46aef

Please sign in to comment.