Skip to content

Commit

Permalink
perf counters
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko authored and solana-grimes committed Jun 7, 2018
1 parent 586279b commit c2a9395
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 4 deletions.
6 changes: 5 additions & 1 deletion src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
use bank::Bank;
use bincode::deserialize;
use counter::Counter;
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use record_stage::Signal;
use result::Result;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{Builder, JoinHandle};
Expand Down Expand Up @@ -94,6 +95,8 @@ impl BankingStage {
timing::duration_as_ms(&recv_start.elapsed()),
mms.len(),
);
let count = mms.iter().map(|x| x.1.len()).sum();
static mut COUNTER: Counter = create_counter!("banking_stage_process_packets", 1);
let proc_start = Instant::now();
for (msgs, vers) in mms {
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
Expand Down Expand Up @@ -129,6 +132,7 @@ impl BankingStage {
reqs_len,
(reqs_len as f32) / (total_time_s)
);
inc_counter!(COUNTER, count, proc_start);
Ok(())
}
}
Expand Down
70 changes: 70 additions & 0 deletions src/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub struct Counter {
pub name: &'static str,
pub counts: AtomicUsize,
pub nanos: AtomicUsize,
pub times: AtomicUsize,
pub lograte: usize,
}

macro_rules! create_counter {
($name:expr, $lograte:expr) => {
Counter {
name: $name,
counts: AtomicUsize::new(0),
nanos: AtomicUsize::new(0),
times: AtomicUsize::new(0),
lograte: $lograte,
}
};
}

macro_rules! inc_counter {
($name:expr, $count:expr, $start:expr) => {
unsafe { $name.inc($count, $start.elapsed()) };
};
}

impl Counter {
pub fn inc(&mut self, events: usize, dur: Duration) {
let total = dur.as_secs() * 1_000_000_000 + dur.subsec_nanos() as u64;
let counts = self.counts.fetch_add(events, Ordering::Relaxed);
let nanos = self.nanos.fetch_add(total as usize, Ordering::Relaxed);
let times = self.times.fetch_add(1, Ordering::Relaxed);
if times % self.lograte == 0 && times > 0 {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let now_ms = now.as_secs() * 1_000 + now.subsec_nanos() as u64 / 1_000_000;
info!(
"COUNTER:{{\"name:\":\"{}\", \"counts\": {}, \"nanos\": {}, \"samples\": {} \"rate\": {}, \"now\": {}}}",
self.name,
counts,
nanos,
times,
counts as f64 * 1e9 / nanos as f64,
now_ms,
);
}
}
}
#[cfg(test)]
mod tests {
use counter::Counter;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
#[test]
fn test_counter() {
static mut COUNTER: Counter = create_counter!("test", 100);
let start = Instant::now();
let count = 1;
inc_counter!(COUNTER, count, start);
unsafe {
assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1);
assert_ne!(COUNTER.nanos.load(Ordering::Relaxed), 0);
assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1);
assert_eq!(COUNTER.lograte, 100);
assert_eq!(COUNTER.name, "test");
}
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
//!
#![cfg_attr(feature = "unstable", feature(test))]
#[macro_use]
pub mod counter;
pub mod bank;
pub mod banking_stage;
pub mod budget;
Expand Down
7 changes: 7 additions & 0 deletions src/packet.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! The `packet` module defines data structures and methods to pull data from the network.
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use counter::Counter;
use result::{Error, Result};
use serde::Serialize;
use signature::PublicKey;
Expand All @@ -9,7 +10,9 @@ use std::fmt;
use std::io;
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>;
Expand Down Expand Up @@ -169,6 +172,7 @@ impl<T: Default> Recycler<T> {

impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
static mut COUNTER: Counter = create_counter!("packets", 10);
self.packets.resize(NUM_PACKETS, Packet::default());
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
Expand All @@ -178,11 +182,13 @@ impl Packets {
// * read until it fails
// * set it back to blocking before returning
socket.set_nonblocking(false)?;
let mut start = Instant::now();
for p in &mut self.packets {
p.meta.size = 0;
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
inc_counter!(COUNTER, i, start);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
Expand All @@ -194,6 +200,7 @@ impl Packets {
p.meta.size = nrecv;
p.meta.set_addr(&from);
if i == 0 {
start = Instant::now();
socket.set_nonblocking(true)?;
}
}
Expand Down
17 changes: 14 additions & 3 deletions src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
//! offloaded to the GPU.
//!
use counter::Counter;
use packet::{Packet, SharedPackets};
use std::mem::size_of;
use std::sync::atomic::AtomicUsize;
use std::time::Instant;
use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET};

pub const TX_OFFSET: usize = 0;
Expand Down Expand Up @@ -67,8 +70,11 @@ fn batch_size(batches: &Vec<SharedPackets>) -> usize {
#[cfg(not(feature = "cuda"))]
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use rayon::prelude::*;
static mut COUNTER: Counter = create_counter!("ed25519_verify", 1);
let start = Instant::now();
let count = batch_size(batches);
info!("CPU ECDSA for {}", batch_size(batches));
batches
let rv = batches
.into_par_iter()
.map(|p| {
p.read()
Expand All @@ -78,13 +84,17 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
.map(verify_packet)
.collect()
})
.collect()
.collect();
inc_counter!(COUNTER, count, start);
rv
}

#[cfg(feature = "cuda")]
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use packet::PACKET_DATA_SIZE;

static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1);
let start = Instant::now();
let count = batch_size(batches);
info!("CUDA ECDSA for {}", batch_size(batches));
let mut out = Vec::new();
let mut elems = Vec::new();
Expand Down Expand Up @@ -143,6 +153,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
num += 1;
}
}
inc_counter!(COUNTER, count, start);
rvs
}

Expand Down

0 comments on commit c2a9395

Please sign in to comment.