-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Extract accounting stage code from tpu
- Loading branch information
Showing
2 changed files
with
266 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
//! The `accounting_stage` module implements the accounting stage of the TPU. | ||
use accountant::Accountant; | ||
use bincode::serialize; | ||
use entry::Entry; | ||
use event::Event; | ||
use hash::Hash; | ||
use recorder::Signal; | ||
use result::Result; | ||
use signature::PublicKey; | ||
use std::net::{SocketAddr, UdpSocket}; | ||
use std::sync::mpsc::SyncSender; | ||
use std::sync::Mutex; | ||
use transaction::Transaction; | ||
|
||
pub struct AccountingStage { | ||
acc: Mutex<Accountant>, | ||
historian_input: Mutex<SyncSender<Signal>>, | ||
entry_info_subscribers: Mutex<Vec<SocketAddr>>, | ||
} | ||
|
||
impl AccountingStage { | ||
/// Create a new Tpu that wraps the given Accountant. | ||
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>) -> Self { | ||
AccountingStage { | ||
acc: Mutex::new(acc), | ||
entry_info_subscribers: Mutex::new(vec![]), | ||
historian_input: Mutex::new(historian_input), | ||
} | ||
} | ||
|
||
/// Process the transactions in parallel and then log the successful ones. | ||
pub fn process_events(&self, events: Vec<Event>) -> Result<()> { | ||
let results = self.acc.lock().unwrap().process_verified_events(events); | ||
let events = results.into_iter().filter_map(|x| x.ok()).collect(); | ||
let sender = self.historian_input.lock().unwrap(); | ||
sender.send(Signal::Events(events))?; | ||
debug!("after historian_input"); | ||
Ok(()) | ||
} | ||
|
||
/// Process Request items sent by clients. | ||
fn process_request( | ||
&self, | ||
msg: Request, | ||
rsp_addr: SocketAddr, | ||
) -> Option<(Response, SocketAddr)> { | ||
match msg { | ||
Request::GetBalance { key } => { | ||
let val = self.acc.lock().unwrap().get_balance(&key); | ||
let rsp = (Response::Balance { key, val }, rsp_addr); | ||
info!("Response::Balance {:?}", rsp); | ||
Some(rsp) | ||
} | ||
Request::Transaction(_) => unreachable!(), | ||
Request::Subscribe { subscriptions } => { | ||
for subscription in subscriptions { | ||
match subscription { | ||
Subscription::EntryInfo => { | ||
self.entry_info_subscribers.lock().unwrap().push(rsp_addr) | ||
} | ||
} | ||
} | ||
None | ||
} | ||
} | ||
} | ||
|
||
pub fn process_requests( | ||
&self, | ||
reqs: Vec<(Request, SocketAddr)>, | ||
) -> Vec<(Response, SocketAddr)> { | ||
reqs.into_iter() | ||
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) | ||
.collect() | ||
} | ||
|
||
pub fn notify_entry_info_subscribers(&self, entry: &Entry) { | ||
// TODO: No need to bind(). | ||
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); | ||
|
||
// copy subscribers to avoid taking lock while doing io | ||
let addrs = self.entry_info_subscribers.lock().unwrap().clone(); | ||
trace!("Sending to {} addrs", addrs.len()); | ||
for addr in addrs { | ||
let entry_info = EntryInfo { | ||
id: entry.id, | ||
num_hashes: entry.num_hashes, | ||
num_events: entry.events.len() as u64, | ||
}; | ||
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); | ||
trace!("sending {} to {}", data.len(), addr); | ||
//TODO dont do IO here, this needs to be on a separate channel | ||
let res = socket.send_to(&data, addr); | ||
if res.is_err() { | ||
eprintln!("couldn't send response: {:?}", res); | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] | ||
#[derive(Serialize, Deserialize, Debug, Clone)] | ||
pub enum Request { | ||
Transaction(Transaction), | ||
GetBalance { key: PublicKey }, | ||
Subscribe { subscriptions: Vec<Subscription> }, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug, Clone)] | ||
pub enum Subscription { | ||
EntryInfo, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug, Clone)] | ||
pub struct EntryInfo { | ||
pub id: Hash, | ||
pub num_hashes: u64, | ||
pub num_events: u64, | ||
} | ||
|
||
impl Request { | ||
/// Verify the request is valid. | ||
pub fn verify(&self) -> bool { | ||
match *self { | ||
Request::Transaction(ref tr) => tr.verify_plan(), | ||
_ => true, | ||
} | ||
} | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub enum Response { | ||
Balance { key: PublicKey, val: Option<i64> }, | ||
EntryInfo(EntryInfo), | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use accountant::Accountant; | ||
use entry::Entry; | ||
use event::Event; | ||
use historian::Historian; | ||
use mint::Mint; | ||
use signature::{KeyPair, KeyPairUtil}; | ||
use std::sync::mpsc::sync_channel; | ||
use accounting_stage::AccountingStage; | ||
use transaction::Transaction; | ||
|
||
#[test] | ||
fn test_accounting_sequential_consistency() { | ||
// In this attack we'll demonstrate that a verifier can interpret the ledger | ||
// differently if either the server doesn't signal the ledger to add an | ||
// Entry OR if the verifier tries to parallelize across multiple Entries. | ||
let mint = Mint::new(2); | ||
let acc = Accountant::new(&mint); | ||
let (input, event_receiver) = sync_channel(10); | ||
let historian = Historian::new(event_receiver, &mint.last_id(), None); | ||
let stage = AccountingStage::new(acc, input); | ||
|
||
// Process a batch that includes a transaction that receives two tokens. | ||
let alice = KeyPair::new(); | ||
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); | ||
let events = vec![Event::Transaction(tr)]; | ||
assert!(stage.process_events(events).is_ok()); | ||
|
||
// Process a second batch that spends one of those tokens. | ||
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); | ||
let events = vec![Event::Transaction(tr)]; | ||
assert!(stage.process_events(events).is_ok()); | ||
|
||
// Collect the ledger and feed it to a new accountant. | ||
drop(stage.historian_input); | ||
let entries: Vec<Entry> = historian.output.lock().unwrap().iter().collect(); | ||
|
||
// Assert the user holds one token, not two. If the server only output one | ||
// entry, then the second transaction will be rejected, because it drives | ||
// the account balance below zero before the credit is added. | ||
let acc = Accountant::new(&mint); | ||
for entry in entries { | ||
assert!( | ||
acc.process_verified_events(entry.events) | ||
.into_iter() | ||
.all(|x| x.is_ok()) | ||
); | ||
} | ||
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1)); | ||
} | ||
} | ||
|
||
#[cfg(all(feature = "unstable", test))] | ||
mod bench { | ||
extern crate test; | ||
use self::test::Bencher; | ||
use accountant::{Accountant, MAX_ENTRY_IDS}; | ||
use bincode::serialize; | ||
use hash::hash; | ||
use mint::Mint; | ||
use signature::{KeyPair, KeyPairUtil}; | ||
use std::collections::HashSet; | ||
use std::sync::mpsc::sync_channel; | ||
use std::time::Instant; | ||
use accounting_stage::*; | ||
use transaction::Transaction; | ||
|
||
#[bench] | ||
fn process_events_bench(_bencher: &mut Bencher) { | ||
let mint = Mint::new(100_000_000); | ||
let acc = Accountant::new(&mint); | ||
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); | ||
// Create transactions between unrelated parties. | ||
let txs = 100_000; | ||
let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new()); | ||
let transactions: Vec<_> = (0..txs) | ||
.into_par_iter() | ||
.map(|i| { | ||
// Seed the 'to' account and a cell for its signature. | ||
let dummy_id = i % (MAX_ENTRY_IDS as i32); | ||
let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash | ||
{ | ||
let mut last_ids = last_ids.lock().unwrap(); | ||
if !last_ids.contains(&last_id) { | ||
last_ids.insert(last_id); | ||
acc.register_entry_id(&last_id); | ||
} | ||
} | ||
|
||
// Seed the 'from' account. | ||
let rando0 = KeyPair::new(); | ||
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); | ||
acc.process_verified_transaction(&tr).unwrap(); | ||
|
||
let rando1 = KeyPair::new(); | ||
let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); | ||
acc.process_verified_transaction(&tr).unwrap(); | ||
|
||
// Finally, return a transaction that's unique | ||
Transaction::new(&rando0, rando1.pubkey(), 1, last_id) | ||
}) | ||
.collect(); | ||
|
||
let req_vers = transactions | ||
.into_iter() | ||
.map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8)) | ||
.collect(); | ||
|
||
let (input, event_receiver) = sync_channel(10); | ||
let historian = Historian::new(event_receiver, &mint.last_id(), None); | ||
let stage = AccountingStage::new(acc, input, historian); | ||
|
||
let now = Instant::now(); | ||
assert!(stage.process_events(req_vers).is_ok()); | ||
let duration = now.elapsed(); | ||
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; | ||
let tps = txs as f64 / sec; | ||
|
||
// Ensure that all transactions were successfully logged. | ||
drop(stage.historian_input); | ||
let entries: Vec<Entry> = stage.historian.output.lock().unwrap().iter().collect(); | ||
assert_eq!(entries.len(), 1); | ||
assert_eq!(entries[0].events.len(), txs as usize); | ||
|
||
println!("{} tps", tps); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters