Skip to content

Commit

Permalink
Enable Crdt debug messages to debug validators
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed Aug 8, 2018
1 parent a6857db commit 8331aab
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
38 changes: 32 additions & 6 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration;
use std::time::{Duration, Instant};
use streamer::{BlobReceiver, BlobSender, SharedWindow, WindowIndex};
use timing::timestamp;
use timing::{duration_as_ms, timestamp};
use transaction::Vote;

/// milliseconds we sleep for between gossip requests
Expand Down Expand Up @@ -417,7 +417,8 @@ impl Crdt {
self.insert_vote(&v.0, &v.1, v.2);
}
}
pub fn insert(&mut self, v: &NodeInfo) {

pub fn insert(&mut self, v: &NodeInfo) -> usize {
// TODO check that last_verified types are always increasing
//update the peer table
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
Expand All @@ -436,8 +437,8 @@ impl Crdt {
self.update_index += 1;
let _ = self.table.insert(v.id, v.clone());
let _ = self.local.insert(v.id, self.update_index);
inc_new_counter_info!("crdt-update-count", 1);
self.update_liveness(v.id);
1
} else {
trace!(
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
Expand All @@ -446,6 +447,7 @@ impl Crdt {
v.version,
self.table[&v.id].version
);
0
}
}

Expand Down Expand Up @@ -901,9 +903,11 @@ impl Crdt {
trace!("got updates {}", data.len());
// TODO we need to punish/spam resist here
// sig verify the whole update and slash anyone who sends a bad update
let mut insert_total = 0;
for v in data {
self.insert(&v);
insert_total += self.insert(&v);
}
inc_new_counter_info!("crdt-update-count", insert_total);

for (pk, external_remote_index) in external_liveness {
let remote_entry = if let Some(v) = self.remote.get(pk) {
Expand Down Expand Up @@ -1118,6 +1122,7 @@ impl Crdt {
}
}
Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => {
let now = Instant::now();
trace!(
"ReceivedUpdates from={:x} update_index={} len={}",
make_debug_id(&from),
Expand All @@ -1127,9 +1132,16 @@ impl Crdt {
obj.write()
.expect("'obj' write lock in ReceiveUpdates")
.apply_updates(from, update_index, &data, &external_liveness);

report_time_spent(
"ReceiveUpdates",
&now.elapsed(),
&format!(" len: {}", data.len()),
);
None
}
Protocol::RequestWindowIndex(from, ix) => {
let now = Instant::now();
//TODO this doesn't depend on CRDT module, can be moved
//but we are using the listen thread to service these request
//TODO verify from is signed
Expand All @@ -1152,7 +1164,14 @@ impl Crdt {
inc_new_counter_info!("crdt-window-request-address-eq", 1);
return None;
}
Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler)
let res =
Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler);
report_time_spent(
"RequestWindowIndex",
&now.elapsed(),
&format!(" ix: {}", ix),
);
res
}
}
}
Expand Down Expand Up @@ -1343,6 +1362,13 @@ impl TestNode {
}
}

fn report_time_spent(label: &str, time: &Duration, extra: &str) {
let count = duration_as_ms(time);
if count > 5 {
info!("{} took: {} ms {}", label, count, extra);
}
}

#[cfg(test)]
mod tests {
use crdt::{
Expand Down
20 changes: 14 additions & 6 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
use std::time::{Duration, Instant};
use timing::duration_as_ms;

pub const WINDOW_SIZE: u64 = 2 * 1024;
pub type PacketReceiver = Receiver<SharedPackets>;
Expand Down Expand Up @@ -250,7 +251,7 @@ fn repair_window(
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
if !reqs.is_empty() {
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
debug!(
info!(
"{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
debug_id,
*times,
Expand Down Expand Up @@ -496,7 +497,8 @@ fn recv_window(
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
inc_new_counter_info!("streamer-recv_window-recv", dq.len());
let now = Instant::now();
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
debug!(
"{:x}: RECV_WINDOW {} {}: got packets {}",
debug_id,
Expand All @@ -515,13 +517,15 @@ fn recv_window(
retransmit,
)?;

let mut pixs = Vec::new();
//send a contiguous set of blocks
let mut consume_queue = VecDeque::new();
while let Some(b) = dq.pop_front() {
let (pix, meta_size) = {
let p = b.write().expect("'b' write lock in fn recv_window");
(p.get_index()?, p.meta.size)
};
pixs.push(pix);

if !blob_idx_in_window(debug_id, pix, *consumed, received) {
recycler.recycle(b);
Expand All @@ -543,10 +547,14 @@ fn recv_window(
if log_enabled!(Level::Trace) {
trace!("{}", print_window(debug_id, window, *consumed));
}
trace!(
"{:x}: sending consume_queue.len: {}",
info!(
"{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
debug_id,
consume_queue.len()
*consumed,
*received,
consume_queue.len(),
pixs,
duration_as_ms(&now.elapsed())
);
if !consume_queue.is_empty() {
debug!(
Expand Down

0 comments on commit 8331aab

Please sign in to comment.