Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tpu #220

Merged
merged 18 commits into from
May 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 253 additions & 0 deletions src/banking_stage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
//! The `banking_stage` processes Event messages.

use bank::Bank;
use bincode::deserialize;
use event::Event;
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use recorder::Signal;
use result::Result;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use std::time::Instant;
use timing;

pub struct BankingStage {
pub thread_hdl: JoinHandle<()>,
pub signal_receiver: Receiver<Signal>,
}

impl BankingStage {
pub fn new(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: packet::PacketRecycler,
) -> Self {
let (signal_sender, signal_receiver) = channel();
let thread_hdl = spawn(move || loop {
let e = Self::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
}
}
});
BankingStage {
thread_hdl,
signal_receiver,
}
}

fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Event, SocketAddr)>> {
p.packets
.par_iter()
.map(|x| {
deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr()))
.ok()
})
.collect()
}

fn process_packets(
bank: Arc<Bank>,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
signal_sender: &Sender<Signal>,
packet_recycler: &packet::PacketRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let recv_start = Instant::now();
let mms = verified_receiver.recv_timeout(timer)?;
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
"@{:?} process start stalled for: {:?}ms batches: {}",
timing::timestamp(),
timing::duration_as_ms(&recv_start.elapsed()),
mms.len(),
);
let proc_start = Instant::now();
for (msgs, vers) in mms {
let events = Self::deserialize_events(&msgs.read().unwrap());
reqs_len += events.len();
let events = events
.into_iter()
.zip(vers)
.filter_map(|(event, ver)| match event {
None => None,
Some((event, _addr)) => if event.verify() && ver != 0 {
Some(event)
} else {
None
},
})
.collect();

debug!("process_events");
let results = bank.process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect();
signal_sender.send(Signal::Events(events))?;
debug!("done process_events");

packet_recycler.recycle(msgs);
}
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(
"@{:?} done processing event batches: {} time: {:?}ms reqs: {} reqs/s: {}",
timing::timestamp(),
mms_len,
total_time_ms,
reqs_len,
(reqs_len as f32) / (total_time_s)
);
Ok(())
}
}

// TODO: When banking is pulled out of RequestStage, add this test back in.

//use bank::Bank;
//use entry::Entry;
//use event::Event;
//use hash::Hash;
//use record_stage::RecordStage;
//use recorder::Signal;
//use result::Result;
//use std::sync::mpsc::{channel, Sender};
//use std::sync::{Arc, Mutex};
//use std::time::Duration;
//
//#[cfg(test)]
//mod tests {
// use bank::Bank;
// use event::Event;
// use event_processor::EventProcessor;
// use mint::Mint;
// use signature::{KeyPair, KeyPairUtil};
// use transaction::Transaction;
//
// #[test]
// // TODO: Move this test banking_stage. Calling process_events() directly
// // defeats the purpose of this test.
// fn test_banking_sequential_consistency() {
// // In this attack we'll demonstrate that a verifier can interpret the ledger
// // differently if either the server doesn't signal the ledger to add an
// // Entry OR if the verifier tries to parallelize across multiple Entries.
// let mint = Mint::new(2);
// let bank = Bank::new(&mint);
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
//
// // Process a batch that includes a transaction that receives two tokens.
// let alice = KeyPair::new();
// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
// let events = vec![Event::Transaction(tr)];
// let entry0 = event_processor.process_events(events).unwrap();
//
// // Process a second batch that spends one of those tokens.
// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
// let events = vec![Event::Transaction(tr)];
// let entry1 = event_processor.process_events(events).unwrap();
//
// // Collect the ledger and feed it to a new bank.
// let entries = vec![entry0, entry1];
//
// // Assert the user holds one token, not two. If the server only output one
// // entry, then the second transaction will be rejected, because it drives
// // the account balance below zero before the credit is added.
// let bank = Bank::new(&mint);
// for entry in entries {
// assert!(
// bank
// .process_verified_events(entry.events)
// .into_iter()
// .all(|x| x.is_ok())
// );
// }
// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1));
// }
//}
//
//#[cfg(all(feature = "unstable", test))]
//mod bench {
// extern crate test;
// use self::test::Bencher;
// use bank::{Bank, MAX_ENTRY_IDS};
// use bincode::serialize;
// use event_processor::*;
// use hash::hash;
// use mint::Mint;
// use rayon::prelude::*;
// use signature::{KeyPair, KeyPairUtil};
// use std::collections::HashSet;
// use std::time::Instant;
// use transaction::Transaction;
//
// #[bench]
// fn process_events_bench(_bencher: &mut Bencher) {
// let mint = Mint::new(100_000_000);
// let bank = Bank::new(&mint);
// // Create transactions between unrelated parties.
// let txs = 100_000;
// let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
// let transactions: Vec<_> = (0..txs)
// .into_par_iter()
// .map(|i| {
// // Seed the 'to' account and a cell for its signature.
// let dummy_id = i % (MAX_ENTRY_IDS as i32);
// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
// {
// let mut last_ids = last_ids.lock().unwrap();
// if !last_ids.contains(&last_id) {
// last_ids.insert(last_id);
// bank.register_entry_id(&last_id);
// }
// }
//
// // Seed the 'from' account.
// let rando0 = KeyPair::new();
// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
// bank.process_verified_transaction(&tr).unwrap();
//
// let rando1 = KeyPair::new();
// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
// bank.process_verified_transaction(&tr).unwrap();
//
// // Finally, return a transaction that's unique
// Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
// })
// .collect();
//
// let events: Vec<_> = transactions
// .into_iter()
// .map(|tr| Event::Transaction(tr))
// .collect();
//
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
//
// let now = Instant::now();
// assert!(event_processor.process_events(events).is_ok());
// let duration = now.elapsed();
// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
// let tps = txs as f64 / sec;
//
// // Ensure that all transactions were successfully logged.
// drop(event_processor.historian_input);
// let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
// assert_eq!(entries.len(), 1);
// assert_eq!(entries[0].events.len(), txs as usize);
//
// println!("{} tps", tps);
// }
//}
32 changes: 26 additions & 6 deletions src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use solana::bank::Bank;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::rpu::Rpu;
use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil};
use std::env;
use std::io::{stdin, stdout, Read};
Expand Down Expand Up @@ -116,23 +116,43 @@ fn main() {
eprintln!("creating networking stack...");

let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000)));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
serve_sock
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();

