From c2a9395a4bb9b83dbeed82e2ed2a392762e209ab Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 30 May 2018 21:24:21 -0700 Subject: [PATCH] perf counters --- src/banking_stage.rs | 6 +++- src/counter.rs | 70 ++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 ++ src/packet.rs | 7 +++++ src/sigverify.rs | 17 +++++++++-- 5 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 src/counter.rs diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 21462cfd030bc5..abcd518bae05cf 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -4,13 +4,14 @@ use bank::Bank; use bincode::deserialize; +use counter::Counter; use packet; use packet::SharedPackets; use rayon::prelude::*; use record_stage::Signal; use result::Result; use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; use std::thread::{Builder, JoinHandle}; @@ -94,6 +95,8 @@ impl BankingStage { timing::duration_as_ms(&recv_start.elapsed()), mms.len(), ); + let count = mms.iter().map(|x| x.1.len()).sum(); + static mut COUNTER: Counter = create_counter!("banking_stage_process_packets", 1); let proc_start = Instant::now(); for (msgs, vers) in mms { let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); @@ -129,6 +132,7 @@ impl BankingStage { reqs_len, (reqs_len as f32) / (total_time_s) ); + inc_counter!(COUNTER, count, proc_start); Ok(()) } } diff --git a/src/counter.rs b/src/counter.rs new file mode 100644 index 00000000000000..279f81902a3159 --- /dev/null +++ b/src/counter.rs @@ -0,0 +1,70 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +pub struct Counter { + pub name: &'static str, + pub counts: AtomicUsize, + pub nanos: AtomicUsize, + pub times: AtomicUsize, + pub lograte: usize, +} + +macro_rules! create_counter { + ($name:expr, $lograte:expr) => { + Counter { + name: $name, + counts: AtomicUsize::new(0), + nanos: AtomicUsize::new(0), + times: AtomicUsize::new(0), + lograte: $lograte, + } + }; +} + +macro_rules! inc_counter { + ($name:expr, $count:expr, $start:expr) => { + unsafe { $name.inc($count, $start.elapsed()) }; + }; +} + +impl Counter { + pub fn inc(&mut self, events: usize, dur: Duration) { + let total = dur.as_secs() * 1_000_000_000 + dur.subsec_nanos() as u64; + let counts = self.counts.fetch_add(events, Ordering::Relaxed); + let nanos = self.nanos.fetch_add(total as usize, Ordering::Relaxed); + let times = self.times.fetch_add(1, Ordering::Relaxed); + if times % self.lograte == 0 && times > 0 { + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let now_ms = now.as_secs() * 1_000 + now.subsec_nanos() as u64 / 1_000_000; + info!( + "COUNTER:{{\"name:\":\"{}\", \"counts\": {}, \"nanos\": {}, \"samples\": {} \"rate\": {}, \"now\": {}}}", + self.name, + counts, + nanos, + times, + counts as f64 * 1e9 / nanos as f64, + now_ms, + ); + } + } +} +#[cfg(test)] +mod tests { + use counter::Counter; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Instant; + #[test] + fn test_counter() { + static mut COUNTER: Counter = create_counter!("test", 100); + let start = Instant::now(); + let count = 1; + inc_counter!(COUNTER, count, start); + unsafe { + assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1); + assert_ne!(COUNTER.nanos.load(Ordering::Relaxed), 0); + assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1); + assert_eq!(COUNTER.lograte, 100); + assert_eq!(COUNTER.name, "test"); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 78a954a16fd84e..b0a9d5afea8ddc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,8 @@ //! #![cfg_attr(feature = "unstable", feature(test))] +#[macro_use] +pub mod counter; pub mod bank; pub mod banking_stage; pub mod budget; diff --git a/src/packet.rs b/src/packet.rs index c73ea1afb31d65..262d6761a03ea3 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -1,6 +1,7 @@ //! The `packet` module defines data structures and methods to pull data from the network. use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use counter::Counter; use result::{Error, Result}; use serde::Serialize; use signature::PublicKey; @@ -9,7 +10,9 @@ use std::fmt; use std::io; use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; +use std::sync::atomic::AtomicUsize; use std::sync::{Arc, Mutex, RwLock}; +use std::time::Instant; pub type SharedPackets = Arc>; pub type SharedBlob = Arc>; @@ -169,6 +172,7 @@ impl Recycler { impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { + static mut COUNTER: Counter = create_counter!("packets", 10); self.packets.resize(NUM_PACKETS, Packet::default()); let mut i = 0; //DOCUMENTED SIDE-EFFECT @@ -178,11 +182,13 @@ impl Packets { // * read until it fails // * set it back to blocking before returning socket.set_nonblocking(false)?; + let mut start = Instant::now(); for p in &mut self.packets { p.meta.size = 0; trace!("receiving on {}", socket.local_addr().unwrap()); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { + inc_counter!(COUNTER, i, start); debug!("got {:?} messages on {}", i, socket.local_addr().unwrap()); break; } @@ -194,6 +200,7 @@ impl Packets { p.meta.size = nrecv; p.meta.set_addr(&from); if i == 0 { + start = Instant::now(); socket.set_nonblocking(true)?; } } diff --git a/src/sigverify.rs b/src/sigverify.rs index 164c7dff494dbd..401f6e5977dc46 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -4,8 +4,11 @@ //! offloaded to the GPU. //! +use counter::Counter; use packet::{Packet, SharedPackets}; use std::mem::size_of; +use std::sync::atomic::AtomicUsize; +use std::time::Instant; use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET}; pub const TX_OFFSET: usize = 0; @@ -67,8 +70,11 @@ fn batch_size(batches: &Vec) -> usize { #[cfg(not(feature = "cuda"))] pub fn ed25519_verify(batches: &Vec) -> Vec> { use rayon::prelude::*; + static mut COUNTER: Counter = create_counter!("ed25519_verify", 1); + let start = Instant::now(); + let count = batch_size(batches); info!("CPU ECDSA for {}", batch_size(batches)); - batches + let rv = batches .into_par_iter() .map(|p| { p.read() @@ -78,13 +84,17 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { .map(verify_packet) .collect() }) - .collect() + .collect(); + inc_counter!(COUNTER, count, start); + rv } #[cfg(feature = "cuda")] pub fn ed25519_verify(batches: &Vec) -> Vec> { use packet::PACKET_DATA_SIZE; - + static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1); + let start = Instant::now(); + let count = batch_size(batches); info!("CUDA ECDSA for {}", batch_size(batches)); let mut out = Vec::new(); let mut elems = Vec::new(); @@ -143,6 +153,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { num += 1; } } + inc_counter!(COUNTER, count, start); rvs }