Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TPU cleanup #204

Merged
merged 25 commits into from
May 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
95bf68f
Correct some strange naming
garious May 11, 2018
600a1f8
Initialize thin client with events port
garious May 11, 2018
5510085
Better names
garious May 11, 2018
f0be595
Create function for thin client thread
garious May 11, 2018
0488d0a
Extract sig verify functions
garious May 12, 2018
3cedbc4
Reorder to reflect the pipeline order
garious May 12, 2018
765d901
Better names
garious May 12, 2018
b781fdb
Reorganize
garious May 12, 2018
3c11a91
Cleanup verifier error handling
garious May 12, 2018
1960788
Move sig verification stage into its own module
garious May 12, 2018
ca80bc3
Move the writer stage's utilities to its own module
garious May 12, 2018
cd96843
Free up name ThinClientService
garious May 12, 2018
d2f95d5
Move thin client service thread into thin_client_service.rs
garious May 12, 2018
2376dfc
Let thin client own the receiver channel
garious May 12, 2018
73abea0
No need for TPU dependency
garious May 12, 2018
b4ca414
More object-oriented
garious May 12, 2018
7ab3331
Move validation processor to its own module
garious May 12, 2018
898f497
Free up name 'thin_client_service'
garious May 12, 2018
421d9aa
Free up the name 'tpu'
garious May 12, 2018
4180571
Don't pass events_socket to RPU
garious May 12, 2018
3d82807
Delete dead code
garious May 12, 2018
1511dc4
Move RequestProcessor out of Rpu/Tvu state
garious May 12, 2018
a3d2831
Free up the name 'accounting_stage'
garious May 12, 2018
a3869dd
Move entry_receiver to RequestStage
garious May 12, 2018
6264508
Consistent naming of senders and receivers
garious May 12, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn print_usage(program: &str, opts: Options) {
fn main() {
let mut threads = 4usize;
let mut addr: String = "127.0.0.1:8000".to_string();
let mut client_addr: String = "127.0.0.1:8010".to_string();
let mut requests_addr: String = "127.0.0.1:8010".to_string();

let mut opts = Options::new();
opts.optopt("s", "", "server address", "host:port");
Expand All @@ -60,12 +60,16 @@ fn main() {
addr = matches.opt_str("s").unwrap();
}
if matches.opt_present("c") {
client_addr = matches.opt_str("c").unwrap();
requests_addr = matches.opt_str("c").unwrap();
}
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
}

let mut events_addr: SocketAddr = requests_addr.parse().unwrap();
let requests_port = events_addr.port();
events_addr.set_port(requests_port + 1);

if stdin_isatty() {
eprintln!("nothing found on stdin, expected a json file");
exit(1);
Expand All @@ -84,13 +88,16 @@ fn main() {
exit(1);
});

println!("Binding to {}", client_addr);
let socket = UdpSocket::bind(&client_addr).unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let mut accountant = ThinClient::new(addr.parse().unwrap(), socket);
println!("Binding to {}", requests_addr);
let requests_socket = UdpSocket::bind(&requests_addr).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let events_socket = UdpSocket::bind(&events_addr).unwrap();
let mut client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket);

println!("Get last ID...");
let last_id = accountant.get_last_id().wait().unwrap();
let last_id = client.get_last_id().wait().unwrap();
println!("Got last ID {:?}", last_id);

let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes());
Expand Down Expand Up @@ -122,7 +129,7 @@ fn main() {
nsps / 1_000_f64
);

let initial_tx_count = accountant.transaction_count();
let initial_tx_count = client.transaction_count();
println!("initial count {}", initial_tx_count);