let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
let _events_sock = UdpSocket::bind(&events_addr).unwrap();
let events_sock = UdpSocket::bind(&events_addr).unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip_sock.local_addr().unwrap(),
replicate_sock.local_addr().unwrap(),
serve_sock.local_addr().unwrap(),
);

let mut local = serve_sock.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();

eprintln!("starting server...");
let threads = rpu.serve(d, serve_sock, gossip_sock, exit.clone(), stdout())
.unwrap();
let server = Server::new(
bank,
last_id,
Some(Duration::from_millis(1000)),
d,
serve_sock,
events_sock,
broadcast_socket,
respond_socket,
gossip_sock,
exit.clone(),
stdout(),
);
eprintln!("Ready. Listening on {}", serve_addr);
for t in threads {
for t in server.thread_hdls {
t.join().expect("join");
}
}
15 changes: 12 additions & 3 deletions src/ecdsa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,23 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
mod tests {
use bincode::serialize;
use ecdsa;
use event::Event;
use packet::{Packet, Packets, SharedPackets};
use request::Request;
use std::sync::RwLock;
use transaction::Transaction;
use transaction::test_tx;
use transaction::{memfind, test_tx};

#[test]
fn test_layout() {
let tr = test_tx();
let tx = serialize(&tr).unwrap();
let packet = serialize(&Event::Transaction(tr)).unwrap();
assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
}

fn make_packet_from_transaction(tr: Transaction) -> Packet {
let tx = serialize(&Request::Transaction(tr)).unwrap();
let tx = serialize(&Event::Transaction(tr)).unwrap();
let mut packet = Packet::default();
packet.meta.size = tx.len();
packet.data[..packet.meta.size].copy_from_slice(&tx);
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod bank;
pub mod banking_stage;
pub mod crdt;
pub mod ecdsa;
pub mod entry;
Expand All @@ -20,11 +21,13 @@ pub mod request_processor;
pub mod request_stage;
pub mod result;
pub mod rpu;
pub mod server;
pub mod sig_verify_stage;
pub mod signature;
pub mod streamer;
pub mod thin_client;
pub mod timing;
pub mod tpu;
pub mod transaction;
pub mod tvu;
pub mod write_stage;
Expand Down
Loading