diff --git a/src/banking_stage.rs b/src/banking_stage.rs index f7d39c31e722d2..9229312bfe4196 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -125,7 +125,7 @@ impl BankingStage { reqs_len, (reqs_len as f32) / (total_time_s) ); - inc_counter!(COUNTER, count, proc_start); + inc_counter!(COUNTER, count); Ok(()) } } diff --git a/src/counter.rs b/src/counter.rs index b3ed195456d551..f7509ca4419b91 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -1,14 +1,14 @@ use influx_db_client as influxdb; use metrics; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Duration; use timing; +const INFLUX_RATE: usize = 100; + pub struct Counter { pub name: &'static str, /// total accumulated value pub counts: AtomicUsize, - pub nanos: AtomicUsize, pub times: AtomicUsize, /// last accumulated value logged pub lastlog: AtomicUsize, @@ -20,7 +20,6 @@ macro_rules! create_counter { Counter { name: $name, counts: AtomicUsize::new(0), - nanos: AtomicUsize::new(0), times: AtomicUsize::new(0), lastlog: AtomicUsize::new(0), lograte: $lograte, @@ -29,38 +28,32 @@ macro_rules! create_counter { } macro_rules! inc_counter { - ($name:expr, $count:expr, $start:expr) => { - unsafe { $name.inc($count, $start.elapsed()) }; + ($name:expr, $count:expr) => { + unsafe { $name.inc($count) }; }; } impl Counter { - pub fn inc(&mut self, events: usize, dur: Duration) { - let total = dur.as_secs() * 1_000_000_000 + dur.subsec_nanos() as u64; + pub fn inc(&mut self, events: usize) { let counts = self.counts.fetch_add(events, Ordering::Relaxed); - let nanos = self.nanos.fetch_add(total as usize, Ordering::Relaxed); let times = self.times.fetch_add(1, Ordering::Relaxed); let lastlog = self.lastlog.load(Ordering::Relaxed); if times % self.lograte == 0 && times > 0 { info!( - "COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"nanos\": {}, \"samples\": {}, \"rate\": {}, \"now\": {}}}", + "COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}}}", self.name, counts, - nanos, times, - counts as f64 * 1e9 / nanos as f64, timing::timestamp(), ); + } + if times % INFLUX_RATE == 0 && times > 0 { metrics::submit( influxdb::Point::new(&format!("counter_{}", self.name)) .add_field( "count", influxdb::Value::Integer(counts as i64 - lastlog as i64), ) - .add_field( - "duration_ms", - influxdb::Value::Integer(timing::duration_as_ms(&dur) as i64), - ) .to_owned(), ); self.lastlog @@ -72,28 +65,25 @@ impl Counter { mod tests { use counter::Counter; use std::sync::atomic::{AtomicUsize, Ordering}; - use std::time::Instant; #[test] fn test_counter() { static mut COUNTER: Counter = create_counter!("test", 100); - let start = Instant::now(); let count = 1; - inc_counter!(COUNTER, count, start); + inc_counter!(COUNTER, count); unsafe { assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1); - assert_ne!(COUNTER.nanos.load(Ordering::Relaxed), 0); assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1); assert_eq!(COUNTER.lograte, 100); assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 0); assert_eq!(COUNTER.name, "test"); } for _ in 0..199 { - inc_counter!(COUNTER, 2, start); + inc_counter!(COUNTER, 2); } unsafe { assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 199); } - inc_counter!(COUNTER, 2, start); + inc_counter!(COUNTER, 2); unsafe { assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 399); } diff --git a/src/crdt.rs b/src/crdt.rs index 512c037025d04b..cb20bf8f1f73f7 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -16,6 +16,7 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt}; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; +use counter::Counter; use hash::Hash; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use pnet_datalink as datalink; @@ -28,7 +29,7 @@ use std::collections::HashMap; use std::collections::VecDeque; use std::io::Cursor; use std::net::{IpAddr, SocketAddr, UdpSocket}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; @@ -36,6 +37,8 @@ use streamer::{BlobReceiver, BlobSender, Window}; use timing::timestamp; use transaction::Vote; +const LOG_RATE: usize = 10; + /// milliseconds we sleep for between gossip requests const GOSSIP_SLEEP_MILLIS: u64 = 100; const GOSSIP_PURGE_MILLIS: u64 = 15000; @@ -337,6 +340,8 @@ impl Crdt { } } pub fn insert_votes(&mut self, votes: Vec<(PublicKey, Vote, Hash)>) { + static mut COUNTER_VOTE: Counter = create_counter!("crdt-vote-count", LOG_RATE); + inc_counter!(COUNTER_VOTE, votes.len()); if votes.len() > 0 { info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len()); } @@ -360,6 +365,8 @@ impl Crdt { self.update_index += 1; let _ = self.table.insert(v.id.clone(), v.clone()); let _ = self.local.insert(v.id, self.update_index); + static mut COUNTER_UPDATE: Counter = create_counter!("crdt-update-count", LOG_RATE); + inc_counter!(COUNTER_UPDATE, 1); } else { trace!( "{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}", @@ -431,6 +438,9 @@ impl Crdt { }) .collect(); + static mut COUNTER_PURGE: Counter = create_counter!("crdt-purge-count", LOG_RATE); + inc_counter!(COUNTER_PURGE, dead_ids.len()); + for id in dead_ids.iter() { self.alive.remove(id); self.table.remove(id); @@ -884,15 +894,24 @@ impl Crdt { outblob.meta.set_addr(&from.contact_info.tvu_window); outblob.set_id(sender_id).expect("blob set_id"); } + static mut COUNTER_REQ_WINDOW_PASS: Counter = + create_counter!("crdt-window-request-pass", LOG_RATE); + inc_counter!(COUNTER_REQ_WINDOW_PASS, 1); return Some(out); } else { + static mut COUNTER_REQ_WINDOW_OUTSIDE: Counter = + create_counter!("crdt-window-request-outside", LOG_RATE); + inc_counter!(COUNTER_REQ_WINDOW_OUTSIDE, 1); info!( "requested ix {} != blob_ix {}, outside window!", ix, blob_ix ); } } else { + static mut COUNTER_REQ_WINDOW_FAIL: Counter = + create_counter!("crdt-window-request-fail", LOG_RATE); + inc_counter!(COUNTER_REQ_WINDOW_FAIL, 1); assert!(window.read().unwrap()[pos].is_none()); info!( "{:x}: failed RequestWindowIndex {:x} {} {}", @@ -971,6 +990,9 @@ impl Crdt { //TODO verify from is signed obj.write().unwrap().insert(&from); let me = obj.read().unwrap().my_data().clone(); + static mut COUNTER_REQ_WINDOW: Counter = + create_counter!("crdt-window-request-recv", LOG_RATE); + inc_counter!(COUNTER_REQ_WINDOW, 1); trace!( "{:x}:received RequestWindowIndex {:x} {} ", me.debug_id(), diff --git a/src/packet.rs b/src/packet.rs index 32a3390f76015b..f01db16b270a1f 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -12,7 +12,6 @@ use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::sync::atomic::AtomicUsize; use std::sync::{Arc, Mutex, RwLock}; -use std::time::Instant; pub type SharedPackets = Arc>; pub type SharedBlob = Arc>; @@ -20,6 +19,7 @@ pub type SharedBlobs = VecDeque; pub type PacketRecycler = Recycler; pub type BlobRecycler = Recycler; +const LOG_RATE: usize = 10; pub const NUM_PACKETS: usize = 1024 * 8; pub const BLOB_SIZE: usize = 64 * 1024; pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE; @@ -188,7 +188,7 @@ impl Recycler { impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { - static mut COUNTER: Counter = create_counter!("packets", 10); + static mut COUNTER: Counter = create_counter!("packets", LOG_RATE); self.packets.resize(NUM_PACKETS, Packet::default()); let mut i = 0; //DOCUMENTED SIDE-EFFECT @@ -198,13 +198,12 @@ impl Packets { // * read until it fails // * set it back to blocking before returning socket.set_nonblocking(false)?; - let mut start = Instant::now(); for p in &mut self.packets { p.meta.size = 0; trace!("receiving on {}", socket.local_addr().unwrap()); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { - inc_counter!(COUNTER, i, start); + inc_counter!(COUNTER, i); debug!("got {:?} messages on {}", i, socket.local_addr().unwrap()); break; } @@ -216,7 +215,6 @@ impl Packets { p.meta.size = nrecv; p.meta.set_addr(&from); if i == 0 { - start = Instant::now(); socket.set_nonblocking(true)?; } } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 1db146a0716333..7d56ccd7e64e0c 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -2,6 +2,7 @@ use bank::Bank; use bincode::serialize; +use counter::Counter; use crdt::Crdt; use ledger; use packet::BlobRecycler; @@ -10,6 +11,7 @@ use service::Service; use signature::KeyPair; use std::collections::VecDeque; use std::net::UdpSocket; +use std::sync::atomic::AtomicUsize; use std::sync::mpsc::channel; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; @@ -25,6 +27,7 @@ pub struct ReplicateStage { } const VOTE_TIMEOUT_MS: u64 = 1000; +const LOG_RATE: usize = 10; impl ReplicateStage { /// Process entry blobs, already in order @@ -46,6 +49,12 @@ impl ReplicateStage { let blobs_len = blobs.len(); let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?; let votes = entries_to_votes(&entries); + + static mut COUNTER_REPLICATE: Counter = create_counter!("replicate-transactions", LOG_RATE); + inc_counter!( + COUNTER_REPLICATE, + entries.iter().map(|x| x.transactions.len()).sum() + ); let res = bank.process_entries(entries); if res.is_err() { error!("process_entries {} {:?}", blobs_len, res); diff --git a/src/sigverify.rs b/src/sigverify.rs index 401f6e5977dc46..d6fb66fe24acb7 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -8,7 +8,6 @@ use counter::Counter; use packet::{Packet, SharedPackets}; use std::mem::size_of; use std::sync::atomic::AtomicUsize; -use std::time::Instant; use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET}; pub const TX_OFFSET: usize = 0; @@ -71,7 +70,6 @@ fn batch_size(batches: &Vec) -> usize { pub fn ed25519_verify(batches: &Vec) -> Vec> { use rayon::prelude::*; static mut COUNTER: Counter = create_counter!("ed25519_verify", 1); - let start = Instant::now(); let count = batch_size(batches); info!("CPU ECDSA for {}", batch_size(batches)); let rv = batches @@ -85,7 +83,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { .collect() }) .collect(); - inc_counter!(COUNTER, count, start); + inc_counter!(COUNTER, count); rv } @@ -93,7 +91,6 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { pub fn ed25519_verify(batches: &Vec) -> Vec> { use packet::PACKET_DATA_SIZE; static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1); - let start = Instant::now(); let count = batch_size(batches); info!("CUDA ECDSA for {}", batch_size(batches)); let mut out = Vec::new(); @@ -153,7 +150,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { num += 1; } } - inc_counter!(COUNTER, count, start); + inc_counter!(COUNTER, count); rvs } diff --git a/src/streamer.rs b/src/streamer.rs index cf3dc4f9f6449f..a358c0f3dd43b7 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,5 +1,6 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! +use counter::Counter; use crdt::{Crdt, CrdtError, ReplicatedData}; #[cfg(feature = "erasure")] use erasure; @@ -11,12 +12,13 @@ use std::cmp; use std::collections::VecDeque; use std::mem; use std::net::{SocketAddr, UdpSocket}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; +const LOG_RATE: usize = 10; pub const WINDOW_SIZE: u64 = 2 * 1024; pub type PacketReceiver = Receiver; pub type PacketSender = Sender; @@ -217,6 +219,9 @@ fn repair_window( let reqs = find_next_missing(locked_window, crdt, consumed, received)?; trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); if reqs.len() > 0 { + static mut COUNTER_REPAIR: Counter = + create_counter!("streamer-repair_window-repair", LOG_RATE); + inc_counter!(COUNTER_REPAIR, reqs.len()); debug!( "{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}", debug_id, @@ -259,6 +264,8 @@ fn recv_window( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } + static mut COUNTER_RECV: Counter = create_counter!("streamer-recv_window-recv", LOG_RATE); + inc_counter!(COUNTER_RECV, dq.len()); debug!( "{:x}: RECV_WINDOW {} {}: got packets {}", debug_id, @@ -268,7 +275,7 @@ fn recv_window( ); { //retransmit all leader blocks - let mut retransmitq = VecDeque::new(); + let mut retransmit_queue = VecDeque::new(); if let Some(leader) = maybe_leader { for b in &dq { let p = b.read().expect("'b' read lock in fn recv_window"); @@ -297,25 +304,28 @@ fn recv_window( mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); } - retransmitq.push_back(nv); + retransmit_queue.push_back(nv); } } } else { warn!("{:x}: no leader to retransmit from", debug_id); } - if !retransmitq.is_empty() { + if !retransmit_queue.is_empty() { debug!( "{:x}: RECV_WINDOW {} {}: retransmit {}", debug_id, *consumed, *received, - retransmitq.len(), + retransmit_queue.len(), ); - retransmit.send(retransmitq)?; + static mut COUNTER_RETRANSMIT: Counter = + create_counter!("streamer-recv_window-retransmit", LOG_RATE); + inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len()); + retransmit.send(retransmit_queue)?; } } //send a contiguous set of blocks - let mut contq = VecDeque::new(); + let mut consume_queue = VecDeque::new(); while let Some(b) = dq.pop_front() { let (pix, meta_size) = { let p = b.write().expect("'b' write lock in fn recv_window"); @@ -386,7 +396,7 @@ fn recv_window( } } if !is_coding { - contq.push_back(window[k].clone().expect("clone in fn recv_window")); + consume_queue.push_back(window[k].clone().expect("clone in fn recv_window")); *consumed += 1; } else { #[cfg(feature = "erasure")] @@ -416,17 +426,20 @@ fn recv_window( } } print_window(debug_id, locked_window, *consumed); - trace!("sending contq.len: {}", contq.len()); - if !contq.is_empty() { + trace!("sending consume_queue.len: {}", consume_queue.len()); + if !consume_queue.is_empty() { debug!( - "{:x}: RECV_WINDOW {} {}: forwarding contq {}", + "{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}", debug_id, *consumed, *received, - contq.len(), + consume_queue.len(), ); - trace!("sending contq.len: {}", contq.len()); - s.send(contq)?; + trace!("sending consume_queue.len: {}", consume_queue.len()); + static mut COUNTER_CONSUME: Counter = + create_counter!("streamer-recv_window-consume", LOG_RATE); + inc_counter!(COUNTER_CONSUME, consume_queue.len()); + s.send(consume_queue)?; } Ok(()) } @@ -592,6 +605,9 @@ fn broadcast( // Index the blobs Crdt::index_blobs(&me, &blobs, receive_index)?; // keep the cache of blobs that are broadcast + static mut COUNTER_BROADCAST: Counter = + create_counter!("streamer-broadcast-sent", LOG_RATE); + inc_counter!(COUNTER_BROADCAST, blobs.len()); { let mut win = window.write().unwrap(); assert!(blobs.len() <= win.len());