Skip to content

Commit

Permalink
Move Accounting stage functionality into its own object
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed May 9, 2018
1 parent 9040c04 commit d44a6f7
Showing 1 changed file with 120 additions and 105 deletions.
225 changes: 120 additions & 105 deletions src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,98 @@ use streamer;
use timing;
use transaction::Transaction;

pub struct Tpu {
struct AccountingStage {
acc: Mutex<Accountant>,
historian_input: Mutex<SyncSender<Signal>>,
historian: Historian,
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
}

impl AccountingStage {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
AccountingStage {
acc: Mutex::new(acc),
entry_info_subscribers: Mutex::new(vec![]),
historian_input: Mutex::new(historian_input),
historian,
}
}

/// Process the transactions in parallel and then log the successful ones.
pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
let results = self.acc.lock().unwrap().process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect();
let sender = self.historian_input.lock().unwrap();
sender.send(Signal::Events(events))?;
debug!("after historian_input");
Ok(())
}

/// Process Request items sent by clients.
fn process_request(
&self,
msg: Request,
rsp_addr: SocketAddr,
) -> Option<(Response, SocketAddr)> {
match msg {
Request::GetBalance { key } => {
let val = self.acc.lock().unwrap().get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => {
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
}
}
}
None
}
}
}

pub fn process_requests(
&self,
reqs: Vec<(Request, SocketAddr)>,
) -> Vec<(Response, SocketAddr)> {
reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect()
}

pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");

// copy subscribers to avoid taking lock while doing io
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
trace!("Sending to {} addrs", addrs.len());
for addr in addrs {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
//TODO dont do IO here, this needs to be on a separate channel
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
}
}
}
}

pub struct Tpu {
accounting: AccountingStage,
}

#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Request {
Expand Down Expand Up @@ -80,59 +165,37 @@ pub enum Response {
impl Tpu {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
Tpu {
acc: Mutex::new(acc),
entry_info_subscribers: Mutex::new(vec![]),
historian_input: Mutex::new(historian_input),
historian,
}
}

fn notify_entry_info_subscribers(obj: &SharedTpu, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");

// copy subscribers to avoid taking lock while doing io
let addrs = obj.entry_info_subscribers.lock().unwrap().clone();
trace!("Sending to {} addrs", addrs.len());
for addr in addrs {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
//TODO dont do IO here, this needs to be on a separate channel
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
}
}
let accounting = AccountingStage::new(acc, historian_input, historian);
Tpu { accounting }
}

fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
trace!("update_entry entry");
obj.acc.lock().unwrap().register_entry_id(&entry.id);
obj.accounting
.acc
.lock()
.unwrap()
.register_entry_id(&entry.id);
writeln!(
writer.lock().unwrap(),
"{}",
serde_json::to_string(&entry).unwrap()
).unwrap();
Self::notify_entry_info_subscribers(obj, &entry);
obj.accounting.notify_entry_info_subscribers(&entry);
}

fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let mut l = vec![];
let entry = obj.historian
let entry = obj.accounting
.historian
.output
.lock()
.unwrap()
.recv_timeout(Duration::new(1, 0))?;
Self::update_entry(obj, writer, &entry);
l.push(entry);
while let Ok(entry) = obj.historian.receive() {
while let Ok(entry) = obj.accounting.historian.receive() {
Self::update_entry(obj, writer, &entry);
l.push(entry);
}
Expand Down Expand Up @@ -247,33 +310,6 @@ impl Tpu {
})
}

/// Process Request items sent by clients.
pub fn process_request(
&self,
msg: Request,
rsp_addr: SocketAddr,
) -> Option<(Response, SocketAddr)> {
match msg {
Request::GetBalance { key } => {
let val = self.acc.lock().unwrap().get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => {
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
}
}
}
None
}
}
}

fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
Expand Down Expand Up @@ -365,22 +401,6 @@ impl Tpu {
(events, reqs)
}

/// Process the transactions in parallel and then log the successful ones.
fn process_events(&self, events: Vec<Event>) -> Result<()> {
let results = self.acc.lock().unwrap().process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect();
let sender = self.historian_input.lock().unwrap();
sender.send(Signal::Events(events))?;
debug!("after historian_input");
Ok(())
}

fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> {
reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect()
}

