diff --git a/src/crdt.rs b/src/crdt.rs index fd71c15bed07a8..f77835e4aa818d 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -34,9 +34,9 @@ use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; -use std::time::Duration; +use std::time::{Duration, Instant}; use streamer::{BlobReceiver, BlobSender, SharedWindow, WindowIndex}; -use timing::timestamp; +use timing::{duration_as_ms, timestamp}; use transaction::Vote; /// milliseconds we sleep for between gossip requests @@ -417,7 +417,8 @@ impl Crdt { self.insert_vote(&v.0, &v.1, v.2); } } - pub fn insert(&mut self, v: &NodeInfo) { + + pub fn insert(&mut self, v: &NodeInfo) -> usize { // TODO check that last_verified types are always increasing //update the peer table if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { @@ -436,8 +437,8 @@ impl Crdt { self.update_index += 1; let _ = self.table.insert(v.id, v.clone()); let _ = self.local.insert(v.id, self.update_index); - inc_new_counter_info!("crdt-update-count", 1); self.update_liveness(v.id); + 1 } else { trace!( "{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}", @@ -446,6 +447,7 @@ impl Crdt { v.version, self.table[&v.id].version ); + 0 } } @@ -901,9 +903,11 @@ impl Crdt { trace!("got updates {}", data.len()); // TODO we need to punish/spam resist here // sig verify the whole update and slash anyone who sends a bad update + let mut insert_total = 0; for v in data { - self.insert(&v); + insert_total += self.insert(&v); } + inc_new_counter_info!("crdt-update-count", insert_total); for (pk, external_remote_index) in external_liveness { let remote_entry = if let Some(v) = self.remote.get(pk) { @@ -1118,6 +1122,7 @@ impl Crdt { } } Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => { + let now = Instant::now(); trace!( "ReceivedUpdates from={:x} update_index={} len={}", make_debug_id(&from), @@ -1127,9 +1132,16 @@ impl Crdt { obj.write() .expect("'obj' write lock in ReceiveUpdates") .apply_updates(from, update_index, &data, &external_liveness); + + report_time_spent( + "ReceiveUpdates", + &now.elapsed(), + &format!(" len: {}", data.len()), + ); None } Protocol::RequestWindowIndex(from, ix) => { + let now = Instant::now(); //TODO this doesn't depend on CRDT module, can be moved //but we are using the listen thread to service these request //TODO verify from is signed @@ -1152,7 +1164,14 @@ impl Crdt { inc_new_counter_info!("crdt-window-request-address-eq", 1); return None; } - Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler) + let res = + Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler); + report_time_spent( + "RequestWindowIndex", + &now.elapsed(), + &format!(" ix: {}", ix), + ); + res } } } @@ -1343,6 +1362,13 @@ impl TestNode { } } +fn report_time_spent(label: &str, time: &Duration, extra: &str) { + let count = duration_as_ms(time); + if count > 5 { + info!("{} took: {} ms {}", label, count, extra); + } +} + #[cfg(test)] mod tests { use crdt::{ diff --git a/src/streamer.rs b/src/streamer.rs index 7b29c35ea1b27c..936edc25c3372d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -17,7 +17,8 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; -use std::time::Duration; +use std::time::{Duration, Instant}; +use timing::duration_as_ms; pub const WINDOW_SIZE: u64 = 2 * 1024; pub type PacketReceiver = Receiver; @@ -250,7 +251,7 @@ fn repair_window( trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); if !reqs.is_empty() { inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); - debug!( + info!( "{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}", debug_id, *times, @@ -496,7 +497,8 @@ fn recv_window( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } - inc_new_counter_info!("streamer-recv_window-recv", dq.len()); + let now = Instant::now(); + inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100); debug!( "{:x}: RECV_WINDOW {} {}: got packets {}", debug_id, @@ -515,6 +517,7 @@ fn recv_window( retransmit, )?; + let mut pixs = Vec::new(); //send a contiguous set of blocks let mut consume_queue = VecDeque::new(); while let Some(b) = dq.pop_front() { @@ -522,6 +525,7 @@ fn recv_window( let p = b.write().expect("'b' write lock in fn recv_window"); (p.get_index()?, p.meta.size) }; + pixs.push(pix); if !blob_idx_in_window(debug_id, pix, *consumed, received) { recycler.recycle(b); @@ -543,10 +547,14 @@ fn recv_window( if log_enabled!(Level::Trace) { trace!("{}", print_window(debug_id, window, *consumed)); } - trace!( - "{:x}: sending consume_queue.len: {}", + info!( + "{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms", debug_id, - consume_queue.len() + *consumed, + *received, + consume_queue.len(), + pixs, + duration_as_ms(&now.elapsed()) ); if !consume_queue.is_empty() { debug!(