Merge pull request #204 from garious/add-accounting-stage
TPU cleanup
garious authored May 12, 2018
2 parents bcdb058 + 6264508 · commit d7cd80d
Showing 12 changed files with 592 additions and 470 deletions.
42 changes: 28 additions & 14 deletions src/bin/client-demo.rs
@@ -35,7 +35,7 @@ fn print_usage(program: &str, opts: Options) {
fn main() {
let mut threads = 4usize;
let mut addr: String = "127.0.0.1:8000".to_string();
let mut client_addr: String = "127.0.0.1:8010".to_string();
let mut requests_addr: String = "127.0.0.1:8010".to_string();

let mut opts = Options::new();
opts.optopt("s", "", "server address", "host:port");
@@ -60,12 +60,16 @@ fn main() {
addr = matches.opt_str("s").unwrap();
}
if matches.opt_present("c") {
client_addr = matches.opt_str("c").unwrap();
requests_addr = matches.opt_str("c").unwrap();
}
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
}

let mut events_addr: SocketAddr = requests_addr.parse().unwrap();
let requests_port = events_addr.port();
events_addr.set_port(requests_port + 1);

if stdin_isatty() {
eprintln!("nothing found on stdin, expected a json file");
exit(1);
@@ -84,13 +88,16 @@ fn main() {
exit(1);
});

println!("Binding to {}", client_addr);
let socket = UdpSocket::bind(&client_addr).unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let mut accountant = ThinClient::new(addr.parse().unwrap(), socket);
println!("Binding to {}", requests_addr);
let requests_socket = UdpSocket::bind(&requests_addr).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let events_socket = UdpSocket::bind(&events_addr).unwrap();
let mut client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket);

println!("Get last ID...");
let last_id = accountant.get_last_id().wait().unwrap();
let last_id = client.get_last_id().wait().unwrap();
println!("Got last ID {:?}", last_id);

let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes());
@@ -122,7 +129,7 @@ fn main() {
nsps / 1_000_f64
);

let initial_tx_count = accountant.transaction_count();
let initial_tx_count = client.transaction_count();
println!("initial count {}", initial_tx_count);

println!("Transfering {} transactions in {} batches", txs, threads);
@@ -131,19 +138,26 @@
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|trs| {
println!("Transferring 1 unit {} times... to", trs.len());
let mut client_addr: SocketAddr = client_addr.parse().unwrap();
client_addr.set_port(0);
let socket = UdpSocket::bind(client_addr).unwrap();
let accountant = ThinClient::new(addr.parse().unwrap(), socket);
let mut requests_addr: SocketAddr = requests_addr.parse().unwrap();
requests_addr.set_port(0);
let requests_socket = UdpSocket::bind(requests_addr).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let mut events_addr: SocketAddr = requests_addr.clone();
let requests_port = events_addr.port();
events_addr.set_port(requests_port + 1);
let events_socket = UdpSocket::bind(&events_addr).unwrap();
let client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket);
for tr in trs {
accountant.transfer_signed(tr.clone()).unwrap();
client.transfer_signed(tr.clone()).unwrap();
}
});

println!("Waiting for transactions to complete...",);
let mut tx_count;
for _ in 0..10 {
tx_count = accountant.transaction_count();
tx_count = client.transaction_count();
duration = now.elapsed();
let txs = tx_count - initial_tx_count;
println!("Transactions processed {}", txs);
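Note on the new addressing scheme above: the client now binds two UDP sockets, one for requests and one for events, with the events socket listening one port above the requests socket. A minimal self-contained sketch of that derivation (illustrative only, not code from this commit):

    use std::net::SocketAddr;

    fn main() {
        let requests_addr: SocketAddr = "127.0.0.1:8010".parse().unwrap();
        // SocketAddr is Copy; reuse the host and bump the port by one.
        let mut events_addr = requests_addr;
        events_addr.set_port(requests_addr.port() + 1);
        assert_eq!(events_addr.to_string(), "127.0.0.1:8011");
    }
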
21 changes: 7 additions & 14 deletions src/bin/testnode.rs
@@ -7,12 +7,12 @@ extern crate solana;
use getopts::Options;
use isatty::stdin_isatty;
use solana::accountant::Accountant;
use solana::accounting_stage::AccountingStage;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::event_processor::EventProcessor;
use solana::rpu::Rpu;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::tpu::Tpu;
use std::env;
use std::io::{stdin, stdout, Read};
use std::net::UdpSocket;
@@ -115,13 +115,13 @@ fn main() {

eprintln!("creating networking stack...");

let accounting_stage = AccountingStage::new(accountant, &last_id, Some(1000));
let event_processor = EventProcessor::new(accountant, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let tpu = Arc::new(Tpu::new(accounting_stage));
let rpu = Rpu::new(event_processor);
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
let events_sock = UdpSocket::bind(&events_addr).unwrap();
let _events_sock = UdpSocket::bind(&events_addr).unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
@@ -130,15 +130,8 @@
serve_sock.local_addr().unwrap(),
);
eprintln!("starting server...");
let threads = Tpu::serve(
&tpu,
d,
serve_sock,
events_sock,
gossip_sock,
exit.clone(),
stdout(),
).unwrap();
let threads = rpu.serve(d, serve_sock, gossip_sock, exit.clone(), stdout())
.unwrap();
eprintln!("Ready. Listening on {}", serve_addr);
for t in threads {
t.join().expect("join");
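Why `events_sock` became `_events_sock` above: the socket still gets bound so the port stays reserved, but the new Rpu no longer consumes it. In Rust, a leading underscore keeps the binding (and thus the bound port) alive while silencing the unused-variable lint. A small standalone illustration, not code from this commit:

    use std::net::UdpSocket;

    fn main() {
        // Binding reserves the port for the lifetime of the handle even if
        // nothing reads from it; the underscore marks the non-use as intentional.
        let _events_sock = UdpSocket::bind("127.0.0.1:0").unwrap();
    }
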
2 changes: 1 addition & 1 deletion src/ecdsa.rs
@@ -137,8 +137,8 @@ mod tests {
use bincode::serialize;
use ecdsa;
use packet::{Packet, Packets, SharedPackets};
use request_stage::Request;
use std::sync::RwLock;
use thin_client_service::Request;
use transaction::Transaction;
use transaction::test_tx;

88 changes: 88 additions & 0 deletions src/entry_writer.rs
@@ -0,0 +1,88 @@
//! The `entry_writer` module helps implement the TPU's write stage.
use entry::Entry;
use event_processor::EventProcessor;
use ledger;
use packet;
use request_stage::RequestProcessor;
use result::Result;
use serde_json;
use std::collections::VecDeque;
use std::io::Write;
use std::io::sink;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use streamer;

pub struct EntryWriter<'a> {
event_processor: &'a EventProcessor,
request_processor: &'a RequestProcessor,
}

impl<'a> EntryWriter<'a> {
/// Create a new EntryWriter that wraps the given EventProcessor and RequestProcessor.
pub fn new(
event_processor: &'a EventProcessor,
request_processor: &'a RequestProcessor,
) -> Self {
EntryWriter {
event_processor,
request_processor,
}
}

fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
trace!("write_entry entry");
self.event_processor.accountant.register_entry_id(&entry.id);
writeln!(
writer.lock().expect("'writer' lock in fn write_entry"),
"{}",
serde_json::to_string(&entry).expect("'entry' to_string in fn write_entry")
).expect("writeln! in fn write_entry");
self.request_processor.notify_entry_info_subscribers(&entry);
}

fn write_entries<W: Write>(
&self,
writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>,
) -> Result<Vec<Entry>> {
// TODO: implement a serialize for channel that does this without allocations
let mut l = vec![];
let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?;
self.write_entry(writer, &entry);
l.push(entry);
while let Ok(entry) = entry_receiver.try_recv() {
self.write_entry(writer, &entry);
l.push(entry);
}
Ok(l)
}

/// Process any Entry items that have been published by the Historian and
/// continuously broadcast the resulting blobs of entries.
pub fn write_and_send_entries<W: Write>(
&self,
broadcast: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>,
) -> Result<()> {
let mut q = VecDeque::new();
let list = self.write_entries(writer, entry_receiver)?;
trace!("New blobs? {}", list.len());
ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
if !q.is_empty() {
broadcast.send(q)?;
}
Ok(())
}

/// Process any Entry items that have been published by the Historian,
/// discarding the output instead of broadcasting it.
pub fn drain_entries(&self, entry_receiver: &Receiver<Entry>) -> Result<()> {
self.write_entries(&Arc::new(Mutex::new(sink())), entry_receiver)?;
Ok(())
}
}
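The batching pattern in write_entries above — block with a timeout for the first Entry, then drain whatever else is already queued without blocking — can be shown standalone with plain mpsc channels (a sketch of the pattern, not the module's API):

    use std::sync::mpsc::channel;
    use std::time::Duration;

    fn main() {
        let (sender, receiver) = channel();
        for i in 0..3 {
            sender.send(i).unwrap();
        }
        // Wait up to one second for the first item...
        let mut batch = vec![receiver.recv_timeout(Duration::new(1, 0)).unwrap()];
        // ...then greedily collect anything already buffered.
        while let Ok(item) = receiver.try_recv() {
            batch.push(item);
        }
        assert_eq!(batch, vec![0, 1, 2]);
    }
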
48 changes: 21 additions & 27 deletions src/accounting_stage.rs → src/event_processor.rs
@@ -1,4 +1,4 @@
//! The `accounting_stage` module implements the accounting stage of the TPU.
//! The `event_processor` module implements the accounting stage of the TPU.
use accountant::Accountant;
use entry::Entry;
@@ -7,81 +7,75 @@ use hash::Hash;
use historian::Historian;
use recorder::Signal;
use result::Result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};

pub struct AccountingStage {
pub output: Mutex<Receiver<Entry>>,
entry_sender: Mutex<Sender<Entry>>,
pub struct EventProcessor {
pub accountant: Arc<Accountant>,
historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>,
}

impl AccountingStage {
/// Create a new Tpu that wraps the given Accountant.
impl EventProcessor {
/// Create a new stage of the TPU for event and transaction processing
pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
let (historian_input, event_receiver) = channel();
let historian = Historian::new(event_receiver, start_hash, ms_per_tick);
let (entry_sender, output) = channel();
AccountingStage {
output: Mutex::new(output),
entry_sender: Mutex::new(entry_sender),
EventProcessor {
accountant: Arc::new(accountant),
historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian),
}
}

/// Process the transactions in parallel and then log the successful ones.
pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
pub fn process_events(&self, events: Vec<Event>) -> Result<Entry> {
let historian = self.historian.lock().unwrap();
let results = self.accountant.process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect();
let sender = self.historian_input.lock().unwrap();
sender.send(Signal::Events(events))?;

// Wait for the historian to tag our Events with an ID and then register it.
let entry = historian.output.lock().unwrap().recv()?;
let entry = historian.entry_receiver.lock().unwrap().recv()?;
self.accountant.register_entry_id(&entry.id);
self.entry_sender.lock().unwrap().send(entry)?;
Ok(())
Ok(entry)
}
}

#[cfg(test)]
mod tests {
use accountant::Accountant;
use accounting_stage::AccountingStage;
use entry::Entry;
use event::Event;
use event_processor::EventProcessor;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
use transaction::Transaction;

#[test]
// TODO: Move this test to a higher layer. Calling process_events() directly
// defeats the purpose of this test.
fn test_accounting_sequential_consistency() {
// In this attack we'll demonstrate that a verifier can interpret the ledger
// differently if either the server doesn't signal the ledger to add an
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let accountant = Accountant::new(&mint);
let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None);
let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);

// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(accounting_stage.process_events(events).is_ok());
let entry0 = event_processor.process_events(events).unwrap();

// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(accounting_stage.process_events(events).is_ok());
let entry1 = event_processor.process_events(events).unwrap();

// Collect the ledger and feed it to a new accountant.
drop(accounting_stage.entry_sender);
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
let entries = vec![entry0, entry1];

// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
@@ -104,8 +98,8 @@ mod bench {
extern crate test;
use self::test::Bencher;
use accountant::{Accountant, MAX_ENTRY_IDS};
use accounting_stage::*;
use bincode::serialize;
use event_processor::*;
use hash::hash;
use mint::Mint;
use rayon::prelude::*;
@@ -154,17 +148,17 @@ mod bench {
.map(|tr| Event::Transaction(tr))
.collect();

let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None);
let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);

let now = Instant::now();
assert!(accounting_stage.process_events(events).is_ok());
let entry = event_processor.process_events(events).unwrap();
let duration = now.elapsed();
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let tps = txs as f64 / sec;

// Ensure that all transactions were successfully logged.
drop(accounting_stage.historian_input);
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
drop(event_processor.historian_input);
let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);

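The heart of this rename-and-refactor: process_events no longer pushes entries into an internal output channel for someone else to drain; it sends the events to the historian, waits for the tagged Entry, and returns it directly to the caller. A simplified sketch of that send-then-receive round trip (the types here are hypothetical stand-ins, not the crate's own):

    use std::sync::mpsc::channel;
    use std::thread;

    struct Entry(u64);

    fn main() {
        // Stand-in for the historian: tags each signal and replies with an Entry.
        let (signal_sender, signal_receiver) = channel::<u64>();
        let (entry_sender, entry_receiver) = channel::<Entry>();
        thread::spawn(move || {
            for signal in signal_receiver {
                entry_sender.send(Entry(signal)).unwrap();
            }
        });

        // process_events-style flow: send the batch, block on the reply,
        // and hand the Entry straight back to the caller.
        signal_sender.send(42).unwrap();
        let entry = entry_receiver.recv().unwrap();
        assert_eq!(entry.0, 42);
    }
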
(Diff truncated by the viewer: the remaining 7 changed files are not shown.)
