-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #206 from garious/add-accounting-stage
More modules
- Loading branch information
Showing
10 changed files
with
360 additions
and
341 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
Cargo.lock | ||
/target/ | ||
**/*.rs.bk | ||
.cargo |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
//! The `request` module defines the messages for the thin client. | ||
use bincode::serialize; | ||
use hash::Hash; | ||
use packet; | ||
use packet::SharedPackets; | ||
use signature::PublicKey; | ||
use transaction::Transaction; | ||
|
||
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] | ||
#[derive(Serialize, Deserialize, Debug, Clone)] | ||
pub enum Request { | ||
Transaction(Transaction), | ||
GetBalance { key: PublicKey }, | ||
Subscribe { subscriptions: Vec<Subscription> }, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug, Clone)] | ||
pub enum Subscription { | ||
EntryInfo, | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug, Clone)] | ||
pub struct EntryInfo { | ||
pub id: Hash, | ||
pub num_hashes: u64, | ||
pub num_events: u64, | ||
} | ||
|
||
impl Request { | ||
/// Verify the request is valid. | ||
pub fn verify(&self) -> bool { | ||
match *self { | ||
Request::Transaction(ref tr) => tr.verify_plan(), | ||
_ => true, | ||
} | ||
} | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub enum Response { | ||
Balance { key: PublicKey, val: Option<i64> }, | ||
EntryInfo(EntryInfo), | ||
} | ||
|
||
pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> { | ||
let mut out = vec![]; | ||
for rrs in reqs.chunks(packet::NUM_PACKETS) { | ||
let p = r.allocate(); | ||
p.write() | ||
.unwrap() | ||
.packets | ||
.resize(rrs.len(), Default::default()); | ||
for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) { | ||
let v = serialize(&i).expect("serialize request"); | ||
let len = v.len(); | ||
o.data[..len].copy_from_slice(&v); | ||
o.meta.size = len; | ||
} | ||
out.push(p); | ||
} | ||
return out; | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use bincode::serialize; | ||
use ecdsa; | ||
use packet::{PacketRecycler, NUM_PACKETS}; | ||
use request::{to_request_packets, Request}; | ||
use transaction::{memfind, test_tx}; | ||
|
||
#[test] | ||
fn test_layout() { | ||
let tr = test_tx(); | ||
let tx = serialize(&tr).unwrap(); | ||
let packet = serialize(&Request::Transaction(tr)).unwrap(); | ||
assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); | ||
assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); | ||
} | ||
|
||
#[test] | ||
fn test_to_packets() { | ||
let tr = Request::Transaction(test_tx()); | ||
let re = PacketRecycler::default(); | ||
let rv = to_request_packets(&re, vec![tr.clone(); 1]); | ||
assert_eq!(rv.len(), 1); | ||
assert_eq!(rv[0].read().unwrap().packets.len(), 1); | ||
|
||
let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]); | ||
assert_eq!(rv.len(), 1); | ||
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); | ||
|
||
let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]); | ||
assert_eq!(rv.len(), 2); | ||
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); | ||
assert_eq!(rv[1].read().unwrap().packets.len(), 1); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
//! The `request_stage` processes thin client Request messages. | ||
use accountant::Accountant; | ||
use bincode::{deserialize, serialize}; | ||
use entry::Entry; | ||
use event::Event; | ||
use event_processor::EventProcessor; | ||
use packet; | ||
use packet::SharedPackets; | ||
use rayon::prelude::*; | ||
use request::{EntryInfo, Request, Response, Subscription}; | ||
use result::Result; | ||
use std::collections::VecDeque; | ||
use std::net::{SocketAddr, UdpSocket}; | ||
use std::sync::mpsc::{Receiver, Sender}; | ||
use std::sync::{Arc, Mutex}; | ||
use std::time::Duration; | ||
use std::time::Instant; | ||
use streamer; | ||
use timing; | ||
|
||
pub struct RequestProcessor { | ||
accountant: Arc<Accountant>, | ||
entry_info_subscribers: Mutex<Vec<SocketAddr>>, | ||
} | ||
|
||
impl RequestProcessor { | ||
/// Create a new Tpu that wraps the given Accountant. | ||
pub fn new(accountant: Arc<Accountant>) -> Self { | ||
RequestProcessor { | ||
accountant, | ||
entry_info_subscribers: Mutex::new(vec![]), | ||
} | ||
} | ||
|
||
/// Process Request items sent by clients. | ||
fn process_request( | ||
&self, | ||
msg: Request, | ||
rsp_addr: SocketAddr, | ||
) -> Option<(Response, SocketAddr)> { | ||
match msg { | ||
Request::GetBalance { key } => { | ||
let val = self.accountant.get_balance(&key); | ||
let rsp = (Response::Balance { key, val }, rsp_addr); | ||
info!("Response::Balance {:?}", rsp); | ||
Some(rsp) | ||
} | ||
Request::Transaction(_) => unreachable!(), | ||
Request::Subscribe { subscriptions } => { | ||
for subscription in subscriptions { | ||
match subscription { | ||
Subscription::EntryInfo => { | ||
self.entry_info_subscribers.lock().unwrap().push(rsp_addr) | ||
} | ||
} | ||
} | ||
None | ||
} | ||
} | ||
} | ||
|
||
pub fn process_requests( | ||
&self, | ||
reqs: Vec<(Request, SocketAddr)>, | ||
) -> Vec<(Response, SocketAddr)> { | ||
reqs.into_iter() | ||
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) | ||
.collect() | ||
} | ||
|
||
pub fn notify_entry_info_subscribers(&self, entry: &Entry) { | ||
// TODO: No need to bind(). | ||
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); | ||
|
||
// copy subscribers to avoid taking lock while doing io | ||
let addrs = self.entry_info_subscribers.lock().unwrap().clone(); | ||
trace!("Sending to {} addrs", addrs.len()); | ||
for addr in addrs { | ||
let entry_info = EntryInfo { | ||
id: entry.id, | ||
num_hashes: entry.num_hashes, | ||
num_events: entry.events.len() as u64, | ||
}; | ||
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); | ||
trace!("sending {} to {}", data.len(), addr); | ||
//TODO dont do IO here, this needs to be on a separate channel | ||
let res = socket.send_to(&data, addr); | ||
if res.is_err() { | ||
eprintln!("couldn't send response: {:?}", res); | ||
} | ||
} | ||
} | ||
|
||
fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> { | ||
p.packets | ||
.par_iter() | ||
.map(|x| { | ||
deserialize(&x.data[0..x.meta.size]) | ||
.map(|req| (req, x.meta.addr())) | ||
.ok() | ||
}) | ||
.collect() | ||
} | ||
|
||
// Copy-paste of deserialize_requests() because I can't figure out how to | ||
// route the lifetimes in a generic version. | ||
pub fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Event, SocketAddr)>> { | ||
p.packets | ||
.par_iter() | ||
.map(|x| { | ||
deserialize(&x.data[0..x.meta.size]) | ||
.map(|req| (req, x.meta.addr())) | ||
.ok() | ||
}) | ||
.collect() | ||
} | ||
|
||
/// Split Request list into verified transactions and the rest | ||
fn partition_requests( | ||
req_vers: Vec<(Request, SocketAddr, u8)>, | ||
) -> (Vec<Event>, Vec<(Request, SocketAddr)>) { | ||
let mut events = vec![]; | ||
let mut reqs = vec![]; | ||
for (msg, rsp_addr, verify) in req_vers { | ||
match msg { | ||
Request::Transaction(tr) => { | ||
if verify != 0 { | ||
events.push(Event::Transaction(tr)); | ||
} | ||
} | ||
_ => reqs.push((msg, rsp_addr)), | ||
} | ||
} | ||
(events, reqs) | ||
} | ||
|
||
fn serialize_response( | ||
resp: Response, | ||
rsp_addr: SocketAddr, | ||
blob_recycler: &packet::BlobRecycler, | ||
) -> Result<packet::SharedBlob> { | ||
let blob = blob_recycler.allocate(); | ||
{ | ||
let mut b = blob.write().unwrap(); | ||
let v = serialize(&resp)?; | ||
let len = v.len(); | ||
b.data[..len].copy_from_slice(&v); | ||
b.meta.size = len; | ||
b.meta.set_addr(&rsp_addr); | ||
} | ||
Ok(blob) | ||
} | ||
|
||
fn serialize_responses( | ||
rsps: Vec<(Response, SocketAddr)>, | ||
blob_recycler: &packet::BlobRecycler, | ||
) -> Result<VecDeque<packet::SharedBlob>> { | ||
let mut blobs = VecDeque::new(); | ||
for (resp, rsp_addr) in rsps { | ||
blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); | ||
} | ||
Ok(blobs) | ||
} | ||
|
||
pub fn process_request_packets( | ||
&self, | ||
event_processor: &EventProcessor, | ||
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>, | ||
entry_sender: &Sender<Entry>, | ||
blob_sender: &streamer::BlobSender, | ||
packet_recycler: &packet::PacketRecycler, | ||
blob_recycler: &packet::BlobRecycler, | ||
) -> Result<()> { | ||
let timer = Duration::new(1, 0); | ||
let recv_start = Instant::now(); | ||
let mms = verified_receiver.recv_timeout(timer)?; | ||
let mut reqs_len = 0; | ||
let mms_len = mms.len(); | ||
info!( | ||
"@{:?} process start stalled for: {:?}ms batches: {}", | ||
timing::timestamp(), | ||
timing::duration_as_ms(&recv_start.elapsed()), | ||
mms.len(), | ||
); | ||
let proc_start = Instant::now(); | ||
for (msgs, vers) in mms { | ||
let reqs = Self::deserialize_requests(&msgs.read().unwrap()); | ||
reqs_len += reqs.len(); | ||
let req_vers = reqs.into_iter() | ||
.zip(vers) | ||
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) | ||
.filter(|x| { | ||
let v = x.0.verify(); | ||
v | ||
}) | ||
.collect(); | ||
|
||
debug!("partitioning"); | ||
let (events, reqs) = Self::partition_requests(req_vers); | ||
debug!("events: {} reqs: {}", events.len(), reqs.len()); | ||
|
||
debug!("process_events"); | ||
let entry = event_processor.process_events(events)?; | ||
entry_sender.send(entry)?; | ||
debug!("done process_events"); | ||
|
||
debug!("process_requests"); | ||
let rsps = self.process_requests(reqs); | ||
debug!("done process_requests"); | ||
|
||
let blobs = Self::serialize_responses(rsps, blob_recycler)?; | ||
if !blobs.is_empty() { | ||
info!("process: sending blobs: {}", blobs.len()); | ||
//don't wake up the other side if there is nothing | ||
blob_sender.send(blobs)?; | ||
} | ||
packet_recycler.recycle(msgs); | ||
} | ||
let total_time_s = timing::duration_as_s(&proc_start.elapsed()); | ||
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); | ||
info!( | ||
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", | ||
timing::timestamp(), | ||
mms_len, | ||
total_time_ms, | ||
reqs_len, | ||
(reqs_len as f32) / (total_time_s) | ||
); | ||
Ok(()) | ||
} | ||
} |
Oops, something went wrong.