Skip to content

Commit

Permalink
only submit to influx when we log
Browse files Browse the repository at this point in the history
test accumilated value logging

lots of counters

higher influx rate

fix counter name

replicate-transactions
  • Loading branch information
aeyakovenko authored and garious committed Jul 11, 2018
1 parent 0c6d2ef commit 03a8a5e
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl BankingStage {
reqs_len,
(reqs_len as f32) / (total_time_s)
);
inc_counter!(COUNTER, count, proc_start);
inc_counter!(COUNTER, count);
Ok(())
}
}
Expand Down
32 changes: 11 additions & 21 deletions src/counter.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use influx_db_client as influxdb;
use metrics;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use timing;

const INFLUX_RATE: usize = 100;

pub struct Counter {
pub name: &'static str,
/// total accumulated value
pub counts: AtomicUsize,
pub nanos: AtomicUsize,
pub times: AtomicUsize,
/// last accumulated value logged
pub lastlog: AtomicUsize,
Expand All @@ -20,7 +20,6 @@ macro_rules! create_counter {
Counter {
name: $name,
counts: AtomicUsize::new(0),
nanos: AtomicUsize::new(0),
times: AtomicUsize::new(0),
lastlog: AtomicUsize::new(0),
lograte: $lograte,
Expand All @@ -29,38 +28,32 @@ macro_rules! create_counter {
}

macro_rules! inc_counter {
($name:expr, $count:expr, $start:expr) => {
unsafe { $name.inc($count, $start.elapsed()) };
($name:expr, $count:expr) => {
unsafe { $name.inc($count) };
};
}

impl Counter {
pub fn inc(&mut self, events: usize, dur: Duration) {
let total = dur.as_secs() * 1_000_000_000 + dur.subsec_nanos() as u64;
pub fn inc(&mut self, events: usize) {
let counts = self.counts.fetch_add(events, Ordering::Relaxed);
let nanos = self.nanos.fetch_add(total as usize, Ordering::Relaxed);
let times = self.times.fetch_add(1, Ordering::Relaxed);
let lastlog = self.lastlog.load(Ordering::Relaxed);
if times % self.lograte == 0 && times > 0 {
info!(
"COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"nanos\": {}, \"samples\": {}, \"rate\": {}, \"now\": {}}}",
"COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}}}",
self.name,
counts,
nanos,
times,
counts as f64 * 1e9 / nanos as f64,
timing::timestamp(),
);
}
if times % INFLUX_RATE == 0 && times > 0 {
metrics::submit(
influxdb::Point::new(&format!("counter_{}", self.name))
.add_field(
"count",
influxdb::Value::Integer(counts as i64 - lastlog as i64),
)
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&dur) as i64),
)
.to_owned(),
);
self.lastlog
Expand All @@ -72,28 +65,25 @@ impl Counter {
mod tests {
use counter::Counter;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
#[test]
fn test_counter() {
static mut COUNTER: Counter = create_counter!("test", 100);
let start = Instant::now();
let count = 1;
inc_counter!(COUNTER, count, start);
inc_counter!(COUNTER, count);
unsafe {
assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1);
assert_ne!(COUNTER.nanos.load(Ordering::Relaxed), 0);
assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1);
assert_eq!(COUNTER.lograte, 100);
assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 0);
assert_eq!(COUNTER.name, "test");
}
for _ in 0..199 {
inc_counter!(COUNTER, 2, start);
inc_counter!(COUNTER, 2);
}
unsafe {
assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 199);
}
inc_counter!(COUNTER, 2, start);
inc_counter!(COUNTER, 2);
unsafe {
assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 399);
}
Expand Down
24 changes: 23 additions & 1 deletion src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
use counter::Counter;
use hash::Hash;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet_datalink as datalink;
Expand All @@ -28,14 +29,16 @@ use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration;
use streamer::{BlobReceiver, BlobSender, Window};
use timing::timestamp;
use transaction::Vote;

const LOG_RATE: usize = 10;

/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;
const GOSSIP_PURGE_MILLIS: u64 = 15000;
Expand Down Expand Up @@ -337,6 +340,8 @@ impl Crdt {
}
}
pub fn insert_votes(&mut self, votes: Vec<(PublicKey, Vote, Hash)>) {
static mut COUNTER_VOTE: Counter = create_counter!("crdt-vote-count", LOG_RATE);
inc_counter!(COUNTER_VOTE, votes.len());
if votes.len() > 0 {
info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len());
}
Expand All @@ -360,6 +365,8 @@ impl Crdt {
self.update_index += 1;
let _ = self.table.insert(v.id.clone(), v.clone());
let _ = self.local.insert(v.id, self.update_index);
static mut COUNTER_UPDATE: Counter = create_counter!("crdt-update-count", LOG_RATE);
inc_counter!(COUNTER_UPDATE, 1);
} else {
trace!(
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
Expand Down Expand Up @@ -431,6 +438,9 @@ impl Crdt {
})
.collect();

static mut COUNTER_PURGE: Counter = create_counter!("crdt-purge-count", LOG_RATE);
inc_counter!(COUNTER_PURGE, dead_ids.len());

for id in dead_ids.iter() {
self.alive.remove(id);
self.table.remove(id);
Expand Down Expand Up @@ -884,15 +894,24 @@ impl Crdt {
outblob.meta.set_addr(&from.contact_info.tvu_window);
outblob.set_id(sender_id).expect("blob set_id");
}
static mut COUNTER_REQ_WINDOW_PASS: Counter =
create_counter!("crdt-window-request-pass", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW_PASS, 1);

return Some(out);
} else {
static mut COUNTER_REQ_WINDOW_OUTSIDE: Counter =
create_counter!("crdt-window-request-outside", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW_OUTSIDE, 1);
info!(
"requested ix {} != blob_ix {}, outside window!",
ix, blob_ix
);
}
} else {
static mut COUNTER_REQ_WINDOW_FAIL: Counter =
create_counter!("crdt-window-request-fail", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW_FAIL, 1);
assert!(window.read().unwrap()[pos].is_none());
info!(
"{:x}: failed RequestWindowIndex {:x} {} {}",
Expand Down Expand Up @@ -971,6 +990,9 @@ impl Crdt {
//TODO verify from is signed
obj.write().unwrap().insert(&from);
let me = obj.read().unwrap().my_data().clone();
static mut COUNTER_REQ_WINDOW: Counter =
create_counter!("crdt-window-request-recv", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW, 1);
trace!(
"{:x}:received RequestWindowIndex {:x} {} ",
me.debug_id(),
Expand Down
8 changes: 3 additions & 5 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>;
pub type SharedBlobs = VecDeque<SharedBlob>;
pub type PacketRecycler = Recycler<Packets>;
pub type BlobRecycler = Recycler<Blob>;

const LOG_RATE: usize = 10;
pub const NUM_PACKETS: usize = 1024 * 8;
pub const BLOB_SIZE: usize = 64 * 1024;
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
Expand Down Expand Up @@ -188,7 +188,7 @@ impl<T: Default> Recycler<T> {

impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
static mut COUNTER: Counter = create_counter!("packets", 10);
static mut COUNTER: Counter = create_counter!("packets", LOG_RATE);
self.packets.resize(NUM_PACKETS, Packet::default());
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
Expand All @@ -198,13 +198,12 @@ impl Packets {
// * read until it fails
// * set it back to blocking before returning
socket.set_nonblocking(false)?;
let mut start = Instant::now();
for p in &mut self.packets {
p.meta.size = 0;
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
inc_counter!(COUNTER, i, start);
inc_counter!(COUNTER, i);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
Expand All @@ -216,7 +215,6 @@ impl Packets {
p.meta.size = nrecv;
p.meta.set_addr(&from);
if i == 0 {
start = Instant::now();
socket.set_nonblocking(true)?;
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/replicate_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use bank::Bank;
use bincode::serialize;
use counter::Counter;
use crdt::Crdt;
use ledger;
use packet::BlobRecycler;
Expand All @@ -10,6 +11,7 @@ use service::Service;
use signature::KeyPair;
use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
Expand All @@ -25,6 +27,7 @@ pub struct ReplicateStage {
}

const VOTE_TIMEOUT_MS: u64 = 1000;
const LOG_RATE: usize = 10;

impl ReplicateStage {
/// Process entry blobs, already in order
Expand All @@ -46,6 +49,12 @@ impl ReplicateStage {
let blobs_len = blobs.len();
let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?;
let votes = entries_to_votes(&entries);

static mut COUNTER_REPLICATE: Counter = create_counter!("replicate-transactions", LOG_RATE);
inc_counter!(
COUNTER_REPLICATE,
entries.iter().map(|x| x.transactions.len()).sum()
);
let res = bank.process_entries(entries);
if res.is_err() {
error!("process_entries {} {:?}", blobs_len, res);
Expand Down
7 changes: 2 additions & 5 deletions src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use counter::Counter;
use packet::{Packet, SharedPackets};
use std::mem::size_of;
use std::sync::atomic::AtomicUsize;
use std::time::Instant;
use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET};

pub const TX_OFFSET: usize = 0;
Expand Down Expand Up @@ -71,7 +70,6 @@ fn batch_size(batches: &Vec<SharedPackets>) -> usize {
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use rayon::prelude::*;
static mut COUNTER: Counter = create_counter!("ed25519_verify", 1);
let start = Instant::now();
let count = batch_size(batches);
info!("CPU ECDSA for {}", batch_size(batches));
let rv = batches
Expand All @@ -85,15 +83,14 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
.collect()
})
.collect();
inc_counter!(COUNTER, count, start);
inc_counter!(COUNTER, count);
rv
}

#[cfg(feature = "cuda")]
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use packet::PACKET_DATA_SIZE;
static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1);
let start = Instant::now();
let count = batch_size(batches);
info!("CUDA ECDSA for {}", batch_size(batches));
let mut out = Vec::new();
Expand Down Expand Up @@ -153,7 +150,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
num += 1;
}
}
inc_counter!(COUNTER, count, start);
inc_counter!(COUNTER, count);
rvs
}

Expand Down
Loading

0 comments on commit 03a8a5e

Please sign in to comment.