println!("Transfering {} transactions in {} batches", txs, threads);
Expand All @@ -131,19 +138,26 @@ fn main() {
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|trs| {
println!("Transferring 1 unit {} times... to", trs.len());
let mut client_addr: SocketAddr = client_addr.parse().unwrap();
client_addr.set_port(0);
let socket = UdpSocket::bind(client_addr).unwrap();
let accountant = ThinClient::new(addr.parse().unwrap(), socket);
let mut requests_addr: SocketAddr = requests_addr.parse().unwrap();
requests_addr.set_port(0);
let requests_socket = UdpSocket::bind(requests_addr).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let mut events_addr: SocketAddr = requests_addr.clone();
let requests_port = events_addr.port();
events_addr.set_port(requests_port + 1);
let events_socket = UdpSocket::bind(&events_addr).unwrap();
let client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket);
for tr in trs {
accountant.transfer_signed(tr.clone()).unwrap();
client.transfer_signed(tr.clone()).unwrap();
}
});

println!("Waiting for transactions to complete...",);
let mut tx_count;
for _ in 0..10 {
tx_count = accountant.transaction_count();
tx_count = client.transaction_count();
duration = now.elapsed();
let txs = tx_count - initial_tx_count;
println!("Transactions processed {}", txs);
Expand Down
21 changes: 7 additions & 14 deletions src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ extern crate solana;
use getopts::Options;
use isatty::stdin_isatty;
use solana::accountant::Accountant;
use solana::accounting_stage::AccountingStage;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::event_processor::EventProcessor;
use solana::rpu::Rpu;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::tpu::Tpu;
use std::env;
use std::io::{stdin, stdout, Read};
use std::net::UdpSocket;
Expand Down Expand Up @@ -115,13 +115,13 @@ fn main() {

eprintln!("creating networking stack...");

let accounting_stage = AccountingStage::new(accountant, &last_id, Some(1000));
let event_processor = EventProcessor::new(accountant, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let tpu = Arc::new(Tpu::new(accounting_stage));
let rpu = Rpu::new(event_processor);
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
let events_sock = UdpSocket::bind(&events_addr).unwrap();
let _events_sock = UdpSocket::bind(&events_addr).unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
Expand All @@ -130,15 +130,8 @@ fn main() {
serve_sock.local_addr().unwrap(),
);
eprintln!("starting server...");
let threads = Tpu::serve(
&tpu,
d,
serve_sock,
events_sock,
gossip_sock,
exit.clone(),
stdout(),
).unwrap();
let threads = rpu.serve(d, serve_sock, gossip_sock, exit.clone(), stdout())
.unwrap();
eprintln!("Ready. Listening on {}", serve_addr);
for t in threads {
t.join().expect("join");
Expand Down
2 changes: 1 addition & 1 deletion src/ecdsa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ mod tests {
use bincode::serialize;
use ecdsa;
use packet::{Packet, Packets, SharedPackets};
use request_stage::Request;
use std::sync::RwLock;
use thin_client_service::Request;
use transaction::Transaction;
use transaction::test_tx;

Expand Down
88 changes: 88 additions & 0 deletions src/entry_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//! The `entry_writer` module helps implement the TPU's write stage.

use entry::Entry;
use event_processor::EventProcessor;
use ledger;
use packet;
use request_stage::RequestProcessor;
use result::Result;
use serde_json;
use std::collections::VecDeque;
use std::io::Write;
use std::io::sink;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use streamer;

pub struct EntryWriter<'a> {
event_processor: &'a EventProcessor,
request_processor: &'a RequestProcessor,
}

impl<'a> EntryWriter<'a> {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(
event_processor: &'a EventProcessor,
request_processor: &'a RequestProcessor,
) -> Self {
EntryWriter {
event_processor,
request_processor,
}
}

fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
trace!("write_entry entry");
self.event_processor.accountant.register_entry_id(&entry.id);
writeln!(
writer.lock().expect("'writer' lock in fn fn write_entry"),
"{}",
serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
).expect("writeln! in fn write_entry");
self.request_processor.notify_entry_info_subscribers(&entry);
}

fn write_entries<W: Write>(
&self,
writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>,
) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let mut l = vec![];
let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?;
self.write_entry(writer, &entry);
l.push(entry);
while let Ok(entry) = entry_receiver.try_recv() {
self.write_entry(writer, &entry);
l.push(entry);
}
Ok(l)
}

/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
pub fn write_and_send_entries<W: Write>(
&self,
broadcast: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>,
) -> Result<()> {
let mut q = VecDeque::new();
let list = self.write_entries(writer, entry_receiver)?;
trace!("New blobs? {}", list.len());
ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
if !q.is_empty() {
broadcast.send(q)?;
}
Ok(())
}

/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
pub fn drain_entries(&self, entry_receiver: &Receiver<Entry>) -> Result<()> {
self.write_entries(&Arc::new(Mutex::new(sink())), entry_receiver)?;
Ok(())
}
}
48 changes: 21 additions & 27 deletions src/accounting_stage.rs → src/event_processor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! The `accounting_stage` module implements the accounting stage of the TPU.
//! The `event_processor` module implements the accounting stage of the TPU.

use accountant::Accountant;
use entry::Entry;
Expand All @@ -7,81 +7,75 @@ use hash::Hash;
use historian::Historian;
use recorder::Signal;
use result::Result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};

pub struct AccountingStage {
pub output: Mutex<Receiver<Entry>>,
entry_sender: Mutex<Sender<Entry>>,
pub struct EventProcessor {
pub accountant: Arc<Accountant>,
historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>,
}

impl AccountingStage {
/// Create a new Tpu that wraps the given Accountant.
impl EventProcessor {
/// Create a new stage of the TPU for event and transaction processing
pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
let (historian_input, event_receiver) = channel();
let historian = Historian::new(event_receiver, start_hash, ms_per_tick);
let (entry_sender, output) = channel();
AccountingStage {
output: Mutex::new(output),
entry_sender: Mutex::new(entry_sender),
EventProcessor {
accountant: Arc::new(accountant),
historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian),
}
}

/// Process the transactions in parallel and then log the successful ones.
pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
pub fn process_events(&self, events: Vec<Event>) -> Result<Entry> {
let historian = self.historian.lock().unwrap();
let results = self.accountant.process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect();
let sender = self.historian_input.lock().unwrap();
sender.send(Signal::Events(events))?;

// Wait for the historian to tag our Events with an ID and then register it.
let entry = historian.output.lock().unwrap().recv()?;
let entry = historian.entry_receiver.lock().unwrap().recv()?;
self.accountant.register_entry_id(&entry.id);
self.entry_sender.lock().unwrap().send(entry)?;
Ok(())
Ok(entry)
}
}

#[cfg(test)]
mod tests {
use accountant::Accountant;
use accounting_stage::AccountingStage;
use entry::Entry;
use event::Event;
use event_processor::EventProcessor;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
use transaction::Transaction;

#[test]
// TODO: Move this test accounting_stage. Calling process_events() directly
// defeats the purpose of this test.
fn test_accounting_sequential_consistency() {
// In this attack we'll demonstrate that a verifier can interpret the ledger
// differently if either the server doesn't signal the ledger to add an
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let accountant = Accountant::new(&mint);
let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None);
let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);

// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(accounting_stage.process_events(events).is_ok());
let entry0 = event_processor.process_events(events).unwrap();

// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(accounting_stage.process_events(events).is_ok());
let entry1 = event_processor.process_events(events).unwrap();

// Collect the ledger and feed it to a new accountant.
drop(accounting_stage.entry_sender);
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
let entries = vec![entry0, entry1];

// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
Expand All @@ -104,8 +98,8 @@ mod bench {
extern crate test;
use self::test::Bencher;
use accountant::{Accountant, MAX_ENTRY_IDS};
use accounting_stage::*;
use bincode::serialize;
use event_processor::*;
use hash::hash;
use mint::Mint;
use rayon::prelude::*;
Expand Down Expand Up @@ -154,17 +148,17 @@ mod bench {
.map(|tr| Event::Transaction(tr))
.collect();

let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None);
let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);

let now = Instant::now();
assert!(accounting_stage.process_events(events).is_ok());
assert!(event_processor.process_events(events).is_ok());
let duration = now.elapsed();
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let tps = txs as f64 / sec;

// Ensure that all transactions were successfully logged.
drop(accounting_stage.historian_input);
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
drop(event_processor.historian_input);
let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);

Expand Down
Loading