fn serialize_response(
resp: Response,
rsp_addr: SocketAddr,
Expand Down Expand Up @@ -445,11 +465,11 @@ impl Tpu {
debug!("events: {} reqs: {}", events.len(), reqs.len());

debug!("process_events");
obj.process_events(events)?;
obj.accounting.process_events(events)?;
debug!("done process_events");

debug!("process_requests");
let rsps = obj.process_requests(reqs);
let rsps = obj.accounting.process_requests(reqs);
debug!("done process_requests");

let blobs = Self::serialize_responses(rsps, blob_recycler)?;
Expand Down Expand Up @@ -485,7 +505,7 @@ impl Tpu {
for msgs in &blobs {
let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
let acc = obj.acc.lock().unwrap();
let acc = obj.accounting.acc.lock().unwrap();
for entry in entries {
acc.register_entry_id(&entry.id);
for result in acc.process_verified_events(entry.events) {
Expand Down Expand Up @@ -807,7 +827,7 @@ mod tests {
use std::time::Duration;
use streamer;
use thin_client::ThinClient;
use tpu::Tpu;
use tpu::{AccountingStage, Tpu};
use transaction::Transaction;

#[test]
Expand Down Expand Up @@ -845,22 +865,22 @@ mod tests {
let acc = Accountant::new(&mint);
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let tpu = Tpu::new(acc, input, historian);
let stage = AccountingStage::new(acc, input, historian);

// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(tpu.process_events(events).is_ok());
assert!(stage.process_events(events).is_ok());

// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(tpu.process_events(events).is_ok());
assert!(stage.process_events(events).is_ok());

// Collect the ledger and feed it to a new accountant.
drop(tpu.historian_input);
let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
drop(stage.historian_input);
let entries: Vec<Entry> = stage.historian.output.lock().unwrap().iter().collect();

// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
Expand Down Expand Up @@ -993,10 +1013,10 @@ mod tests {
let acc = Accountant::new(&alice);
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(Tpu::new(acc, input, historian));
let tpu = Arc::new(Tpu::new(acc, input, historian));
let replicate_addr = target1_data.replicate_addr;
let threads = Tpu::replicate(
&acc,
&tpu,
target1_data,
target1_gossip,
target1_serve,
Expand All @@ -1018,9 +1038,11 @@ mod tests {
w.set_index(i).unwrap();
w.set_id(leader_id).unwrap();

let acc = tpu.accounting.acc.lock().unwrap();

let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
acc.acc.lock().unwrap().register_entry_id(&cur_hash);
acc.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash);

let tr1 = Transaction::new(
Expand All @@ -1029,11 +1051,11 @@ mod tests {
transfer_amount,
cur_hash,
);
acc.acc.lock().unwrap().register_entry_id(&cur_hash);
acc.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash);
let entry1 =
entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]);
acc.acc.lock().unwrap().register_entry_id(&cur_hash);
acc.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash);

alice_ref_balance -= transfer_amount;
Expand All @@ -1058,18 +1080,11 @@ mod tests {
msgs.push(msg);
}

let alice_balance = acc.acc
.lock()
.unwrap()
.get_balance(&alice.keypair().pubkey())
.unwrap();
let acc = tpu.accounting.acc.lock().unwrap();
let alice_balance = acc.get_balance(&alice.keypair().pubkey()).unwrap();
assert_eq!(alice_balance, alice_ref_balance);

let bob_balance = acc.acc
.lock()
.unwrap()
.get_balance(&bob_keypair.pubkey())
.unwrap();
let bob_balance = acc.get_balance(&bob_keypair.pubkey()).unwrap();
assert_eq!(bob_balance, starting_balance - alice_ref_balance);

exit.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -1164,17 +1179,17 @@ mod bench {

let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let tpu = Tpu::new(acc, input, historian);
let stage = AccountingStage::new(acc, input, historian);

let now = Instant::now();
assert!(tpu.process_events(req_vers).is_ok());
assert!(stage.process_events(req_vers).is_ok());
let duration = now.elapsed();
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let tps = txs as f64 / sec;

// Ensure that all transactions were successfully logged.
drop(tpu.historian_input);
let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
drop(stage.historian_input);
let entries: Vec<Entry> = stage.historian.output.lock().unwrap().iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);

Expand Down

0 comments on commit d44a6f7

Please sign in to comment.