From a2c05b112eae02b047956f9c0d276207bd7875fb Mon Sep 17 00:00:00 2001
From: Greg Fitzgerald
Date: Mon, 14 May 2018 12:43:38 -0600
Subject: [PATCH 1/8] Add historian to pipeline

No longer intercept entries to register_entry_id(). Instead, register
the ID in the Write stage.

EventProcessor is now just being used as a place to store data.

Fixes #216
---
 src/event_processor.rs   |  6 +++++-
 src/historian.rs         | 18 +++++++-----------
 src/request_processor.rs | 11 +++++------
 src/request_stage.rs     | 13 +++++--------
 src/rpu.rs               | 10 ++++++++--
 src/tvu.rs               | 10 ++++++++--
 6 files changed, 38 insertions(+), 30 deletions(-)

diff --git a/src/event_processor.rs b/src/event_processor.rs
index 8d3b9cdafefa74..07fc66b10abfc3 100644
--- a/src/event_processor.rs
+++ b/src/event_processor.rs
@@ -14,6 +14,8 @@ pub struct EventProcessor {
     pub accountant: Arc<Accountant>,
     historian_input: Mutex<Sender<Signal>>,
     historian: Mutex<Historian>,
+    pub start_hash: Hash,
+    pub ms_per_tick: Option<u64>,
 }

 impl EventProcessor {
@@ -25,6 +27,8 @@ impl EventProcessor {
             accountant: Arc::new(accountant),
             historian_input: Mutex::new(historian_input),
             historian: Mutex::new(historian),
+            start_hash: *start_hash,
+            ms_per_tick,
         }
     }

@@ -37,7 +41,7 @@ impl EventProcessor {
         sender.send(Signal::Events(events))?;

         // Wait for the historian to tag our Events with an ID and then register it.
-        let entry = historian.entry_receiver.lock().unwrap().recv()?;
+        let entry = historian.entry_receiver.recv()?;
         self.accountant.register_entry_id(&entry.id);
         Ok(entry)
     }
diff --git a/src/historian.rs b/src/historian.rs
index bed9bb8ac070c3..e7234ece56755f 100644
--- a/src/historian.rs
+++ b/src/historian.rs
@@ -4,13 +4,12 @@
 use entry::Entry;
 use hash::Hash;
 use recorder::{ExitReason, Recorder, Signal};
-use std::sync::Mutex;
 use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
 use std::thread::{spawn, JoinHandle};
 use std::time::Instant;

 pub struct Historian {
-    pub entry_receiver: Mutex<Receiver<Entry>>,
+    pub entry_receiver: Receiver<Entry>,
     pub thread_hdl: JoinHandle<ExitReason>,
 }

@@ -24,7 +23,7 @@ impl Historian {
         let thread_hdl =
             Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
         Historian {
-            entry_receiver: Mutex::new(entry_receiver),
+            entry_receiver,
             thread_hdl,
         }
     }
@@ -52,10 +51,7 @@ impl Historian {
     }

     pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
-        self.entry_receiver
-            .lock()
-            .expect("'entry_receiver' lock in pub fn receive")
-            .try_recv()
+        self.entry_receiver.try_recv()
     }
 }

@@ -78,9 +74,9 @@ mod tests {
         sleep(Duration::new(0, 1_000_000));
         input.send(Signal::Tick).unwrap();

-        let entry0 = hist.entry_receiver.lock().unwrap().recv().unwrap();
-        let entry1 = hist.entry_receiver.lock().unwrap().recv().unwrap();
-        let entry2 = hist.entry_receiver.lock().unwrap().recv().unwrap();
+        let entry0 = hist.entry_receiver.recv().unwrap();
+        let entry1 = hist.entry_receiver.recv().unwrap();
+        let entry2 = hist.entry_receiver.recv().unwrap();

         assert_eq!(entry0.num_hashes, 0);
         assert_eq!(entry1.num_hashes, 0);
@@ -117,7 +113,7 @@ mod tests {
         sleep(Duration::from_millis(900));
         input.send(Signal::Tick).unwrap();
         drop(input);
-        let entries: Vec<Entry> = hist.entry_receiver.lock().unwrap().iter().collect();
+        let entries: Vec<Entry> = hist.entry_receiver.iter().collect();
         assert!(entries.len() > 1);

         // Ensure the ID is not the seed.
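The effect of this first patch is that entries flow straight out of the Historian's channel, and the consumer of that channel, the Write stage, takes over the register_entry_id() call that process_events() used to make. A minimal sketch of that consumer loop, assuming the crate's existing Accountant and Entry types; the function name and the loop body are illustrative, not the actual entry_writer.rs code:

    use accountant::Accountant;
    use entry::Entry;
    use std::sync::Arc;
    use std::sync::mpsc::Receiver;

    /// Illustrative Write-stage consumer: register each entry ID with the
    /// accountant before the entry is persisted or broadcast.
    fn drain_entries(accountant: &Arc<Accountant>, entry_receiver: &Receiver<Entry>) {
        for entry in entry_receiver.iter() {
            accountant.register_entry_id(&entry.id);
            // ... hand `entry` to the ledger writer / broadcaster here ...
        }
    }
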
diff --git a/src/request_processor.rs b/src/request_processor.rs index 4e6c815edfbc61..47692e1ef6f8de 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -2,12 +2,11 @@ use accountant::Accountant; use bincode::{deserialize, serialize}; -use entry::Entry; use event::Event; -use event_processor::EventProcessor; use packet; use packet::SharedPackets; use rayon::prelude::*; +use recorder::Signal; use request::{Request, Response}; use result::Result; use std::collections::VecDeque; @@ -140,9 +139,8 @@ impl RequestProcessor { pub fn process_request_packets( &self, - event_processor: &EventProcessor, verified_receiver: &Receiver)>>, - entry_sender: &Sender, + signal_sender: &Sender, blob_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, blob_recycler: &packet::BlobRecycler, @@ -176,8 +174,9 @@ impl RequestProcessor { debug!("events: {} reqs: {}", events.len(), reqs.len()); debug!("process_events"); - let entry = event_processor.process_events(events)?; - entry_sender.send(entry)?; + let results = self.accountant.process_verified_events(events); + let events = results.into_iter().filter_map(|x| x.ok()).collect(); + signal_sender.send(Signal::Events(events))?; debug!("done process_events"); debug!("process_requests"); diff --git a/src/request_stage.rs b/src/request_stage.rs index 26ac76d779aac1..2f9935a2f6e278 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,9 +1,8 @@ //! The `request_stage` processes thin client Request messages. -use entry::Entry; -use event_processor::EventProcessor; use packet; use packet::SharedPackets; +use recorder::Signal; use request_processor::RequestProcessor; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -13,7 +12,7 @@ use streamer; pub struct RequestStage { pub thread_hdl: JoinHandle<()>, - pub entry_receiver: Receiver, + pub signal_receiver: Receiver, pub blob_receiver: streamer::BlobReceiver, pub request_processor: Arc, } @@ -21,7 +20,6 @@ pub struct RequestStage { impl RequestStage { pub fn new( request_processor: RequestProcessor, - event_processor: Arc, exit: Arc, verified_receiver: Receiver)>>, packet_recycler: packet::PacketRecycler, @@ -29,13 +27,12 @@ impl RequestStage { ) -> Self { let request_processor = Arc::new(request_processor); let request_processor_ = request_processor.clone(); - let (entry_sender, entry_receiver) = channel(); + let (signal_sender, signal_receiver) = channel(); let (blob_sender, blob_receiver) = channel(); let thread_hdl = spawn(move || loop { let e = request_processor_.process_request_packets( - &event_processor, &verified_receiver, - &entry_sender, + &signal_sender, &blob_sender, &packet_recycler, &blob_recycler, @@ -48,7 +45,7 @@ impl RequestStage { }); RequestStage { thread_hdl, - entry_receiver, + signal_receiver, blob_receiver, request_processor, } diff --git a/src/rpu.rs b/src/rpu.rs index b8954c55426ae6..b5127c16035812 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -6,6 +6,7 @@ use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; use event_processor::EventProcessor; +use historian::Historian; use packet; use request_processor::RequestProcessor; use request_stage::RequestStage; @@ -88,13 +89,18 @@ impl Rpu { let request_processor = RequestProcessor::new(self.event_processor.accountant.clone()); let request_stage = RequestStage::new( request_processor, - self.event_processor.clone(), exit.clone(), sig_verify_stage.verified_receiver, packet_recycler.clone(), blob_recycler.clone(), ); + let historian_stage = 
Historian::new( + request_stage.signal_receiver, + &self.event_processor.start_hash, + self.event_processor.ms_per_tick, + ); + let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( self.event_processor.accountant.clone(), @@ -102,7 +108,7 @@ impl Rpu { broadcast_sender, blob_recycler.clone(), Mutex::new(writer), - request_stage.entry_receiver, + historian_stage.entry_receiver, ); let broadcast_socket = UdpSocket::bind(local)?; diff --git a/src/tvu.rs b/src/tvu.rs index 8deedbf23cbc07..39e0498d45d364 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -6,6 +6,7 @@ use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; use event_processor::EventProcessor; +use historian::Historian; use ledger; use packet; use request_processor::RequestProcessor; @@ -173,17 +174,22 @@ impl Tvu { let request_processor = RequestProcessor::new(obj.event_processor.accountant.clone()); let request_stage = RequestStage::new( request_processor, - obj.event_processor.clone(), exit.clone(), sig_verify_stage.verified_receiver, packet_recycler.clone(), blob_recycler.clone(), ); + let historian_stage = Historian::new( + request_stage.signal_receiver, + &obj.event_processor.start_hash, + obj.event_processor.ms_per_tick, + ); + let t_write = Self::drain_service( obj.event_processor.accountant.clone(), exit.clone(), - request_stage.entry_receiver, + historian_stage.entry_receiver, ); let t_responder = streamer::responder( From 27984e469a25ab1eadab65631945662949aad105 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 13:58:42 -0600 Subject: [PATCH 2/8] Multiply duration, not milliseconds --- src/recorder.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/recorder.rs b/src/recorder.rs index 2ec50aea287440..208d62b6ed6225 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -28,7 +28,7 @@ pub struct Recorder { receiver: Receiver, last_hash: Hash, num_hashes: u64, - num_ticks: u64, + num_ticks: u32, } impl Recorder { @@ -62,8 +62,9 @@ impl Recorder { ) -> Result<(), ExitReason> { loop { if let Some(ms) = ms_per_tick { - if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) { + if epoch.elapsed() > Duration::from_millis(ms) * (self.num_ticks + 1) { self.record_entry(vec![])?; + // TODO: don't let this overflow u32 self.num_ticks += 1; } } From 3f10bf44db2de96b6204461333e54db30554f6f2 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 14:12:36 -0600 Subject: [PATCH 3/8] Config recorder with any kind of Duration, not just milliseconds --- src/bin/testnode.rs | 4 +++- src/event_processor.rs | 9 +++++---- src/historian.rs | 17 ++++++++--------- src/recorder.rs | 9 ++++----- src/rpu.rs | 2 +- src/thin_client.rs | 24 ++++++++++++++++++++---- src/tvu.rs | 8 ++++++-- 7 files changed, 47 insertions(+), 26 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 74056b13b86ddb..b298ca4157b4b5 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -19,6 +19,7 @@ use std::net::UdpSocket; use std::process::exit; use std::sync::Arc; use std::sync::atomic::AtomicBool; +use std::time::Duration; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); @@ -115,7 +116,8 @@ fn main() { eprintln!("creating networking stack..."); - let event_processor = EventProcessor::new(accountant, &last_id, Some(1000)); + let event_processor = + EventProcessor::new(accountant, &last_id, Some(Duration::from_millis(1000))); let exit = 
Arc::new(AtomicBool::new(false)); let rpu = Rpu::new(event_processor); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); diff --git a/src/event_processor.rs b/src/event_processor.rs index 07fc66b10abfc3..4f30def1f7fddb 100644 --- a/src/event_processor.rs +++ b/src/event_processor.rs @@ -9,26 +9,27 @@ use recorder::Signal; use result::Result; use std::sync::mpsc::{channel, Sender}; use std::sync::{Arc, Mutex}; +use std::time::Duration; pub struct EventProcessor { pub accountant: Arc, historian_input: Mutex>, historian: Mutex, pub start_hash: Hash, - pub ms_per_tick: Option, + pub tick_duration: Option, } impl EventProcessor { /// Create a new stage of the TPU for event and transaction processing - pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option) -> Self { + pub fn new(accountant: Accountant, start_hash: &Hash, tick_duration: Option) -> Self { let (historian_input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, start_hash, ms_per_tick); + let historian = Historian::new(event_receiver, start_hash, tick_duration); EventProcessor { accountant: Arc::new(accountant), historian_input: Mutex::new(historian_input), historian: Mutex::new(historian), start_hash: *start_hash, - ms_per_tick, + tick_duration, } } diff --git a/src/historian.rs b/src/historian.rs index e7234ece56755f..553d3ff6efd3a6 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -6,7 +6,7 @@ use hash::Hash; use recorder::{ExitReason, Recorder, Signal}; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::thread::{spawn, JoinHandle}; -use std::time::Instant; +use std::time::{Duration, Instant}; pub struct Historian { pub entry_receiver: Receiver, @@ -17,11 +17,11 @@ impl Historian { pub fn new( event_receiver: Receiver, start_hash: &Hash, - ms_per_tick: Option, + tick_duration: Option, ) -> Self { let (entry_sender, entry_receiver) = channel(); let thread_hdl = - Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); + Self::create_recorder(*start_hash, tick_duration, event_receiver, entry_sender); Historian { entry_receiver, thread_hdl, @@ -32,18 +32,18 @@ impl Historian { /// sending back Entry messages until either the receiver or sender channel is closed. 
fn create_recorder( start_hash: Hash, - ms_per_tick: Option, + tick_duration: Option, receiver: Receiver, sender: Sender, ) -> JoinHandle { spawn(move || { let mut recorder = Recorder::new(receiver, sender, start_hash); - let now = Instant::now(); + let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); loop { - if let Err(err) = recorder.process_events(now, ms_per_tick) { + if let Err(err) = recorder.process_events(duration_data) { return err; } - if ms_per_tick.is_some() { + if duration_data.is_some() { recorder.hash(); } } @@ -60,7 +60,6 @@ mod tests { use super::*; use ledger::Block; use std::thread::sleep; - use std::time::Duration; #[test] fn test_historian() { @@ -109,7 +108,7 @@ mod tests { fn test_ticking_historian() { let (input, event_receiver) = channel(); let zero = Hash::default(); - let hist = Historian::new(event_receiver, &zero, Some(20)); + let hist = Historian::new(event_receiver, &zero, Some(Duration::from_millis(20))); sleep(Duration::from_millis(900)); input.send(Signal::Tick).unwrap(); drop(input); diff --git a/src/recorder.rs b/src/recorder.rs index 208d62b6ed6225..3866747fa2ba1b 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -57,12 +57,11 @@ impl Recorder { pub fn process_events( &mut self, - epoch: Instant, - ms_per_tick: Option, + duration_data: Option<(Instant, Duration)>, ) -> Result<(), ExitReason> { loop { - if let Some(ms) = ms_per_tick { - if epoch.elapsed() > Duration::from_millis(ms) * (self.num_ticks + 1) { + if let Some((start_time, tick_duration)) = duration_data { + if start_time.elapsed() > tick_duration * (self.num_ticks + 1) { self.record_entry(vec![])?; // TODO: don't let this overflow u32 self.num_ticks += 1; @@ -105,7 +104,7 @@ mod tests { signal_sender .send(Signal::Events(vec![event0, event1])) .unwrap(); - recorder.process_events(Instant::now(), None).unwrap(); + recorder.process_events(None).unwrap(); drop(recorder.sender); let entries: Vec<_> = entry_receiver.iter().collect(); diff --git a/src/rpu.rs b/src/rpu.rs index b5127c16035812..0690d6069f8140 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -98,7 +98,7 @@ impl Rpu { let historian_stage = Historian::new( request_stage.signal_receiver, &self.event_processor.start_hash, - self.event_processor.ms_per_tick, + self.event_processor.tick_duration, ); let (broadcast_sender, broadcast_receiver) = channel(); diff --git a/src/thin_client.rs b/src/thin_client.rs index 2bc2908432ba88..87cc37b856e894 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -191,7 +191,11 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); let rpu = Rpu::new(event_processor); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(900)); @@ -230,7 +234,11 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); let rpu = Rpu::new(event_processor); let serve_addr = leader_serve.local_addr().unwrap(); let threads = rpu.serve( @@ -300,13 +308,21 @@ mod tests { let leader_acc = { 
let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); Rpu::new(event_processor) }; let replicant_acc = { let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); Arc::new(Tvu::new(event_processor)) }; diff --git a/src/tvu.rs b/src/tvu.rs index 39e0498d45d364..f05e9c2dc0a4ef 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -183,7 +183,7 @@ impl Tvu { let historian_stage = Historian::new( request_stage.signal_receiver, &obj.event_processor.start_hash, - obj.event_processor.ms_per_tick, + obj.event_processor.tick_duration, ); let t_write = Self::drain_service( @@ -311,7 +311,11 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); let tvu = Arc::new(Tvu::new(event_processor)); let replicate_addr = target1_data.replicate_addr; let threads = Tvu::serve( From 17cc9ab07f9594e0a16289734afa6e401553ae14 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 14:19:11 -0600 Subject: [PATCH 4/8] Rename Historian to RecordStage Historian was a legacy name. The new name reflects the new pipelined architecture. --- src/event_processor.rs | 12 ++++++------ src/lib.rs | 2 +- src/{historian.rs => record_stage.rs} | 28 +++++++++++++-------------- src/rpu.rs | 6 +++--- src/tvu.rs | 6 +++--- 5 files changed, 27 insertions(+), 27 deletions(-) rename src/{historian.rs => record_stage.rs} (79%) diff --git a/src/event_processor.rs b/src/event_processor.rs index 4f30def1f7fddb..3ce63d27b2985a 100644 --- a/src/event_processor.rs +++ b/src/event_processor.rs @@ -4,7 +4,7 @@ use accountant::Accountant; use entry::Entry; use event::Event; use hash::Hash; -use historian::Historian; +use record_stage::RecordStage; use recorder::Signal; use result::Result; use std::sync::mpsc::{channel, Sender}; @@ -14,7 +14,7 @@ use std::time::Duration; pub struct EventProcessor { pub accountant: Arc, historian_input: Mutex>, - historian: Mutex, + record_stage: Mutex, pub start_hash: Hash, pub tick_duration: Option, } @@ -23,11 +23,11 @@ impl EventProcessor { /// Create a new stage of the TPU for event and transaction processing pub fn new(accountant: Accountant, start_hash: &Hash, tick_duration: Option) -> Self { let (historian_input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, start_hash, tick_duration); + let record_stage = RecordStage::new(event_receiver, start_hash, tick_duration); EventProcessor { accountant: Arc::new(accountant), historian_input: Mutex::new(historian_input), - historian: Mutex::new(historian), + record_stage: Mutex::new(record_stage), start_hash: *start_hash, tick_duration, } @@ -35,14 +35,14 @@ impl EventProcessor { /// Process the transactions in parallel and then log the successful ones. 
pub fn process_events(&self, events: Vec) -> Result { - let historian = self.historian.lock().unwrap(); + let record_stage = self.record_stage.lock().unwrap(); let results = self.accountant.process_verified_events(events); let events = results.into_iter().filter_map(|x| x.ok()).collect(); let sender = self.historian_input.lock().unwrap(); sender.send(Signal::Events(events))?; // Wait for the historian to tag our Events with an ID and then register it. - let entry = historian.entry_receiver.recv()?; + let entry = record_stage.entry_receiver.recv()?; self.accountant.register_entry_id(&entry.id); Ok(entry) } diff --git a/src/lib.rs b/src/lib.rs index 6d50c18e37c452..7099ef43d898cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,12 +9,12 @@ pub mod erasure; pub mod event; pub mod event_processor; pub mod hash; -pub mod historian; pub mod ledger; pub mod logger; pub mod mint; pub mod packet; pub mod plan; +pub mod record_stage; pub mod recorder; pub mod request; pub mod request_processor; diff --git a/src/historian.rs b/src/record_stage.rs similarity index 79% rename from src/historian.rs rename to src/record_stage.rs index 553d3ff6efd3a6..072983fd097bdb 100644 --- a/src/historian.rs +++ b/src/record_stage.rs @@ -1,4 +1,4 @@ -//! The `historian` module provides a microservice for generating a Proof of History. +//! The `record_stage` implements the Record stage of the TPU. //! It manages a thread containing a Proof of History Recorder. use entry::Entry; @@ -8,12 +8,12 @@ use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::thread::{spawn, JoinHandle}; use std::time::{Duration, Instant}; -pub struct Historian { +pub struct RecordStage { pub entry_receiver: Receiver, pub thread_hdl: JoinHandle, } -impl Historian { +impl RecordStage { pub fn new( event_receiver: Receiver, start_hash: &Hash, @@ -22,7 +22,7 @@ impl Historian { let (entry_sender, entry_receiver) = channel(); let thread_hdl = Self::create_recorder(*start_hash, tick_duration, event_receiver, entry_sender); - Historian { + RecordStage { entry_receiver, thread_hdl, } @@ -65,7 +65,7 @@ mod tests { fn test_historian() { let (input, event_receiver) = channel(); let zero = Hash::default(); - let hist = Historian::new(event_receiver, &zero, None); + let record_stage = RecordStage::new(event_receiver, &zero, None); input.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); @@ -73,9 +73,9 @@ mod tests { sleep(Duration::new(0, 1_000_000)); input.send(Signal::Tick).unwrap(); - let entry0 = hist.entry_receiver.recv().unwrap(); - let entry1 = hist.entry_receiver.recv().unwrap(); - let entry2 = hist.entry_receiver.recv().unwrap(); + let entry0 = record_stage.entry_receiver.recv().unwrap(); + let entry1 = record_stage.entry_receiver.recv().unwrap(); + let entry2 = record_stage.entry_receiver.recv().unwrap(); assert_eq!(entry0.num_hashes, 0); assert_eq!(entry1.num_hashes, 0); @@ -83,7 +83,7 @@ mod tests { drop(input); assert_eq!( - hist.thread_hdl.join().unwrap(), + record_stage.thread_hdl.join().unwrap(), ExitReason::RecvDisconnected ); @@ -94,11 +94,11 @@ mod tests { fn test_historian_closed_sender() { let (input, event_receiver) = channel(); let zero = Hash::default(); - let hist = Historian::new(event_receiver, &zero, None); - drop(hist.entry_receiver); + let record_stage = RecordStage::new(event_receiver, &zero, None); + drop(record_stage.entry_receiver); input.send(Signal::Tick).unwrap(); assert_eq!( - hist.thread_hdl.join().unwrap(), + record_stage.thread_hdl.join().unwrap(), ExitReason::SendDisconnected ); } @@ 
-108,11 +108,11 @@ mod tests { fn test_ticking_historian() { let (input, event_receiver) = channel(); let zero = Hash::default(); - let hist = Historian::new(event_receiver, &zero, Some(Duration::from_millis(20))); + let record_stage = RecordStage::new(event_receiver, &zero, Some(Duration::from_millis(20))); sleep(Duration::from_millis(900)); input.send(Signal::Tick).unwrap(); drop(input); - let entries: Vec = hist.entry_receiver.iter().collect(); + let entries: Vec = record_stage.entry_receiver.iter().collect(); assert!(entries.len() > 1); // Ensure the ID is not the seed. diff --git a/src/rpu.rs b/src/rpu.rs index 0690d6069f8140..8019f7d0e447a3 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -6,8 +6,8 @@ use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; use event_processor::EventProcessor; -use historian::Historian; use packet; +use record_stage::RecordStage; use request_processor::RequestProcessor; use request_stage::RequestStage; use result::Result; @@ -95,7 +95,7 @@ impl Rpu { blob_recycler.clone(), ); - let historian_stage = Historian::new( + let record_stage = RecordStage::new( request_stage.signal_receiver, &self.event_processor.start_hash, self.event_processor.tick_duration, @@ -108,7 +108,7 @@ impl Rpu { broadcast_sender, blob_recycler.clone(), Mutex::new(writer), - historian_stage.entry_receiver, + record_stage.entry_receiver, ); let broadcast_socket = UdpSocket::bind(local)?; diff --git a/src/tvu.rs b/src/tvu.rs index f05e9c2dc0a4ef..4e33bb77fa576d 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -6,9 +6,9 @@ use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; use event_processor::EventProcessor; -use historian::Historian; use ledger; use packet; +use record_stage::RecordStage; use request_processor::RequestProcessor; use request_stage::RequestStage; use result::Result; @@ -180,7 +180,7 @@ impl Tvu { blob_recycler.clone(), ); - let historian_stage = Historian::new( + let record_stage = RecordStage::new( request_stage.signal_receiver, &obj.event_processor.start_hash, obj.event_processor.tick_duration, @@ -189,7 +189,7 @@ impl Tvu { let t_write = Self::drain_service( obj.event_processor.accountant.clone(), exit.clone(), - historian_stage.entry_receiver, + record_stage.entry_receiver, ); let t_responder = streamer::responder( From 685de30047d0fb5f1c48bc70f5e248cbe900260b Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 14:35:25 -0600 Subject: [PATCH 5/8] Purge EventProcessor from RPU --- src/bin/testnode.rs | 5 +---- src/rpu.rs | 21 +++++++++++++-------- src/thin_client.rs | 21 +++------------------ 3 files changed, 17 insertions(+), 30 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index b298ca4157b4b5..ac8b74d10a183a 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -10,7 +10,6 @@ use solana::accountant::Accountant; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; -use solana::event_processor::EventProcessor; use solana::rpu::Rpu; use solana::signature::{KeyPair, KeyPairUtil}; use std::env; @@ -116,10 +115,8 @@ fn main() { eprintln!("creating networking stack..."); - let event_processor = - EventProcessor::new(accountant, &last_id, Some(Duration::from_millis(1000))); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(event_processor); + let rpu = Rpu::new(accountant, last_id, Some(Duration::from_millis(1000))); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = 
UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); diff --git a/src/rpu.rs b/src/rpu.rs index 8019f7d0e447a3..6bc7aa52269348 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -5,7 +5,7 @@ use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; -use event_processor::EventProcessor; +use hash::Hash; use packet; use record_stage::RecordStage; use request_processor::RequestProcessor; @@ -18,17 +18,22 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; +use std::time::Duration; use streamer; pub struct Rpu { - event_processor: Arc, + accountant: Arc, + start_hash: Hash, + tick_duration: Option, } impl Rpu { /// Create a new Rpu that wraps the given Accountant. - pub fn new(event_processor: EventProcessor) -> Self { + pub fn new(accountant: Accountant, start_hash: Hash, tick_duration: Option) -> Self { Rpu { - event_processor: Arc::new(event_processor), + accountant: Arc::new(accountant), + start_hash, + tick_duration, } } @@ -86,7 +91,7 @@ impl Rpu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); - let request_processor = RequestProcessor::new(self.event_processor.accountant.clone()); + let request_processor = RequestProcessor::new(self.accountant.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), @@ -97,13 +102,13 @@ impl Rpu { let record_stage = RecordStage::new( request_stage.signal_receiver, - &self.event_processor.start_hash, - self.event_processor.tick_duration, + &self.start_hash, + self.tick_duration, ); let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( - self.event_processor.accountant.clone(), + self.accountant.clone(), exit.clone(), broadcast_sender, blob_recycler.clone(), diff --git a/src/thin_client.rs b/src/thin_client.rs index 87cc37b856e894..a682dfb22e9437 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -191,12 +191,7 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let event_processor = EventProcessor::new( - accountant, - &alice.last_id(), - Some(Duration::from_millis(30)), - ); - let rpu = Rpu::new(event_processor); + let rpu = Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30))); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(900)); @@ -234,12 +229,7 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let event_processor = EventProcessor::new( - accountant, - &alice.last_id(), - Some(Duration::from_millis(30)), - ); - let rpu = Rpu::new(event_processor); + let rpu = Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30))); let serve_addr = leader_serve.local_addr().unwrap(); let threads = rpu.serve( leader_data, @@ -308,12 +298,7 @@ mod tests { let leader_acc = { let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new( - accountant, - &alice.last_id(), - Some(Duration::from_millis(30)), - ); - Rpu::new(event_processor) + Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30))) }; let replicant_acc = { From 6e8f99d9b279694b064fae92b108caa7b30337ca Mon Sep 17 00:00:00 2001 From: Greg 
Fitzgerald Date: Mon, 14 May 2018 14:45:29 -0600 Subject: [PATCH 6/8] Purge EventProcessor --- src/event_processor.rs | 172 ----------------------------------------- src/lib.rs | 1 - src/request_stage.rs | 136 ++++++++++++++++++++++++++++++++ src/thin_client.rs | 8 +- src/tvu.rs | 36 ++++----- 5 files changed, 157 insertions(+), 196 deletions(-) delete mode 100644 src/event_processor.rs diff --git a/src/event_processor.rs b/src/event_processor.rs deleted file mode 100644 index 3ce63d27b2985a..00000000000000 --- a/src/event_processor.rs +++ /dev/null @@ -1,172 +0,0 @@ -//! The `event_processor` module implements the accounting stage of the TPU. - -use accountant::Accountant; -use entry::Entry; -use event::Event; -use hash::Hash; -use record_stage::RecordStage; -use recorder::Signal; -use result::Result; -use std::sync::mpsc::{channel, Sender}; -use std::sync::{Arc, Mutex}; -use std::time::Duration; - -pub struct EventProcessor { - pub accountant: Arc, - historian_input: Mutex>, - record_stage: Mutex, - pub start_hash: Hash, - pub tick_duration: Option, -} - -impl EventProcessor { - /// Create a new stage of the TPU for event and transaction processing - pub fn new(accountant: Accountant, start_hash: &Hash, tick_duration: Option) -> Self { - let (historian_input, event_receiver) = channel(); - let record_stage = RecordStage::new(event_receiver, start_hash, tick_duration); - EventProcessor { - accountant: Arc::new(accountant), - historian_input: Mutex::new(historian_input), - record_stage: Mutex::new(record_stage), - start_hash: *start_hash, - tick_duration, - } - } - - /// Process the transactions in parallel and then log the successful ones. - pub fn process_events(&self, events: Vec) -> Result { - let record_stage = self.record_stage.lock().unwrap(); - let results = self.accountant.process_verified_events(events); - let events = results.into_iter().filter_map(|x| x.ok()).collect(); - let sender = self.historian_input.lock().unwrap(); - sender.send(Signal::Events(events))?; - - // Wait for the historian to tag our Events with an ID and then register it. - let entry = record_stage.entry_receiver.recv()?; - self.accountant.register_entry_id(&entry.id); - Ok(entry) - } -} - -#[cfg(test)] -mod tests { - use accountant::Accountant; - use event::Event; - use event_processor::EventProcessor; - use mint::Mint; - use signature::{KeyPair, KeyPairUtil}; - use transaction::Transaction; - - #[test] - // TODO: Move this test accounting_stage. Calling process_events() directly - // defeats the purpose of this test. - fn test_accounting_sequential_consistency() { - // In this attack we'll demonstrate that a verifier can interpret the ledger - // differently if either the server doesn't signal the ledger to add an - // Entry OR if the verifier tries to parallelize across multiple Entries. - let mint = Mint::new(2); - let accountant = Accountant::new(&mint); - let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); - - // Process a batch that includes a transaction that receives two tokens. - let alice = KeyPair::new(); - let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); - let events = vec![Event::Transaction(tr)]; - let entry0 = event_processor.process_events(events).unwrap(); - - // Process a second batch that spends one of those tokens. 
- let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); - let events = vec![Event::Transaction(tr)]; - let entry1 = event_processor.process_events(events).unwrap(); - - // Collect the ledger and feed it to a new accountant. - let entries = vec![entry0, entry1]; - - // Assert the user holds one token, not two. If the server only output one - // entry, then the second transaction will be rejected, because it drives - // the account balance below zero before the credit is added. - let accountant = Accountant::new(&mint); - for entry in entries { - assert!( - accountant - .process_verified_events(entry.events) - .into_iter() - .all(|x| x.is_ok()) - ); - } - assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1)); - } -} - -#[cfg(all(feature = "unstable", test))] -mod bench { - extern crate test; - use self::test::Bencher; - use accountant::{Accountant, MAX_ENTRY_IDS}; - use bincode::serialize; - use event_processor::*; - use hash::hash; - use mint::Mint; - use rayon::prelude::*; - use signature::{KeyPair, KeyPairUtil}; - use std::collections::HashSet; - use std::time::Instant; - use transaction::Transaction; - - #[bench] - fn process_events_bench(_bencher: &mut Bencher) { - let mint = Mint::new(100_000_000); - let accountant = Accountant::new(&mint); - // Create transactions between unrelated parties. - let txs = 100_000; - let last_ids: Mutex> = Mutex::new(HashSet::new()); - let transactions: Vec<_> = (0..txs) - .into_par_iter() - .map(|i| { - // Seed the 'to' account and a cell for its signature. - let dummy_id = i % (MAX_ENTRY_IDS as i32); - let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash - { - let mut last_ids = last_ids.lock().unwrap(); - if !last_ids.contains(&last_id) { - last_ids.insert(last_id); - accountant.register_entry_id(&last_id); - } - } - - // Seed the 'from' account. - let rando0 = KeyPair::new(); - let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); - accountant.process_verified_transaction(&tr).unwrap(); - - let rando1 = KeyPair::new(); - let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); - accountant.process_verified_transaction(&tr).unwrap(); - - // Finally, return a transaction that's unique - Transaction::new(&rando0, rando1.pubkey(), 1, last_id) - }) - .collect(); - - let events: Vec<_> = transactions - .into_iter() - .map(|tr| Event::Transaction(tr)) - .collect(); - - let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); - - let now = Instant::now(); - assert!(event_processor.process_events(events).is_ok()); - let duration = now.elapsed(); - let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; - let tps = txs as f64 / sec; - - // Ensure that all transactions were successfully logged. 
- drop(event_processor.historian_input); - let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); - assert_eq!(entries.len(), 1); - assert_eq!(entries[0].events.len(), txs as usize); - - println!("{} tps", tps); - } -} diff --git a/src/lib.rs b/src/lib.rs index 7099ef43d898cc..d31c950b709bdb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,6 @@ pub mod entry_writer; #[cfg(feature = "erasure")] pub mod erasure; pub mod event; -pub mod event_processor; pub mod hash; pub mod ledger; pub mod logger; diff --git a/src/request_stage.rs b/src/request_stage.rs index 2f9935a2f6e278..e5ef5ef20dc656 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -51,3 +51,139 @@ impl RequestStage { } } } + +// TODO: When accounting is pulled out of RequestStage, add this test back in. + +//use accountant::Accountant; +//use entry::Entry; +//use event::Event; +//use hash::Hash; +//use record_stage::RecordStage; +//use recorder::Signal; +//use result::Result; +//use std::sync::mpsc::{channel, Sender}; +//use std::sync::{Arc, Mutex}; +//use std::time::Duration; +// +//#[cfg(test)] +//mod tests { +// use accountant::Accountant; +// use event::Event; +// use event_processor::EventProcessor; +// use mint::Mint; +// use signature::{KeyPair, KeyPairUtil}; +// use transaction::Transaction; +// +// #[test] +// // TODO: Move this test accounting_stage. Calling process_events() directly +// // defeats the purpose of this test. +// fn test_accounting_sequential_consistency() { +// // In this attack we'll demonstrate that a verifier can interpret the ledger +// // differently if either the server doesn't signal the ledger to add an +// // Entry OR if the verifier tries to parallelize across multiple Entries. +// let mint = Mint::new(2); +// let accountant = Accountant::new(&mint); +// let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); +// +// // Process a batch that includes a transaction that receives two tokens. +// let alice = KeyPair::new(); +// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); +// let events = vec![Event::Transaction(tr)]; +// let entry0 = event_processor.process_events(events).unwrap(); +// +// // Process a second batch that spends one of those tokens. +// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); +// let events = vec![Event::Transaction(tr)]; +// let entry1 = event_processor.process_events(events).unwrap(); +// +// // Collect the ledger and feed it to a new accountant. +// let entries = vec![entry0, entry1]; +// +// // Assert the user holds one token, not two. If the server only output one +// // entry, then the second transaction will be rejected, because it drives +// // the account balance below zero before the credit is added. 
+// let accountant = Accountant::new(&mint); +// for entry in entries { +// assert!( +// accountant +// .process_verified_events(entry.events) +// .into_iter() +// .all(|x| x.is_ok()) +// ); +// } +// assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1)); +// } +//} +// +//#[cfg(all(feature = "unstable", test))] +//mod bench { +// extern crate test; +// use self::test::Bencher; +// use accountant::{Accountant, MAX_ENTRY_IDS}; +// use bincode::serialize; +// use event_processor::*; +// use hash::hash; +// use mint::Mint; +// use rayon::prelude::*; +// use signature::{KeyPair, KeyPairUtil}; +// use std::collections::HashSet; +// use std::time::Instant; +// use transaction::Transaction; +// +// #[bench] +// fn process_events_bench(_bencher: &mut Bencher) { +// let mint = Mint::new(100_000_000); +// let accountant = Accountant::new(&mint); +// // Create transactions between unrelated parties. +// let txs = 100_000; +// let last_ids: Mutex> = Mutex::new(HashSet::new()); +// let transactions: Vec<_> = (0..txs) +// .into_par_iter() +// .map(|i| { +// // Seed the 'to' account and a cell for its signature. +// let dummy_id = i % (MAX_ENTRY_IDS as i32); +// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash +// { +// let mut last_ids = last_ids.lock().unwrap(); +// if !last_ids.contains(&last_id) { +// last_ids.insert(last_id); +// accountant.register_entry_id(&last_id); +// } +// } +// +// // Seed the 'from' account. +// let rando0 = KeyPair::new(); +// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); +// accountant.process_verified_transaction(&tr).unwrap(); +// +// let rando1 = KeyPair::new(); +// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); +// accountant.process_verified_transaction(&tr).unwrap(); +// +// // Finally, return a transaction that's unique +// Transaction::new(&rando0, rando1.pubkey(), 1, last_id) +// }) +// .collect(); +// +// let events: Vec<_> = transactions +// .into_iter() +// .map(|tr| Event::Transaction(tr)) +// .collect(); +// +// let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); +// +// let now = Instant::now(); +// assert!(event_processor.process_events(events).is_ok()); +// let duration = now.elapsed(); +// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; +// let tps = txs as f64 / sec; +// +// // Ensure that all transactions were successfully logged. 
+// drop(event_processor.historian_input); +// let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); +// assert_eq!(entries.len(), 1); +// assert_eq!(entries[0].events.len(), txs as usize); +// +// println!("{} tps", tps); +// } +//} diff --git a/src/thin_client.rs b/src/thin_client.rs index a682dfb22e9437..4f45018f11849f 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -156,7 +156,6 @@ mod tests { use super::*; use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; - use event_processor::EventProcessor; use futures::Future; use logger; use mint::Mint; @@ -303,12 +302,11 @@ mod tests { let replicant_acc = { let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new( + Arc::new(Tvu::new( accountant, - &alice.last_id(), + alice.last_id(), Some(Duration::from_millis(30)), - ); - Arc::new(Tvu::new(event_processor)) + )) }; let leader_threads = leader_acc diff --git a/src/tvu.rs b/src/tvu.rs index 4e33bb77fa576d..df7a8989a100fa 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -5,7 +5,7 @@ use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; -use event_processor::EventProcessor; +use hash::Hash; use ledger; use packet; use record_stage::RecordStage; @@ -22,14 +22,18 @@ use std::time::Duration; use streamer; pub struct Tvu { - event_processor: Arc, + accountant: Arc, + start_hash: Hash, + tick_duration: Option, } impl Tvu { /// Create a new Tvu that wraps the given Accountant. - pub fn new(event_processor: EventProcessor) -> Self { + pub fn new(accountant: Accountant, start_hash: Hash, tick_duration: Option) -> Self { Tvu { - event_processor: Arc::new(event_processor), + accountant: Arc::new(accountant), + start_hash, + tick_duration, } } @@ -61,9 +65,7 @@ impl Tvu { let blobs = verified_receiver.recv_timeout(timer)?; trace!("replicating blobs {}", blobs.len()); let entries = ledger::reconstruct_entries_from_blobs(&blobs); - obj.event_processor - .accountant - .process_verified_entries(entries)?; + obj.accountant.process_verified_entries(entries)?; for blob in blobs { blob_recycler.recycle(blob); } @@ -171,7 +173,7 @@ impl Tvu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - let request_processor = RequestProcessor::new(obj.event_processor.accountant.clone()); + let request_processor = RequestProcessor::new(obj.accountant.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), @@ -182,12 +184,12 @@ impl Tvu { let record_stage = RecordStage::new( request_stage.signal_receiver, - &obj.event_processor.start_hash, - obj.event_processor.tick_duration, + &obj.start_hash, + obj.tick_duration, ); let t_write = Self::drain_service( - obj.event_processor.accountant.clone(), + obj.accountant.clone(), exit.clone(), record_stage.entry_receiver, ); @@ -244,7 +246,6 @@ mod tests { use crdt::Crdt; use entry; use event::Event; - use event_processor::EventProcessor; use hash::{hash, Hash}; use logger; use mint::Mint; @@ -311,12 +312,11 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new( + let tvu = Arc::new(Tvu::new( accountant, - &alice.last_id(), + alice.last_id(), Some(Duration::from_millis(30)), - ); - let tvu = Arc::new(Tvu::new(event_processor)); + )); let replicate_addr = target1_data.replicate_addr; let threads = Tvu::serve( &tvu, @@ -341,7 +341,7 @@ mod tests { w.set_index(i).unwrap(); 
w.set_id(leader_id).unwrap(); - let accountant = &tvu.event_processor.accountant; + let accountant = &tvu.accountant; let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); @@ -383,7 +383,7 @@ mod tests { msgs.push(msg); } - let accountant = &tvu.event_processor.accountant; + let accountant = &tvu.accountant; let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance); From d2dd005a596507ae2653734d51a7417c82b80804 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 15:33:11 -0600 Subject: [PATCH 7/8] accountant -> bank --- src/{accountant.rs => bank.rs} | 188 +++++++++++++++------------------ src/bin/genesis-demo.rs | 2 +- src/bin/testnode.rs | 16 +-- src/crdt.rs | 2 +- src/entry_writer.rs | 12 +-- src/lib.rs | 2 +- src/request_processor.rs | 18 ++-- src/request_stage.rs | 34 +++--- src/result.rs | 10 +- src/rpu.rs | 18 ++-- src/streamer.rs | 2 +- src/thin_client.rs | 26 ++--- src/tvu.rs | 49 ++++----- 13 files changed, 179 insertions(+), 200 deletions(-) rename src/{accountant.rs => bank.rs} (76%) diff --git a/src/accountant.rs b/src/bank.rs similarity index 76% rename from src/accountant.rs rename to src/bank.rs index 857dca60633988..7989f138dd5b97 100644 --- a/src/accountant.rs +++ b/src/bank.rs @@ -1,4 +1,4 @@ -//! The `accountant` module tracks client balances, and the progress of pending +//! The `bank` module tracks client balances, and the progress of pending //! transactions. It offers a high-level public API that signs transactions //! on behalf of the caller, and a private low-level API for when they have //! already been signed and verified. @@ -23,13 +23,13 @@ use transaction::Transaction; pub const MAX_ENTRY_IDS: usize = 1024 * 4; #[derive(Debug, PartialEq, Eq)] -pub enum AccountingError { +pub enum BankError { AccountNotFound, InsufficientFunds, InvalidTransferSignature, } -pub type Result = result::Result; +pub type Result = result::Result; /// Commit funds to the 'to' party. fn apply_payment(balances: &RwLock>, payment: &Payment) { @@ -53,7 +53,7 @@ fn apply_payment(balances: &RwLock>, payment: &P } } -pub struct Accountant { +pub struct Bank { balances: RwLock>, pending: RwLock>, last_ids: RwLock>)>>, @@ -62,12 +62,12 @@ pub struct Accountant { transaction_count: AtomicUsize, } -impl Accountant { - /// Create an Accountant using a deposit. +impl Bank { + /// Create an Bank using a deposit. pub fn new_from_deposit(deposit: &Payment) -> Self { let balances = RwLock::new(HashMap::new()); apply_payment(&balances, deposit); - Accountant { + Bank { balances, pending: RwLock::new(HashMap::new()), last_ids: RwLock::new(VecDeque::new()), @@ -77,15 +77,15 @@ impl Accountant { } } - /// Create an Accountant with only a Mint. Typically used by unit tests. + /// Create an Bank with only a Mint. Typically used by unit tests. pub fn new(mint: &Mint) -> Self { let deposit = Payment { to: mint.pubkey(), tokens: mint.tokens, }; - let accountant = Self::new_from_deposit(&deposit); - accountant.register_entry_id(&mint.last_id()); - accountant + let bank = Self::new_from_deposit(&deposit); + bank.register_entry_id(&mint.last_id()); + bank } /// Return the last entry ID registered @@ -143,10 +143,10 @@ impl Accountant { false } - /// Tell the accountant which Entry IDs exist on the ledger. This function + /// Tell the bank which Entry IDs exist on the ledger. 
This function /// assumes subsequent calls correspond to later entries, and will boot /// the oldest ones once its internal cache is full. Once boot, the - /// accountant will reject transactions using that `last_id`. + /// bank will reject transactions using that `last_id`. pub fn register_entry_id(&self, last_id: &Hash) { let mut last_ids = self.last_ids .write() @@ -166,11 +166,11 @@ impl Accountant { let option = bals.get(&tr.from); if option.is_none() { - return Err(AccountingError::AccountNotFound); + return Err(BankError::AccountNotFound); } if !self.reserve_signature_with_last_id(&tr.sig, &tr.data.last_id) { - return Err(AccountingError::InvalidTransferSignature); + return Err(BankError::InvalidTransferSignature); } loop { @@ -179,7 +179,7 @@ impl Accountant { if current < tr.data.tokens { self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id); - return Err(AccountingError::InsufficientFunds); + return Err(BankError::InsufficientFunds); } let result = bal.compare_exchange( @@ -406,207 +406,190 @@ mod tests { use signature::KeyPairUtil; #[test] - fn test_accountant() { + fn test_bank() { let alice = Mint::new(10_000); let bob_pubkey = KeyPair::new().pubkey(); - let accountant = Accountant::new(&alice); - assert_eq!(accountant.last_id(), alice.last_id()); + let bank = Bank::new(&alice); + assert_eq!(bank.last_id(), alice.last_id()); - accountant - .transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) + bank.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); - assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_000); + assert_eq!(bank.get_balance(&bob_pubkey).unwrap(), 1_000); - accountant - .transfer(500, &alice.keypair(), bob_pubkey, alice.last_id()) + bank.transfer(500, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); - assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_500); - assert_eq!(accountant.transaction_count(), 2); + assert_eq!(bank.get_balance(&bob_pubkey).unwrap(), 1_500); + assert_eq!(bank.transaction_count(), 2); } #[test] fn test_account_not_found() { let mint = Mint::new(1); - let accountant = Accountant::new(&mint); + let bank = Bank::new(&mint); assert_eq!( - accountant.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()), - Err(AccountingError::AccountNotFound) + bank.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()), + Err(BankError::AccountNotFound) ); - assert_eq!(accountant.transaction_count(), 0); + assert_eq!(bank.transaction_count(), 0); } #[test] fn test_invalid_transfer() { let alice = Mint::new(11_000); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); - accountant - .transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) + bank.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); - assert_eq!(accountant.transaction_count(), 1); + assert_eq!(bank.transaction_count(), 1); assert_eq!( - accountant.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()), - Err(AccountingError::InsufficientFunds) + bank.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()), + Err(BankError::InsufficientFunds) ); - assert_eq!(accountant.transaction_count(), 1); + assert_eq!(bank.transaction_count(), 1); let alice_pubkey = alice.keypair().pubkey(); - assert_eq!(accountant.get_balance(&alice_pubkey).unwrap(), 10_000); - assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_000); + assert_eq!(bank.get_balance(&alice_pubkey).unwrap(), 10_000); + 
assert_eq!(bank.get_balance(&bob_pubkey).unwrap(), 1_000); } #[test] fn test_transfer_to_newb() { let alice = Mint::new(10_000); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); - accountant - .transfer(500, &alice_keypair, bob_pubkey, alice.last_id()) + bank.transfer(500, &alice_keypair, bob_pubkey, alice.last_id()) .unwrap(); - assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 500); + assert_eq!(bank.get_balance(&bob_pubkey).unwrap(), 500); } #[test] fn test_transfer_on_date() { let alice = Mint::new(1); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); - accountant - .transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) + bank.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) .unwrap(); // Alice's balance will be zero because all funds are locked up. - assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); + assert_eq!(bank.get_balance(&alice.pubkey()), Some(0)); // tx count is 1, because debits were applied. - assert_eq!(accountant.transaction_count(), 1); + assert_eq!(bank.transaction_count(), 1); // Bob's balance will be None because the funds have not been // sent. - assert_eq!(accountant.get_balance(&bob_pubkey), None); + assert_eq!(bank.get_balance(&bob_pubkey), None); // Now, acknowledge the time in the condition occurred and // that bob's funds are now available. - accountant - .process_verified_timestamp(alice.pubkey(), dt) - .unwrap(); - assert_eq!(accountant.get_balance(&bob_pubkey), Some(1)); + bank.process_verified_timestamp(alice.pubkey(), dt).unwrap(); + assert_eq!(bank.get_balance(&bob_pubkey), Some(1)); // tx count is still 1, because we chose not to count timestamp events // tx count. - assert_eq!(accountant.transaction_count(), 1); + assert_eq!(bank.transaction_count(), 1); - accountant - .process_verified_timestamp(alice.pubkey(), dt) - .unwrap(); // <-- Attack! Attempt to process completed transaction. - assert_ne!(accountant.get_balance(&bob_pubkey), Some(2)); + bank.process_verified_timestamp(alice.pubkey(), dt).unwrap(); // <-- Attack! Attempt to process completed transaction. + assert_ne!(bank.get_balance(&bob_pubkey), Some(2)); } #[test] fn test_transfer_after_date() { let alice = Mint::new(1); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); - accountant - .process_verified_timestamp(alice.pubkey(), dt) - .unwrap(); + bank.process_verified_timestamp(alice.pubkey(), dt).unwrap(); // It's now past now, so this transfer should be processed immediately. 
- accountant - .transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) + bank.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) .unwrap(); - assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); - assert_eq!(accountant.get_balance(&bob_pubkey), Some(1)); + assert_eq!(bank.get_balance(&alice.pubkey()), Some(0)); + assert_eq!(bank.get_balance(&bob_pubkey), Some(1)); } #[test] fn test_cancel_transfer() { let alice = Mint::new(1); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); - let sig = accountant - .transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) + let sig = bank.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) .unwrap(); // Assert the debit counts as a transaction. - assert_eq!(accountant.transaction_count(), 1); + assert_eq!(bank.transaction_count(), 1); // Alice's balance will be zero because all funds are locked up. - assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); + assert_eq!(bank.get_balance(&alice.pubkey()), Some(0)); // Bob's balance will be None because the funds have not been // sent. - assert_eq!(accountant.get_balance(&bob_pubkey), None); + assert_eq!(bank.get_balance(&bob_pubkey), None); // Now, cancel the trancaction. Alice gets her funds back, Bob never sees them. - accountant - .process_verified_sig(alice.pubkey(), sig) - .unwrap(); - assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1)); - assert_eq!(accountant.get_balance(&bob_pubkey), None); + bank.process_verified_sig(alice.pubkey(), sig).unwrap(); + assert_eq!(bank.get_balance(&alice.pubkey()), Some(1)); + assert_eq!(bank.get_balance(&bob_pubkey), None); // Assert cancel doesn't cause count to go backward. - assert_eq!(accountant.transaction_count(), 1); + assert_eq!(bank.transaction_count(), 1); - accountant - .process_verified_sig(alice.pubkey(), sig) - .unwrap(); // <-- Attack! Attempt to cancel completed transaction. - assert_ne!(accountant.get_balance(&alice.pubkey()), Some(2)); + bank.process_verified_sig(alice.pubkey(), sig).unwrap(); // <-- Attack! Attempt to cancel completed transaction. 
+ assert_ne!(bank.get_balance(&alice.pubkey()), Some(2)); } #[test] fn test_duplicate_event_signature() { let alice = Mint::new(1); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let sig = Signature::default(); - assert!(accountant.reserve_signature_with_last_id(&sig, &alice.last_id())); - assert!(!accountant.reserve_signature_with_last_id(&sig, &alice.last_id())); + assert!(bank.reserve_signature_with_last_id(&sig, &alice.last_id())); + assert!(!bank.reserve_signature_with_last_id(&sig, &alice.last_id())); } #[test] fn test_forget_signature() { let alice = Mint::new(1); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let sig = Signature::default(); - accountant.reserve_signature_with_last_id(&sig, &alice.last_id()); - assert!(accountant.forget_signature_with_last_id(&sig, &alice.last_id())); - assert!(!accountant.forget_signature_with_last_id(&sig, &alice.last_id())); + bank.reserve_signature_with_last_id(&sig, &alice.last_id()); + assert!(bank.forget_signature_with_last_id(&sig, &alice.last_id())); + assert!(!bank.forget_signature_with_last_id(&sig, &alice.last_id())); } #[test] fn test_max_entry_ids() { let alice = Mint::new(1); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let sig = Signature::default(); for i in 0..MAX_ENTRY_IDS { let last_id = hash(&serialize(&i).unwrap()); // Unique hash - accountant.register_entry_id(&last_id); + bank.register_entry_id(&last_id); } // Assert we're no longer able to use the oldest entry ID. - assert!(!accountant.reserve_signature_with_last_id(&sig, &alice.last_id())); + assert!(!bank.reserve_signature_with_last_id(&sig, &alice.last_id())); } #[test] fn test_debits_before_credits() { let mint = Mint::new(2); - let accountant = Accountant::new(&mint); + let bank = Bank::new(&mint); let alice = KeyPair::new(); let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let trs = vec![tr0, tr1]; - let results = accountant.process_verified_transactions(trs); + let results = bank.process_verified_transactions(trs); assert!(results[1].is_err()); // Assert bad transactions aren't counted. - assert_eq!(accountant.transaction_count(), 1); + assert_eq!(bank.transaction_count(), 1); } } @@ -614,7 +597,7 @@ mod tests { mod bench { extern crate test; use self::test::Bencher; - use accountant::*; + use bank::*; use bincode::serialize; use hash::hash; use signature::KeyPairUtil; @@ -622,7 +605,7 @@ mod bench { #[bench] fn process_verified_event_bench(bencher: &mut Bencher) { let mint = Mint::new(100_000_000); - let accountant = Accountant::new(&mint); + let bank = Bank::new(&mint); // Create transactions between unrelated parties. let transactions: Vec<_> = (0..4096) .into_par_iter() @@ -630,15 +613,15 @@ mod bench { // Seed the 'from' account. let rando0 = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id()); - accountant.process_verified_transaction(&tr).unwrap(); + bank.process_verified_transaction(&tr).unwrap(); // Seed the 'to' account and a cell for its signature. 
let last_id = hash(&serialize(&i).unwrap()); // Unique hash - accountant.register_entry_id(&last_id); + bank.register_entry_id(&last_id); let rando1 = KeyPair::new(); let tr = Transaction::new(&rando0, rando1.pubkey(), 1, last_id); - accountant.process_verified_transaction(&tr).unwrap(); + bank.process_verified_transaction(&tr).unwrap(); // Finally, return a transaction that's unique Transaction::new(&rando0, rando1.pubkey(), 1, last_id) @@ -646,13 +629,12 @@ mod bench { .collect(); bencher.iter(|| { // Since benchmarker runs this multiple times, we need to clear the signatures. - for sigs in accountant.last_ids.read().unwrap().iter() { + for sigs in bank.last_ids.read().unwrap().iter() { sigs.1.write().unwrap().clear(); } assert!( - accountant - .process_verified_transactions(transactions.clone()) + bank.process_verified_transactions(transactions.clone()) .iter() .all(|x| x.is_ok()) ); diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index 926a695f14bad4..0e408c11ec36b6 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -7,7 +7,7 @@ extern crate untrusted; use isatty::stdin_isatty; use rayon::prelude::*; -use solana::accountant::MAX_ENTRY_IDS; +use solana::bank::MAX_ENTRY_IDS; use solana::entry::{create_entry, next_entry}; use solana::event::Event; use solana::mint::MintDemo; diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index ac8b74d10a183a..51e4e4527b1205 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -6,7 +6,7 @@ extern crate solana; use getopts::Options; use isatty::stdin_isatty; -use solana::accountant::Accountant; +use solana::bank::Bank; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; @@ -92,31 +92,31 @@ fn main() { None }; - eprintln!("creating accountant..."); + eprintln!("creating bank..."); - let accountant = Accountant::new_from_deposit(&deposit.unwrap()); - accountant.register_entry_id(&entry0.id); - accountant.register_entry_id(&entry1.id); + let bank = Bank::new_from_deposit(&deposit.unwrap()); + bank.register_entry_id(&entry0.id); + bank.register_entry_id(&entry1.id); eprintln!("processing entries..."); let mut last_id = entry1.id; for entry in entries { last_id = entry.id; - let results = accountant.process_verified_events(entry.events); + let results = bank.process_verified_events(entry.events); for result in results { if let Err(e) = result { eprintln!("failed to process event {:?}", e); exit(1); } } - accountant.register_entry_id(&last_id); + bank.register_entry_id(&last_id); } eprintln!("creating networking stack..."); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(accountant, last_id, Some(Duration::from_millis(1000))); + let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000))); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); diff --git a/src/crdt.rs b/src/crdt.rs index 965dac08fe4aa2..d5aa7a0f429fa6 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -11,7 +11,7 @@ //! * layer 1 - As many nodes as we can fit //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! -//! Accountant needs to provide an interface for us to query the stake weight +//! 
Bank needs to provide an interface for us to query the stake weight use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt}; diff --git a/src/entry_writer.rs b/src/entry_writer.rs index ab973ba3bc2aa0..69e62f0f3225e0 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -1,6 +1,6 @@ //! The `entry_writer` module helps implement the TPU's write stage. -use accountant::Accountant; +use bank::Bank; use entry::Entry; use ledger; use packet; @@ -15,18 +15,18 @@ use std::time::Duration; use streamer; pub struct EntryWriter<'a> { - accountant: &'a Accountant, + bank: &'a Bank, } impl<'a> EntryWriter<'a> { - /// Create a new Tpu that wraps the given Accountant. - pub fn new(accountant: &'a Accountant) -> Self { - EntryWriter { accountant } + /// Create a new Tpu that wraps the given Bank. + pub fn new(bank: &'a Bank) -> Self { + EntryWriter { bank } } fn write_entry(&self, writer: &Mutex, entry: &Entry) { trace!("write_entry entry"); - self.accountant.register_entry_id(&entry.id); + self.bank.register_entry_id(&entry.id); writeln!( writer.lock().expect("'writer' lock in fn fn write_entry"), "{}", diff --git a/src/lib.rs b/src/lib.rs index d31c950b709bdb..e61b59f88936f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ #![cfg_attr(feature = "unstable", feature(test))] -pub mod accountant; +pub mod bank; pub mod crdt; pub mod ecdsa; pub mod entry; diff --git a/src/request_processor.rs b/src/request_processor.rs index 47692e1ef6f8de..f070bbfe3d91d2 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -1,6 +1,6 @@ //! The `request_stage` processes thin client Request messages. -use accountant::Accountant; +use bank::Bank; use bincode::{deserialize, serialize}; use event::Event; use packet; @@ -19,13 +19,13 @@ use streamer; use timing; pub struct RequestProcessor { - accountant: Arc, + bank: Arc, } impl RequestProcessor { - /// Create a new Tpu that wraps the given Accountant. - pub fn new(accountant: Arc) -> Self { - RequestProcessor { accountant } + /// Create a new Tpu that wraps the given Bank. + pub fn new(bank: Arc) -> Self { + RequestProcessor { bank } } /// Process Request items sent by clients. 
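For reference, here is a minimal sketch (not part of the patch) of how the renamed Bank is now handed to the RequestProcessor, mirroring the Rpu wiring later in this series; the external module paths `solana::bank`, `solana::mint`, and `solana::request_processor` are assumed from the imports shown in these diffs.

    extern crate solana;

    use std::sync::Arc;

    use solana::bank::Bank;
    use solana::mint::Mint;
    use solana::request_processor::RequestProcessor;

    fn main() {
        // Seed a Bank from a Mint, exactly as the unit tests in this series do.
        let mint = Mint::new(10_000);
        let bank = Arc::new(Bank::new(&mint));

        // The request path now holds a shared handle to a Bank where it
        // previously held an Accountant.
        let _request_processor = RequestProcessor::new(bank.clone());
    }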
@@ -36,19 +36,19 @@ impl RequestProcessor { ) -> Option<(Response, SocketAddr)> { match msg { Request::GetBalance { key } => { - let val = self.accountant.get_balance(&key); + let val = self.bank.get_balance(&key); let rsp = (Response::Balance { key, val }, rsp_addr); info!("Response::Balance {:?}", rsp); Some(rsp) } Request::GetLastId => { - let id = self.accountant.last_id(); + let id = self.bank.last_id(); let rsp = (Response::LastId { id }, rsp_addr); info!("Response::LastId {:?}", rsp); Some(rsp) } Request::GetTransactionCount => { - let transaction_count = self.accountant.transaction_count() as u64; + let transaction_count = self.bank.transaction_count() as u64; let rsp = (Response::TransactionCount { transaction_count }, rsp_addr); info!("Response::TransactionCount {:?}", rsp); Some(rsp) @@ -174,7 +174,7 @@ impl RequestProcessor { debug!("events: {} reqs: {}", events.len(), reqs.len()); debug!("process_events"); - let results = self.accountant.process_verified_events(events); + let results = self.bank.process_verified_events(events); let events = results.into_iter().filter_map(|x| x.ok()).collect(); signal_sender.send(Signal::Events(events))?; debug!("done process_events"); diff --git a/src/request_stage.rs b/src/request_stage.rs index e5ef5ef20dc656..8efc4f8237b70d 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -52,9 +52,9 @@ impl RequestStage { } } -// TODO: When accounting is pulled out of RequestStage, add this test back in. +// TODO: When banking is pulled out of RequestStage, add this test back in. -//use accountant::Accountant; +//use bank::Bank; //use entry::Entry; //use event::Event; //use hash::Hash; @@ -67,7 +67,7 @@ impl RequestStage { // //#[cfg(test)] //mod tests { -// use accountant::Accountant; +// use bank::Bank; // use event::Event; // use event_processor::EventProcessor; // use mint::Mint; @@ -75,15 +75,15 @@ impl RequestStage { // use transaction::Transaction; // // #[test] -// // TODO: Move this test accounting_stage. Calling process_events() directly +// // TODO: Move this test banking_stage. Calling process_events() directly // // defeats the purpose of this test. -// fn test_accounting_sequential_consistency() { +// fn test_banking_sequential_consistency() { // // In this attack we'll demonstrate that a verifier can interpret the ledger // // differently if either the server doesn't signal the ledger to add an // // Entry OR if the verifier tries to parallelize across multiple Entries. // let mint = Mint::new(2); -// let accountant = Accountant::new(&mint); -// let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); +// let bank = Bank::new(&mint); +// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); // // // Process a batch that includes a transaction that receives two tokens. // let alice = KeyPair::new(); @@ -96,22 +96,22 @@ impl RequestStage { // let events = vec![Event::Transaction(tr)]; // let entry1 = event_processor.process_events(events).unwrap(); // -// // Collect the ledger and feed it to a new accountant. +// // Collect the ledger and feed it to a new bank. // let entries = vec![entry0, entry1]; // // // Assert the user holds one token, not two. If the server only output one // // entry, then the second transaction will be rejected, because it drives // // the account balance below zero before the credit is added. 
-// let accountant = Accountant::new(&mint); +// let bank = Bank::new(&mint); // for entry in entries { // assert!( -// accountant +// bank // .process_verified_events(entry.events) // .into_iter() // .all(|x| x.is_ok()) // ); // } -// assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1)); +// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1)); // } //} // @@ -119,7 +119,7 @@ impl RequestStage { //mod bench { // extern crate test; // use self::test::Bencher; -// use accountant::{Accountant, MAX_ENTRY_IDS}; +// use bank::{Bank, MAX_ENTRY_IDS}; // use bincode::serialize; // use event_processor::*; // use hash::hash; @@ -133,7 +133,7 @@ impl RequestStage { // #[bench] // fn process_events_bench(_bencher: &mut Bencher) { // let mint = Mint::new(100_000_000); -// let accountant = Accountant::new(&mint); +// let bank = Bank::new(&mint); // // Create transactions between unrelated parties. // let txs = 100_000; // let last_ids: Mutex> = Mutex::new(HashSet::new()); @@ -147,18 +147,18 @@ impl RequestStage { // let mut last_ids = last_ids.lock().unwrap(); // if !last_ids.contains(&last_id) { // last_ids.insert(last_id); -// accountant.register_entry_id(&last_id); +// bank.register_entry_id(&last_id); // } // } // // // Seed the 'from' account. // let rando0 = KeyPair::new(); // let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); -// accountant.process_verified_transaction(&tr).unwrap(); +// bank.process_verified_transaction(&tr).unwrap(); // // let rando1 = KeyPair::new(); // let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); -// accountant.process_verified_transaction(&tr).unwrap(); +// bank.process_verified_transaction(&tr).unwrap(); // // // Finally, return a transaction that's unique // Transaction::new(&rando0, rando1.pubkey(), 1, last_id) @@ -170,7 +170,7 @@ impl RequestStage { // .map(|tr| Event::Transaction(tr)) // .collect(); // -// let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); +// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); // // let now = Instant::now(); // assert!(event_processor.process_events(events).is_ok()); diff --git a/src/result.rs b/src/result.rs index 3e2d2ac5499827..2c08058f755889 100644 --- a/src/result.rs +++ b/src/result.rs @@ -1,6 +1,6 @@ //! The `result` module exposes a Result type that propagates one of many different Error types. -use accountant; +use bank; use bincode; use serde_json; use std; @@ -15,7 +15,7 @@ pub enum Error { RecvError(std::sync::mpsc::RecvError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), - AccountingError(accountant::AccountingError), + BankError(bank::BankError), SendError, Services, GeneralError, @@ -33,9 +33,9 @@ impl std::convert::From for Error { Error::RecvTimeoutError(e) } } -impl std::convert::From for Error { - fn from(e: accountant::AccountingError) -> Error { - Error::AccountingError(e) +impl std::convert::From for Error { + fn from(e: bank::BankError) -> Error { + Error::BankError(e) } } impl std::convert::From> for Error { diff --git a/src/rpu.rs b/src/rpu.rs index 6bc7aa52269348..cf43869d8d8df7 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -1,7 +1,7 @@ //! The `rpu` module implements the Request Processing Unit, a //! 5-stage transaction processing pipeline in software. 
-use accountant::Accountant; +use bank::Bank; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; @@ -22,23 +22,23 @@ use std::time::Duration; use streamer; pub struct Rpu { - accountant: Arc, + bank: Arc, start_hash: Hash, tick_duration: Option, } impl Rpu { - /// Create a new Rpu that wraps the given Accountant. - pub fn new(accountant: Accountant, start_hash: Hash, tick_duration: Option) -> Self { + /// Create a new Rpu that wraps the given Bank. + pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { Rpu { - accountant: Arc::new(accountant), + bank: Arc::new(bank), start_hash, tick_duration, } } fn write_service( - accountant: Arc, + bank: Arc, exit: Arc, broadcast: streamer::BlobSender, blob_recycler: packet::BlobRecycler, @@ -46,7 +46,7 @@ impl Rpu { entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - let entry_writer = EntryWriter::new(&accountant); + let entry_writer = EntryWriter::new(&bank); let _ = entry_writer.write_and_send_entries( &broadcast, &blob_recycler, @@ -91,7 +91,7 @@ impl Rpu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); - let request_processor = RequestProcessor::new(self.accountant.clone()); + let request_processor = RequestProcessor::new(self.bank.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), @@ -108,7 +108,7 @@ impl Rpu { let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( - self.accountant.clone(), + self.bank.clone(), exit.clone(), broadcast_sender, blob_recycler.clone(), diff --git a/src/streamer.rs b/src/streamer.rs index 77aab25dcbd178..c03b7c36cb9cb3 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -322,7 +322,7 @@ fn retransmit( /// # Arguments /// * `sock` - Socket to read from. Read timeout is set to 1. /// * `exit` - Boolean to signal system exit. -/// * `crdt` - This structure needs to be updated and populated by the accountant and via gossip. +/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. /// * `recycler` - Blob recycler. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. 
pub fn retransmitter( diff --git a/src/thin_client.rs b/src/thin_client.rs index 4f45018f11849f..411e17afc410ae 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -154,7 +154,7 @@ impl ThinClient { #[cfg(test)] mod tests { use super::*; - use accountant::Accountant; + use bank::Bank; use crdt::{Crdt, ReplicatedData}; use futures::Future; use logger; @@ -187,10 +187,10 @@ mod tests { ); let alice = Mint::new(10_000); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30))); + let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(900)); @@ -225,10 +225,10 @@ mod tests { fn test_bad_sig() { let (leader_data, leader_gossip, _, leader_serve, _leader_events) = tvu::test_node(); let alice = Mint::new(10_000); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30))); + let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); let serve_addr = leader_serve.local_addr().unwrap(); let threads = rpu.serve( leader_data, @@ -295,25 +295,25 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let leader_acc = { - let accountant = Accountant::new(&alice); - Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30))) + let leader_bank = { + let bank = Bank::new(&alice); + Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))) }; - let replicant_acc = { - let accountant = Accountant::new(&alice); + let replicant_bank = { + let bank = Bank::new(&alice); Arc::new(Tvu::new( - accountant, + bank, alice.last_id(), Some(Duration::from_millis(30)), )) }; - let leader_threads = leader_acc + let leader_threads = leader_bank .serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink()) .unwrap(); let replicant_threads = Tvu::serve( - &replicant_acc, + &replicant_bank, replicant.0.clone(), replicant.1, replicant.2, diff --git a/src/tvu.rs b/src/tvu.rs index df7a8989a100fa..62de4d839d6c4b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -1,7 +1,7 @@ //! The `tvu` module implements the Transaction Validation Unit, a //! 5-stage transaction validation pipeline in software. -use accountant::Accountant; +use bank::Bank; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; @@ -22,28 +22,28 @@ use std::time::Duration; use streamer; pub struct Tvu { - accountant: Arc, + bank: Arc, start_hash: Hash, tick_duration: Option, } impl Tvu { - /// Create a new Tvu that wraps the given Accountant. - pub fn new(accountant: Accountant, start_hash: Hash, tick_duration: Option) -> Self { + /// Create a new Tvu that wraps the given Bank. 
+ pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { Tvu { - accountant: Arc::new(accountant), + bank: Arc::new(bank), start_hash, tick_duration, } } fn drain_service( - accountant: Arc, + bank: Arc, exit: Arc, entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || { - let entry_writer = EntryWriter::new(&accountant); + let entry_writer = EntryWriter::new(&bank); loop { let _ = entry_writer.drain_entries(&entry_receiver); if exit.load(Ordering::Relaxed) { @@ -65,7 +65,7 @@ impl Tvu { let blobs = verified_receiver.recv_timeout(timer)?; trace!("replicating blobs {}", blobs.len()); let entries = ledger::reconstruct_entries_from_blobs(&blobs); - obj.accountant.process_verified_entries(entries)?; + obj.bank.process_verified_entries(entries)?; for blob in blobs { blob_recycler.recycle(blob); } @@ -73,9 +73,9 @@ impl Tvu { } /// This service receives messages from a leader in the network and processes the transactions - /// on the accountant state. + /// on the bank state. /// # Arguments - /// * `obj` - The accountant state. + /// * `obj` - The bank state. /// * `me` - my configuration /// * `leader` - leader configuration /// * `exit` - The exit signal. @@ -173,7 +173,7 @@ impl Tvu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - let request_processor = RequestProcessor::new(obj.accountant.clone()); + let request_processor = RequestProcessor::new(obj.bank.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), @@ -188,11 +188,8 @@ impl Tvu { obj.tick_duration, ); - let t_write = Self::drain_service( - obj.accountant.clone(), - exit.clone(), - record_stage.entry_receiver, - ); + let t_write = + Self::drain_service(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); let t_responder = streamer::responder( respond_socket, @@ -240,7 +237,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke #[cfg(test)] mod tests { - use accountant::Accountant; + use bank::Bank; use bincode::serialize; use chrono::prelude::*; use crdt::Crdt; @@ -311,9 +308,9 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); - let accountant = Accountant::new(&alice); + let bank = Bank::new(&alice); let tvu = Arc::new(Tvu::new( - accountant, + bank, alice.last_id(), Some(Duration::from_millis(30)), )); @@ -341,11 +338,11 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); - let accountant = &tvu.accountant; + let bank = &tvu.bank; let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); - accountant.register_entry_id(&cur_hash); + bank.register_entry_id(&cur_hash); cur_hash = hash(&cur_hash); let tr1 = Transaction::new( @@ -354,11 +351,11 @@ mod tests { transfer_amount, cur_hash, ); - accountant.register_entry_id(&cur_hash); + bank.register_entry_id(&cur_hash); cur_hash = hash(&cur_hash); let entry1 = entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]); - accountant.register_entry_id(&cur_hash); + bank.register_entry_id(&cur_hash); cur_hash = hash(&cur_hash); alice_ref_balance -= transfer_amount; @@ -383,11 +380,11 @@ mod tests { msgs.push(msg); } - let accountant = &tvu.accountant; - let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap(); + let bank = &tvu.bank; + let alice_balance = bank.get_balance(&alice.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance); - let bob_balance = 
accountant.get_balance(&bob_keypair.pubkey()).unwrap(); + let bob_balance = bank.get_balance(&bob_keypair.pubkey()).unwrap(); assert_eq!(bob_balance, starting_balance - alice_ref_balance); exit.store(true, Ordering::Relaxed); From 7736b9cac669a12e3a67b479fb5b29f99a55f1f9 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 15:39:34 -0600 Subject: [PATCH 8/8] Boot Alice and Bob from the unit tests --- src/bank.rs | 144 +++++++++++++++++++++++++--------------------------- src/tvu.rs | 10 ++-- 2 files changed, 75 insertions(+), 79 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index 7989f138dd5b97..04e2d8d06381f6 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -407,18 +407,18 @@ mod tests { #[test] fn test_bank() { - let alice = Mint::new(10_000); - let bob_pubkey = KeyPair::new().pubkey(); - let bank = Bank::new(&alice); - assert_eq!(bank.last_id(), alice.last_id()); + let mint = Mint::new(10_000); + let pubkey = KeyPair::new().pubkey(); + let bank = Bank::new(&mint); + assert_eq!(bank.last_id(), mint.last_id()); - bank.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) + bank.transfer(1_000, &mint.keypair(), pubkey, mint.last_id()) .unwrap(); - assert_eq!(bank.get_balance(&bob_pubkey).unwrap(), 1_000); + assert_eq!(bank.get_balance(&pubkey).unwrap(), 1_000); - bank.transfer(500, &alice.keypair(), bob_pubkey, alice.last_id()) + bank.transfer(500, &mint.keypair(), pubkey, mint.last_id()) .unwrap(); - assert_eq!(bank.get_balance(&bob_pubkey).unwrap(), 1_500); + assert_eq!(bank.get_balance(&pubkey).unwrap(), 1_500); assert_eq!(bank.transaction_count(), 2); } @@ -435,155 +435,151 @@ mod tests { #[test] fn test_invalid_transfer() { - let alice = Mint::new(11_000); - let bank = Bank::new(&alice); - let bob_pubkey = KeyPair::new().pubkey(); - bank.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) + let mint = Mint::new(11_000); + let bank = Bank::new(&mint); + let pubkey = KeyPair::new().pubkey(); + bank.transfer(1_000, &mint.keypair(), pubkey, mint.last_id()) .unwrap(); assert_eq!(bank.transaction_count(), 1); assert_eq!( - bank.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()), + bank.transfer(10_001, &mint.keypair(), pubkey, mint.last_id()), Err(BankError::InsufficientFunds) ); assert_eq!(bank.transaction_count(), 1); - let alice_pubkey = alice.keypair().pubkey(); - assert_eq!(bank.get_balance(&alice_pubkey).unwrap(), 10_000); - assert_eq!(bank.get_balance(&bob_pubkey).unwrap(), 1_000); + let mint_pubkey = mint.keypair().pubkey(); + assert_eq!(bank.get_balance(&mint_pubkey).unwrap(), 10_000); + assert_eq!(bank.get_balance(&pubkey).unwrap(), 1_000); } #[test] fn test_transfer_to_newb() { - let alice = Mint::new(10_000); - let bank = Bank::new(&alice); - let alice_keypair = alice.keypair(); - let bob_pubkey = KeyPair::new().pubkey(); - bank.transfer(500, &alice_keypair, bob_pubkey, alice.last_id()) + let mint = Mint::new(10_000); + let bank = Bank::new(&mint); + let pubkey = KeyPair::new().pubkey(); + bank.transfer(500, &mint.keypair(), pubkey, mint.last_id()) .unwrap(); - assert_eq!(bank.get_balance(&bob_pubkey).unwrap(), 500); + assert_eq!(bank.get_balance(&pubkey).unwrap(), 500); } #[test] fn test_transfer_on_date() { - let alice = Mint::new(1); - let bank = Bank::new(&alice); - let alice_keypair = alice.keypair(); - let bob_pubkey = KeyPair::new().pubkey(); + let mint = Mint::new(1); + let bank = Bank::new(&mint); + let pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); - bank.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, 
alice.last_id()) + bank.transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id()) .unwrap(); - // Alice's balance will be zero because all funds are locked up. - assert_eq!(bank.get_balance(&alice.pubkey()), Some(0)); + // Mint's balance will be zero because all funds are locked up. + assert_eq!(bank.get_balance(&mint.pubkey()), Some(0)); // tx count is 1, because debits were applied. assert_eq!(bank.transaction_count(), 1); - // Bob's balance will be None because the funds have not been + // pubkey's balance will be None because the funds have not been // sent. - assert_eq!(bank.get_balance(&bob_pubkey), None); + assert_eq!(bank.get_balance(&pubkey), None); // Now, acknowledge the time in the condition occurred and - // that bob's funds are now available. - bank.process_verified_timestamp(alice.pubkey(), dt).unwrap(); - assert_eq!(bank.get_balance(&bob_pubkey), Some(1)); + // that pubkey's funds are now available. + bank.process_verified_timestamp(mint.pubkey(), dt).unwrap(); + assert_eq!(bank.get_balance(&pubkey), Some(1)); // tx count is still 1, because we chose not to count timestamp events // tx count. assert_eq!(bank.transaction_count(), 1); - bank.process_verified_timestamp(alice.pubkey(), dt).unwrap(); // <-- Attack! Attempt to process completed transaction. - assert_ne!(bank.get_balance(&bob_pubkey), Some(2)); + bank.process_verified_timestamp(mint.pubkey(), dt).unwrap(); // <-- Attack! Attempt to process completed transaction. + assert_ne!(bank.get_balance(&pubkey), Some(2)); } #[test] fn test_transfer_after_date() { - let alice = Mint::new(1); - let bank = Bank::new(&alice); - let alice_keypair = alice.keypair(); - let bob_pubkey = KeyPair::new().pubkey(); + let mint = Mint::new(1); + let bank = Bank::new(&mint); + let pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); - bank.process_verified_timestamp(alice.pubkey(), dt).unwrap(); + bank.process_verified_timestamp(mint.pubkey(), dt).unwrap(); // It's now past now, so this transfer should be processed immediately. - bank.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) + bank.transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id()) .unwrap(); - assert_eq!(bank.get_balance(&alice.pubkey()), Some(0)); - assert_eq!(bank.get_balance(&bob_pubkey), Some(1)); + assert_eq!(bank.get_balance(&mint.pubkey()), Some(0)); + assert_eq!(bank.get_balance(&pubkey), Some(1)); } #[test] fn test_cancel_transfer() { - let alice = Mint::new(1); - let bank = Bank::new(&alice); - let alice_keypair = alice.keypair(); - let bob_pubkey = KeyPair::new().pubkey(); + let mint = Mint::new(1); + let bank = Bank::new(&mint); + let pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); - let sig = bank.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) + let sig = bank.transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id()) .unwrap(); // Assert the debit counts as a transaction. assert_eq!(bank.transaction_count(), 1); - // Alice's balance will be zero because all funds are locked up. - assert_eq!(bank.get_balance(&alice.pubkey()), Some(0)); + // Mint's balance will be zero because all funds are locked up. + assert_eq!(bank.get_balance(&mint.pubkey()), Some(0)); - // Bob's balance will be None because the funds have not been + // pubkey's balance will be None because the funds have not been // sent. - assert_eq!(bank.get_balance(&bob_pubkey), None); + assert_eq!(bank.get_balance(&pubkey), None); - // Now, cancel the trancaction. Alice gets her funds back, Bob never sees them. 
- bank.process_verified_sig(alice.pubkey(), sig).unwrap(); - assert_eq!(bank.get_balance(&alice.pubkey()), Some(1)); - assert_eq!(bank.get_balance(&bob_pubkey), None); + // Now, cancel the trancaction. Mint gets her funds back, pubkey never sees them. + bank.process_verified_sig(mint.pubkey(), sig).unwrap(); + assert_eq!(bank.get_balance(&mint.pubkey()), Some(1)); + assert_eq!(bank.get_balance(&pubkey), None); // Assert cancel doesn't cause count to go backward. assert_eq!(bank.transaction_count(), 1); - bank.process_verified_sig(alice.pubkey(), sig).unwrap(); // <-- Attack! Attempt to cancel completed transaction. - assert_ne!(bank.get_balance(&alice.pubkey()), Some(2)); + bank.process_verified_sig(mint.pubkey(), sig).unwrap(); // <-- Attack! Attempt to cancel completed transaction. + assert_ne!(bank.get_balance(&mint.pubkey()), Some(2)); } #[test] fn test_duplicate_event_signature() { - let alice = Mint::new(1); - let bank = Bank::new(&alice); + let mint = Mint::new(1); + let bank = Bank::new(&mint); let sig = Signature::default(); - assert!(bank.reserve_signature_with_last_id(&sig, &alice.last_id())); - assert!(!bank.reserve_signature_with_last_id(&sig, &alice.last_id())); + assert!(bank.reserve_signature_with_last_id(&sig, &mint.last_id())); + assert!(!bank.reserve_signature_with_last_id(&sig, &mint.last_id())); } #[test] fn test_forget_signature() { - let alice = Mint::new(1); - let bank = Bank::new(&alice); + let mint = Mint::new(1); + let bank = Bank::new(&mint); let sig = Signature::default(); - bank.reserve_signature_with_last_id(&sig, &alice.last_id()); - assert!(bank.forget_signature_with_last_id(&sig, &alice.last_id())); - assert!(!bank.forget_signature_with_last_id(&sig, &alice.last_id())); + bank.reserve_signature_with_last_id(&sig, &mint.last_id()); + assert!(bank.forget_signature_with_last_id(&sig, &mint.last_id())); + assert!(!bank.forget_signature_with_last_id(&sig, &mint.last_id())); } #[test] fn test_max_entry_ids() { - let alice = Mint::new(1); - let bank = Bank::new(&alice); + let mint = Mint::new(1); + let bank = Bank::new(&mint); let sig = Signature::default(); for i in 0..MAX_ENTRY_IDS { let last_id = hash(&serialize(&i).unwrap()); // Unique hash bank.register_entry_id(&last_id); } // Assert we're no longer able to use the oldest entry ID. 
- assert!(!bank.reserve_signature_with_last_id(&sig, &alice.last_id())); + assert!(!bank.reserve_signature_with_last_id(&sig, &mint.last_id())); } #[test] fn test_debits_before_credits() { let mint = Mint::new(2); let bank = Bank::new(&mint); - let alice = KeyPair::new(); - let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); - let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); + let keypair = KeyPair::new(); + let tr0 = Transaction::new(&mint.keypair(), keypair.pubkey(), 2, mint.last_id()); + let tr1 = Transaction::new(&keypair, mint.pubkey(), 1, mint.last_id()); let trs = vec![tr0, tr1]; let results = bank.process_verified_transactions(trs); assert!(results[1].is_err()); diff --git a/src/tvu.rs b/src/tvu.rs index 62de4d839d6c4b..21b2ca61f09195 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -307,11 +307,11 @@ mod tests { ); let starting_balance = 10_000; - let alice = Mint::new(starting_balance); - let bank = Bank::new(&alice); + let mint = Mint::new(starting_balance); + let bank = Bank::new(&mint); let tvu = Arc::new(Tvu::new( bank, - alice.last_id(), + mint.last_id(), Some(Duration::from_millis(30)), )); let replicate_addr = target1_data.replicate_addr; @@ -346,7 +346,7 @@ mod tests { cur_hash = hash(&cur_hash); let tr1 = Transaction::new( - &alice.keypair(), + &mint.keypair(), bob_keypair.pubkey(), transfer_amount, cur_hash, @@ -381,7 +381,7 @@ mod tests { } let bank = &tvu.bank; - let alice_balance = bank.get_balance(&alice.keypair().pubkey()).unwrap(); + let alice_balance = bank.get_balance(&mint.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance); let bob_balance = bank.get_balance(&bob_keypair.pubkey()).unwrap();
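Taken together, the series leaves callers talking to a Bank seeded from a Mint. Below is a minimal usage sketch (not part of the patch series) exercising the renamed API with the same calls the tests above make; the external module paths `solana::bank`, `solana::mint`, and `solana::signature` are assumed from the imports in these diffs.

    extern crate solana;

    use solana::bank::Bank;
    use solana::mint::Mint;
    use solana::signature::{KeyPair, KeyPairUtil};

    fn main() {
        // Seed the Bank from a Mint, as in test_bank().
        let mint = Mint::new(10_000);
        let bank = Bank::new(&mint);
        assert_eq!(bank.last_id(), mint.last_id());

        // Transfer from the mint keypair to a fresh pubkey and check the result.
        let pubkey = KeyPair::new().pubkey();
        bank.transfer(1_000, &mint.keypair(), pubkey, mint.last_id())
            .unwrap();
        assert_eq!(bank.get_balance(&pubkey).unwrap(), 1_000);
        assert_eq!(bank.transaction_count(), 1);
    }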