diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index 838e789913f139..6647ac5a75fda8 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -511,11 +511,7 @@ fn main() { println!(" Node address | Node identifier"); println!("----------------------+------------------"); for node in &validators { - println!( - " {:20} | {:16x}", - node.contact_info.tpu.to_string(), - node.debug_id() - ); + println!(" {:20} | {}", node.contact_info.tpu.to_string(), node.id); } println!("Nodes: {}", validators.len()); diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 97a599c630c646..1ec06f15288170 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -28,7 +28,7 @@ fn broadcast( transmit_index: &mut WindowIndex, receive_index: &mut u64, ) -> Result<()> { - let debug_id = node_info.debug_id(); + let id = node_info.id; let timer = Duration::new(1, 0); let mut dq = receiver.recv_timeout(timer)?; while let Ok(mut nq) = receiver.try_recv() { @@ -43,12 +43,12 @@ fn broadcast( let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); if log_enabled!(Level::Trace) { - trace!("{}", window::print_window(debug_id, window, *receive_index)); + trace!("{}", window::print_window(&id, window, *receive_index)); } for mut blobs in blobs_chunked { let blobs_len = blobs.len(); - trace!("{:x}: broadcast blobs.len: {}", debug_id, blobs_len); + trace!("{}: broadcast blobs.len: {}", id, blobs_len); // Index the blobs window::index_blobs(node_info, &blobs, receive_index) @@ -64,8 +64,8 @@ fn broadcast( let pos = (ix % WINDOW_SIZE) as usize; if let Some(x) = mem::replace(&mut win[pos].data, None) { trace!( - "{:x} popped {} at {}", - debug_id, + "{} popped {} at {}", + id, x.read().unwrap().get_index().unwrap(), pos ); @@ -73,20 +73,20 @@ fn broadcast( } if let Some(x) = mem::replace(&mut win[pos].coding, None) { trace!( - "{:x} popped {} at {}", - debug_id, + "{} popped {} at {}", + id, x.read().unwrap().get_index().unwrap(), pos ); recycler.recycle(x, "broadcast-coding"); } - trace!("{:x} null {}", debug_id, pos); + trace!("{} null {}", id, pos); } while let Some(b) = blobs.pop() { let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; - trace!("{:x} caching {} at {}", debug_id, ix, pos); + trace!("{} caching {} at {}", id, ix, pos); assert!(win[pos].data.is_none()); win[pos].data = Some(b); } @@ -96,7 +96,7 @@ fn broadcast( #[cfg(feature = "erasure")] { erasure::generate_coding( - debug_id, + &id, &mut window.write().unwrap(), recycler, *receive_index, diff --git a/src/crdt.rs b/src/crdt.rs index 789efb07b16a57..27dd0e0af9fa5c 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -13,7 +13,6 @@ //! //! Bank needs to provide an interface for us to query the stake weight use bincode::{deserialize, serialize}; -use byteorder::{LittleEndian, ReadBytesExt}; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; use counter::Counter; use hash::Hash; @@ -27,7 +26,6 @@ use result::{Error, Result}; use signature::{Keypair, KeypairUtil, Pubkey}; use std; use std::collections::HashMap; -use std::io::Cursor; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -110,13 +108,6 @@ pub struct NodeInfo { pub ledger_state: LedgerState, } -fn make_debug_id(key: &Pubkey) -> u64 { - let buf: &[u8] = &key.as_ref(); - let mut rdr = Cursor::new(&buf[..8]); - rdr.read_u64::() - .expect("rdr.read_u64 in fn debug_id") -} - impl NodeInfo { pub fn new( id: Pubkey, @@ -156,9 +147,6 @@ impl NodeInfo { assert!(addr.ip().is_multicast()); Self::new(Keypair::new().pubkey(), addr, addr, addr, addr) } - pub fn debug_id(&self) -> u64 { - make_debug_id(&self.id) - } fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { let mut nxt_addr = *addr; nxt_addr.set_port(addr.port() + nxt); @@ -252,9 +240,6 @@ impl Crdt { me.table.insert(node_info.id, node_info); Ok(me) } - pub fn debug_id(&self) -> u64 { - make_debug_id(&self.id) - } pub fn my_data(&self) -> &NodeInfo { &self.table[&self.id] } @@ -271,12 +256,7 @@ impl Crdt { pub fn set_leader(&mut self, key: Pubkey) -> () { let mut me = self.my_data().clone(); - warn!( - "{:x}: LEADER_UPDATE TO {:x} from {:x}", - me.debug_id(), - make_debug_id(&key), - make_debug_id(&me.leader_id), - ); + warn!("{}: LEADER_UPDATE TO {} from {}", me.id, key, me.leader_id); me.leader_id = key; me.version += 1; self.insert(&me); @@ -288,38 +268,23 @@ impl Crdt { pub fn insert_vote(&mut self, pubkey: &Pubkey, v: &Vote, last_id: Hash) { if self.table.get(pubkey).is_none() { - warn!( - "{:x}: VOTE for unknown id: {:x}", - self.debug_id(), - make_debug_id(pubkey) - ); + warn!("{}: VOTE for unknown id: {}", self.id, pubkey); return; } if v.contact_info_version > self.table[pubkey].contact_info.version { warn!( - "{:x}: VOTE for new address version from: {:x} ours: {} vote: {:?}", - self.debug_id(), - make_debug_id(pubkey), - self.table[pubkey].contact_info.version, - v, + "{}: VOTE for new address version from: {} ours: {} vote: {:?}", + self.id, pubkey, self.table[pubkey].contact_info.version, v, ); return; } if *pubkey == self.my_data().leader_id { - info!( - "{:x}: LEADER_VOTED! {:x}", - self.debug_id(), - make_debug_id(&pubkey) - ); + info!("{}: LEADER_VOTED! {}", self.id, pubkey); inc_new_counter_info!("crdt-insert_vote-leader_voted", 1); } if v.version <= self.table[pubkey].version { - debug!( - "{:x}: VOTE for old version: {:x}", - self.debug_id(), - make_debug_id(&pubkey) - ); + debug!("{}: VOTE for old version: {}", self.id, pubkey); self.update_liveness(*pubkey); return; } else { @@ -327,11 +292,7 @@ impl Crdt { data.version = v.version; data.ledger_state.last_id = last_id; - debug!( - "{:x}: INSERTING VOTE! for {:x}", - self.debug_id(), - data.debug_id() - ); + debug!("{}: INSERTING VOTE! for {}", self.id, data.id); self.update_liveness(data.id); self.insert(&data); } @@ -339,7 +300,7 @@ impl Crdt { pub fn insert_votes(&mut self, votes: &[(Pubkey, Vote, Hash)]) { inc_new_counter_info!("crdt-vote-count", votes.len()); if !votes.is_empty() { - info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len()); + info!("{}: INSERTING VOTES {}", self.id, votes.len()); } for v in votes { self.insert_vote(&v.0, &v.1, v.2); @@ -352,12 +313,7 @@ impl Crdt { if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { //somehow we signed a message for our own identity with a higher version than // we have stored ourselves - trace!( - "{:x}: insert v.id: {:x} version: {}", - self.debug_id(), - v.debug_id(), - v.version - ); + trace!("{}: insert v.id: {} version: {}", self.id, v.id, v.version); if self.table.get(&v.id).is_none() { inc_new_counter_info!("crdt-insert-new_entry", 1, 1); } @@ -369,9 +325,9 @@ impl Crdt { 1 } else { trace!( - "{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}", - self.debug_id(), - v.debug_id(), + "{}: INSERT FAILED data: {} new.version: {} me.version: {}", + self.id, + v.id, v.version, self.table[&v.id].version ); @@ -382,12 +338,7 @@ impl Crdt { fn update_liveness(&mut self, id: Pubkey) { //update the liveness table let now = timestamp(); - trace!( - "{:x} updating liveness {:x} to {}", - self.debug_id(), - make_debug_id(&id), - now - ); + trace!("{} updating liveness {} to {}", self.id, id, now); *self.alive.entry(id).or_insert(now) = now; } /// purge old validators @@ -412,13 +363,7 @@ impl Crdt { if k != self.id && (now - v) > limit { Some(k) } else { - trace!( - "{:x} purge skipped {:x} {} {}", - self.debug_id(), - make_debug_id(&k), - now - v, - limit - ); + trace!("{} purge skipped {} {} {}", self.id, k, now - v, limit); None } }) @@ -432,16 +377,12 @@ impl Crdt { self.remote.remove(id); self.local.remove(id); self.external_liveness.remove(id); - info!("{:x}: PURGE {:x}", self.debug_id(), make_debug_id(id)); + info!("{}: PURGE {}", self.id, id); for map in self.external_liveness.values_mut() { map.remove(id); } if *id == leader_id { - info!( - "{:x}: PURGE LEADER {:x}", - self.debug_id(), - make_debug_id(id), - ); + info!("{}: PURGE LEADER {}", self.id, id,); inc_new_counter_info!("crdt-purge-purged_leader", 1, 1); self.set_leader(Pubkey::default()); } @@ -463,19 +404,14 @@ impl Crdt { false } else if !(Self::is_valid_address(&v.contact_info.tvu)) { trace!( - "{:x}:broadcast skip not listening {:x} {}", - me.debug_id(), - v.debug_id(), + "{}:broadcast skip not listening {} {}", + me.id, + v.id, v.contact_info.tvu, ); false } else { - trace!( - "{:x}:broadcast node {:x} {}", - me.debug_id(), - v.debug_id(), - v.contact_info.tvu - ); + trace!("{}:broadcast node {} {}", me.id, v.id, v.contact_info.tvu); true } }) @@ -496,13 +432,13 @@ impl Crdt { received_index: u64, ) -> Result<()> { if broadcast_table.is_empty() { - warn!("{:x}:not enough peers in crdt table", me.debug_id()); + warn!("{}:not enough peers in crdt table", me.id); inc_new_counter_info!("crdt-broadcast-not_enough_peers_error", 1); Err(CrdtError::NoPeers)?; } trace!( - "{:x} transmit_index: {:?} received_index: {} broadcast_len: {}", - me.debug_id(), + "{} transmit_index: {:?} received_index: {} broadcast_len: {}", + me.id, *transmit_index, received_index, broadcast_table.len() @@ -521,8 +457,8 @@ impl Crdt { let w_idx = idx as usize % window_l.len(); trace!( - "{:x} broadcast order data w_idx {} br_idx {}", - me.debug_id(), + "{} broadcast order data w_idx {} br_idx {}", + me.id, w_idx, br_idx ); @@ -541,8 +477,8 @@ impl Crdt { } trace!( - "{:x} broadcast order coding w_idx: {} br_idx :{}", - me.debug_id(), + "{} broadcast order coding w_idx: {} br_idx :{}", + me.id, w_idx, br_idx, ); @@ -562,21 +498,21 @@ impl Crdt { let blob = bl.read().expect("blob read lock in streamer::broadcast"); //TODO profile this, may need multiple sockets for par_iter trace!( - "{:x}: BROADCAST idx: {} sz: {} to {:x},{} coding: {}", - me.debug_id(), + "{}: BROADCAST idx: {} sz: {} to {},{} coding: {}", + me.id, blob.get_index().unwrap(), blob.meta.size, - v.debug_id(), + v.id, v.contact_info.tvu, blob.is_coding() ); assert!(blob.meta.size <= BLOB_SIZE); let e = s.send_to(&blob.data[..blob.meta.size], &v.contact_info.tvu); trace!( - "{:x}: done broadcast {} to {:x} {}", - me.debug_id(), + "{}: done broadcast {} to {} {}", + me.id, blob.meta.size, - v.debug_id(), + v.id, v.contact_info.tvu ); e @@ -642,10 +578,10 @@ impl Crdt { .par_iter() .map(|v| { debug!( - "{:x}: retransmit blob {} to {:x} {}", - me.debug_id(), + "{}: retransmit blob {} to {} {}", + me.id, rblob.get_index().unwrap(), - v.debug_id(), + v.id, v.contact_info.tvu, ); //TODO profile this, may need multiple sockets for par_iter @@ -742,21 +678,17 @@ impl Crdt { let choose_peer_result = choose_peer_strategy.choose_peer(options); if let Err(Error::CrdtError(CrdtError::NoPeers)) = &choose_peer_result { - trace!( - "crdt too small for gossip {:x} {}", - self.debug_id(), - self.table.len() - ); + trace!("crdt too small for gossip {} {}", self.id, self.table.len()); }; let v = choose_peer_result?; let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.my_data().clone()); trace!( - "created gossip request from {:x} {:?} to {:x} {}", - self.debug_id(), + "created gossip request from {} {:?} to {} {}", + self.id, self.my_data(), - v.debug_id(), + v.id, v.contact_info.ncp ); @@ -806,17 +738,11 @@ impl Crdt { for v in cur { let cnt = table.entry(&v.leader_id).or_insert(0); *cnt += 1; - trace!("leader {:x} {}", make_debug_id(&v.leader_id), *cnt); + trace!("leader {} {}", v.leader_id, *cnt); } let mut sorted: Vec<(&Pubkey, usize)> = table.into_iter().collect(); - let my_id = self.debug_id(); for x in &sorted { - trace!( - "{:x}: sorted leaders {:x} votes: {}", - my_id, - make_debug_id(&x.0), - x.1 - ); + trace!("{}: sorted leaders {} votes: {}", self.id, x.0, x.1); } sorted.sort_by_key(|a| a.1); sorted.last().map(|a| *a.0) @@ -980,9 +906,9 @@ impl Crdt { inc_new_counter_info!("crdt-window-request-fail", 1); trace!( - "{:x}: failed RequestWindowIndex {:x} {} {}", - me.debug_id(), - from.debug_id(), + "{}: failed RequestWindowIndex {} {} {}", + me.id, + from.id, ix, pos, ); @@ -1025,11 +951,11 @@ impl Crdt { match request { // TODO sigverify these Protocol::RequestUpdates(version, mut from) => { - let debug_id = me.read().unwrap().debug_id(); + let id = me.read().unwrap().id; trace!( - "{:x} RequestUpdates {} from {}, professing to be {}", - debug_id, + "{} RequestUpdates {} from {}, professing to be {}", + id, version, from_addr, from.contact_info.ncp @@ -1037,9 +963,9 @@ impl Crdt { if from.id == me.read().unwrap().id { warn!( - "RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}", - me.read().unwrap().debug_id(), - from.debug_id() + "RequestUpdates ignored, I'm talking to myself: me={} remoteme={}", + me.read().unwrap().id, + from.id ); inc_new_counter_info!("crdt-window-request-loopback", 1); return None; @@ -1080,8 +1006,8 @@ impl Crdt { if len < 1 { let me = me.read().unwrap(); trace!( - "no updates me {:x} ix {} since {}", - debug_id, + "no updates me {} ix {} since {}", + id, me.update_index, version ); @@ -1091,10 +1017,10 @@ impl Crdt { if let Ok(r) = to_blob(rsp, from.contact_info.ncp, &blob_recycler) { trace!( - "sending updates me {:x} len {} to {:x} {}", - debug_id, + "sending updates me {} len {} to {} {}", + id, len, - from.debug_id(), + from.id, from.contact_info.ncp, ); Some(r) @@ -1107,8 +1033,8 @@ impl Crdt { Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => { let now = Instant::now(); trace!( - "ReceivedUpdates from={:x} update_index={} len={}", - make_debug_id(&from), + "ReceivedUpdates from={} update_index={} len={}", + from, update_index, data.len() ); @@ -1133,9 +1059,9 @@ impl Crdt { if from.id == me.read().unwrap().id { warn!( - "{:x}: Ignored received RequestWindowIndex from ME {:x} {} ", - me.read().unwrap().debug_id(), - from.debug_id(), + "{}: Ignored received RequestWindowIndex from ME {} {} ", + me.read().unwrap().id, + from.id, ix, ); inc_new_counter_info!("crdt-window-request-address-eq", 1); @@ -1145,12 +1071,7 @@ impl Crdt { me.write().unwrap().insert(&from); let me = me.read().unwrap().my_data().clone(); inc_new_counter_info!("crdt-window-request-recv", 1); - trace!( - "{:x}: received RequestWindowIndex {:x} {} ", - me.debug_id(), - from.debug_id(), - ix, - ); + trace!("{}: received RequestWindowIndex {} {} ", me.id, from.id, ix,); let res = Self::run_window_request( &from, &from_addr, @@ -1229,8 +1150,8 @@ impl Crdt { if e.is_err() { let me = me.read().unwrap(); debug!( - "{:x}: run_listen timeout, table size: {}", - me.debug_id(), + "{}: run_listen timeout, table size: {}", + me.id, me.table.len() ); } diff --git a/src/erasure.rs b/src/erasure.rs index 41270603580807..c4d0c7c7bdbee0 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -1,5 +1,6 @@ // Support erasure coding use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE}; +use signature::Pubkey; use std::cmp; use std::mem; use std::result; @@ -214,7 +215,7 @@ pub fn decode_blocks( // // pub fn generate_coding( - debug_id: u64, + id: &Pubkey, window: &mut [WindowSlot], recycler: &BlobRecycler, receive_index: u64, @@ -234,8 +235,8 @@ pub fn generate_coding( break; } info!( - "generate_coding {:x} start: {} end: {} start_idx: {} num_blobs: {}", - debug_id, block_start, block_end, start_idx, num_blobs + "generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}", + id, block_start, block_end, start_idx, num_blobs ); let mut max_data_size = 0; @@ -243,12 +244,12 @@ pub fn generate_coding( // find max_data_size, maybe bail if not all the data is here for i in block_start..block_end { let n = i % window.len(); - trace!("{:x} window[{}] = {:?}", debug_id, n, window[n].data); + trace!("{} window[{}] = {:?}", id, n, window[n].data); if let Some(b) = &window[n].data { max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size); } else { - trace!("{:x} data block is null @ {}", debug_id, n); + trace!("{} data block is null @ {}", id, n); return Ok(()); } } @@ -256,7 +257,7 @@ pub fn generate_coding( // round up to the nearest jerasure alignment max_data_size = align!(max_data_size, JERASURE_ALIGN); - trace!("{:x} max_data_size: {}", debug_id, max_data_size); + trace!("{} max_data_size: {}", id, max_data_size); let mut data_blobs = Vec::with_capacity(NUM_DATA); for i in block_start..block_end { @@ -299,8 +300,8 @@ pub fn generate_coding( let id = data_rl.get_id().unwrap(); trace!( - "{:x} copying index {} id {:?} from data to coding", - debug_id, + "{} copying index {} id {:?} from data to coding", + id, index, id ); @@ -324,7 +325,7 @@ pub fn generate_coding( .iter() .enumerate() .map(|(i, l)| { - trace!("{:x} i: {} data: {}", debug_id, i, l.data[0]); + trace!("{} i: {} data: {}", id, i, l.data[0]); &l.data[..max_data_size] }) .collect(); @@ -338,15 +339,15 @@ pub fn generate_coding( .iter_mut() .enumerate() .map(|(i, l)| { - trace!("{:x} i: {} coding: {}", debug_id, i, l.data[0],); + trace!("{} i: {} coding: {}", id, i, l.data[0],); &mut l.data_mut()[..max_data_size] }) .collect(); generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; debug!( - "{:x} start_idx: {} data: {}:{} coding: {}:{}", - debug_id, start_idx, block_start, block_end, coding_start, block_end + "{} start_idx: {} data: {}:{} coding: {}:{}", + id, start_idx, block_start, block_end, coding_start, block_end ); block_start = block_end; } @@ -358,7 +359,7 @@ pub fn generate_coding( // true if slot is stale (i.e. has the wrong index), old blob is flushed // false if slot has a blob with the right index fn is_missing( - debug_id: u64, + id: &Pubkey, idx: u64, window_slot: &mut Option, recycler: &BlobRecycler, @@ -367,14 +368,14 @@ fn is_missing( if let Some(blob) = mem::replace(window_slot, None) { let blob_idx = blob.read().unwrap().get_index().unwrap(); if blob_idx == idx { - trace!("recover {:x}: idx: {} good {}", debug_id, idx, c_or_d); + trace!("recover {}: idx: {} good {}", id, idx, c_or_d); // put it back mem::replace(window_slot, Some(blob)); false } else { trace!( - "recover {:x}: idx: {} old {} {}, recycling", - debug_id, + "recover {}: idx: {} old {} {}, recycling", + id, idx, c_or_d, blob_idx, @@ -384,7 +385,7 @@ fn is_missing( true } } else { - trace!("recover {:x}: idx: {} None {}", debug_id, idx, c_or_d); + trace!("recover {}: idx: {} None {}", id, idx, c_or_d); // nothing there true } @@ -395,7 +396,7 @@ fn is_missing( // if a blob is stale, remove it from the window slot // side effect: block will be cleaned of old blobs fn find_missing( - debug_id: u64, + id: &Pubkey, block_start_idx: u64, block_start: usize, window: &mut [WindowSlot], @@ -411,12 +412,11 @@ fn find_missing( let idx = (i - block_start) as u64 + block_start_idx; let n = i % window.len(); - if is_missing(debug_id, idx, &mut window[n].data, recycler, "data") { + if is_missing(id, idx, &mut window[n].data, recycler, "data") { data_missing += 1; } - if i >= coding_start && is_missing(debug_id, idx, &mut window[n].coding, recycler, "coding") - { + if i >= coding_start && is_missing(id, idx, &mut window[n].coding, recycler, "coding") { coding_missing += 1; } } @@ -430,7 +430,7 @@ fn find_missing( // any of the blocks, the block is skipped. // Side effect: old blobs in a block are None'd pub fn recover( - debug_id: u64, + id: &Pubkey, recycler: &BlobRecycler, window: &mut [WindowSlot], start_idx: u64, @@ -444,8 +444,8 @@ pub fn recover( let coding_start = block_start + NUM_DATA - NUM_CODING; let block_end = block_start + NUM_DATA; trace!( - "recover {:x}: block_start_idx: {} block_start: {} coding_start: {} block_end: {}", - debug_id, + "recover {}: block_start_idx: {} block_start: {} coding_start: {} block_end: {}", + id, block_start_idx, block_start, coding_start, @@ -453,7 +453,7 @@ pub fn recover( ); let (data_missing, coding_missing) = - find_missing(debug_id, block_start_idx, block_start, window, recycler); + find_missing(id, block_start_idx, block_start, window, recycler); // if we're not missing data, or if we have too much missin but have enough coding if data_missing == 0 { @@ -463,8 +463,8 @@ pub fn recover( if (data_missing + coding_missing) > NUM_CODING { trace!( - "recover {:x}: start: {} skipping recovery data: {} coding: {}", - debug_id, + "recover {}: start: {} skipping recovery data: {} coding: {}", + id, block_start, data_missing, coding_missing @@ -474,8 +474,8 @@ pub fn recover( } trace!( - "recover {:x}: recovering: data: {} coding: {}", - debug_id, + "recover {}: recovering: data: {} coding: {}", + id, data_missing, coding_missing ); @@ -492,7 +492,7 @@ pub fn recover( if let Some(b) = window[j].data.clone() { if meta.is_none() { meta = Some(b.read().unwrap().meta.clone()); - trace!("recover {:x} meta at {} {:?}", debug_id, j, meta); + trace!("recover {} meta at {} {:?}", id, j, meta); } blobs.push(b); } else { @@ -509,8 +509,8 @@ pub fn recover( if size.is_none() { size = Some(b.read().unwrap().meta.size - BLOB_HEADER_SIZE); trace!( - "{:x} recover size {} from {}", - debug_id, + "{} recover size {} from {}", + id, size.unwrap(), i as u64 + block_start_idx ); @@ -540,12 +540,7 @@ pub fn recover( // marks end of erasures erasures.push(-1); - trace!( - "erasures[]: {:x} {:?} data_size: {}", - debug_id, - erasures, - size, - ); + trace!("erasures[]: {} {:?} data_size: {}", id, erasures, size,); //lock everything for write for b in &blobs { locks.push(b.write().expect("'locks' arr in pb fn recover")); @@ -556,16 +551,16 @@ pub fn recover( let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); for (i, l) in locks.iter_mut().enumerate() { if i < NUM_DATA { - trace!("{:x} pushing data: {}", debug_id, i); + trace!("{} pushing data: {}", id, i); data_ptrs.push(&mut l.data[..size]); } else { - trace!("{:x} pushing coding: {}", debug_id, i); + trace!("{} pushing coding: {}", id, i); coding_ptrs.push(&mut l.data_mut()[..size]); } } trace!( - "{:x} coding_ptrs.len: {} data_ptrs.len {}", - debug_id, + "{} coding_ptrs.len: {} data_ptrs.len {}", + id, coding_ptrs.len(), data_ptrs.len() ); @@ -587,10 +582,7 @@ pub fn recover( data_size = locks[n].get_data_size().unwrap() as usize; data_size -= BLOB_HEADER_SIZE; if data_size > BLOB_DATA_SIZE { - error!( - "{:x} corrupt data blob[{}] data_size: {}", - debug_id, idx, data_size - ); + error!("{} corrupt data blob[{}] data_size: {}", id, idx, data_size); corrupt = true; } } else { @@ -600,8 +592,8 @@ pub fn recover( if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { error!( - "{:x} corrupt coding blob[{}] data_size: {}", - debug_id, idx, data_size + "{} corrupt coding blob[{}] data_size: {}", + id, idx, data_size ); corrupt = true; } @@ -610,15 +602,15 @@ pub fn recover( locks[n].meta = meta.clone().unwrap(); locks[n].set_size(data_size); trace!( - "{:x} erasures[{}] ({}) size: {:x} data[0]: {}", - debug_id, + "{} erasures[{}] ({}) size: {} data[0]: {}", + id, *i, idx, data_size, locks[n].data()[0] ); } - assert!(!corrupt, " {:x} ", debug_id); + assert!(!corrupt, " {} ", id); Ok(()) } @@ -630,8 +622,7 @@ mod test { use logger; use packet::{BlobRecycler, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; use rand::{thread_rng, Rng}; - use signature::Keypair; - use signature::KeypairUtil; + use signature::{Keypair, KeypairUtil, Pubkey}; // use std::sync::{Arc, RwLock}; use window::{index_blobs, WindowSlot}; @@ -842,9 +833,10 @@ mod test { // Generate the coding blocks let mut index = (erasure::NUM_DATA + 2) as u64; + let id = Pubkey::default(); assert!( erasure::generate_coding( - 0, + &id, &mut window, &blob_recycler, offset as u64, @@ -871,7 +863,7 @@ mod test { // Recover it from coding assert!( erasure::recover( - 0, + &id, &blob_recycler, &mut window, (offset + WINDOW_SIZE) as u64, @@ -921,7 +913,7 @@ mod test { // Recover it from coding assert!( erasure::recover( - 0, + &id, &blob_recycler, &mut window, (offset + WINDOW_SIZE) as u64, @@ -967,7 +959,7 @@ mod test { // Recover it from coding assert!( erasure::recover( - 0, + &id, &blob_recycler, &mut window, (offset + WINDOW_SIZE) as u64, diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 85dfe58065670d..784a5360ef5944 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -11,7 +11,7 @@ use metrics; use packet::{BlobRecycler, SharedBlob}; use result::Result; use service::Service; -use signature::Keypair; +use signature::{Keypair, Pubkey}; use std::result; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -58,7 +58,7 @@ pub fn create_new_signed_vote_blob( } fn get_last_id_to_vote_on( - debug_id: u64, + id: &Pubkey, ids: &[Hash], bank: &Arc, now: u64, @@ -70,8 +70,8 @@ fn get_last_id_to_vote_on( //TODO(anatoly): this isn't stake based voting debug!( - "{:x}: valid_ids {}/{} {}", - debug_id, + "{}: valid_ids {}/{} {}", + id, valid_ids.len(), ids.len(), super_majority_index, @@ -112,7 +112,7 @@ fn get_last_id_to_vote_on( } pub fn send_leader_vote( - debug_id: u64, + id: &Pubkey, keypair: &Keypair, bank: &Arc, crdt: &Arc>, @@ -125,7 +125,7 @@ pub fn send_leader_vote( if now - *last_vote > VOTE_TIMEOUT_MS { let ids: Vec<_> = crdt.read().unwrap().valid_last_ids(); if let Ok((last_id, super_majority_timestamp)) = get_last_id_to_vote_on( - debug_id, + id, &ids, bank, now, @@ -139,10 +139,7 @@ pub fn send_leader_vote( let finality_ms = now - super_majority_timestamp; *last_valid_validator_timestamp = super_majority_timestamp; - debug!( - "{:x} leader_sent_vote finality: {} ms", - debug_id, finality_ms - ); + debug!("{} leader_sent_vote finality: {} ms", id, finality_ms); inc_new_counter_info!("vote_stage-leader_sent_vote", 1); bank.set_finality((now - *last_valid_validator_timestamp) as usize); @@ -329,7 +326,7 @@ pub mod tests { let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1; let mut last_valid_validator_timestamp = 0; let res = send_leader_vote( - 1234, + &mint.pubkey(), &mint.keypair(), &bank, &leader, @@ -369,7 +366,7 @@ pub mod tests { last_vote = timing::timestamp() - VOTE_TIMEOUT_MS - 1; let res = send_leader_vote( - 2345, + &Pubkey::default(), &mint.keypair(), &bank, &leader, @@ -417,7 +414,7 @@ pub mod tests { // see that we fail to have 2/3rds consensus assert!( get_last_id_to_vote_on( - 1234, + &Pubkey::default(), &ids, &bank, 0, @@ -430,7 +427,7 @@ pub mod tests { bank.register_entry_id(&ids[6]); let res = get_last_id_to_vote_on( - 1234, + &Pubkey::default(), &ids, &bank, 0, diff --git a/src/window.rs b/src/window.rs index 089ac4a4c19ec3..139fb36f09f6ea 100644 --- a/src/window.rs +++ b/src/window.rs @@ -111,7 +111,7 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { } fn repair_window( - debug_id: u64, + id: &Pubkey, window: &SharedWindow, crdt: &Arc>, recycler: &BlobRecycler, @@ -122,7 +122,7 @@ fn repair_window( ) -> Option)>> { //exponential backoff if !repair_backoff(last, times, consumed) { - trace!("{:x} !repair_backoff() times = {}", debug_id, times); + trace!("{} !repair_backoff() times = {}", id, times); return None; } @@ -135,8 +135,8 @@ fn repair_window( inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); if log_enabled!(Level::Trace) { trace!( - "{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}", - debug_id, + "{}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}", + id, *times, consumed, highest_lost, @@ -144,7 +144,7 @@ fn repair_window( ); for (to, _) in reqs.clone() { - trace!("{:x}: repair_window request to {}", debug_id, to); + trace!("{}: repair_window request to {}", id, to); } } Some(reqs) @@ -199,7 +199,7 @@ fn add_block_to_retransmit_queue( fn retransmit_all_leader_blocks( maybe_leader: Option, dq: &[SharedBlob], - debug_id: u64, + id: &Pubkey, recycler: &BlobRecycler, consumed: u64, received: u64, @@ -235,12 +235,12 @@ fn retransmit_all_leader_blocks( } } } else { - warn!("{:x}: no leader to retransmit from", debug_id); + warn!("{}: no leader to retransmit from", id); } if !retransmit_queue.is_empty() { trace!( - "{:x}: RECV_WINDOW {} {}: retransmit {}", - debug_id, + "{}: RECV_WINDOW {} {}: retransmit {}", + id, consumed, received, retransmit_queue.len(), @@ -255,7 +255,7 @@ fn retransmit_all_leader_blocks( /// starting from consumed is thereby formed, add that continuous /// range of blobs to a queue to be sent on to the next stage. /// -/// * `debug_id` - this node's id in a useful-for-debug format +/// * `id` - this node's id /// * `blob` - the blob to be processed into the window and rebroadcast /// * `pix` - the index of the blob, corresponds to /// the entry height of this blob @@ -266,7 +266,7 @@ fn retransmit_all_leader_blocks( /// * `consumed` - input/output, the entry-height to which this /// node has populated and rebroadcast entries fn process_blob( - debug_id: u64, + id: &Pubkey, blob: SharedBlob, pix: u64, consume_queue: &mut SharedBlobs, @@ -290,7 +290,7 @@ fn process_blob( // blob unless the incoming blob is a duplicate (based on idx) // returns whether the incoming is a duplicate blob fn insert_blob_is_dup( - debug_id: u64, + id: &Pubkey, blob: SharedBlob, pix: u64, window_slot: &mut Option, @@ -301,31 +301,24 @@ fn process_blob( let is_dup = old.read().unwrap().get_index().unwrap() == pix; recycler.recycle(old, "insert_blob_is_dup"); trace!( - "{:x}: occupied {} window slot {:}, is_dup: {}", - debug_id, + "{}: occupied {} window slot {:}, is_dup: {}", + id, c_or_d, pix, is_dup ); is_dup } else { - trace!("{:x}: empty {} window slot {:}", debug_id, c_or_d, pix); + trace!("{}: empty {} window slot {:}", id, c_or_d, pix); false } } // insert the new blob into the window, overwrite and recycle old (or duplicate) entry let is_duplicate = if is_coding { - insert_blob_is_dup( - debug_id, - blob, - pix, - &mut window[w].coding, - recycler, - "coding", - ) + insert_blob_is_dup(id, blob, pix, &mut window[w].coding, recycler, "coding") } else { - insert_blob_is_dup(debug_id, blob, pix, &mut window[w].data, recycler, "data") + insert_blob_is_dup(id, blob, pix, &mut window[w].data, recycler, "data") }; if is_duplicate { @@ -338,21 +331,21 @@ fn process_blob( #[cfg(feature = "erasure")] { if erasure::recover( - debug_id, + id, recycler, &mut window, *consumed, (*consumed % WINDOW_SIZE) as usize, ).is_err() { - trace!("{:x}: erasure::recover failed", debug_id); + trace!("{}: erasure::recover failed", id); } } // push all contiguous blobs into consumed queue, increment consumed loop { let k = (*consumed % WINDOW_SIZE) as usize; - trace!("{:x}: k: {} consumed: {}", debug_id, k, *consumed,); + trace!("{}: k: {} consumed: {}", id, k, *consumed,); if let Some(blob) = &window[k].data { if blob.read().unwrap().get_index().unwrap() < *consumed { @@ -368,14 +361,14 @@ fn process_blob( } } -fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64) -> bool { +fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool { // Prevent receive window from running over // Got a blob which has already been consumed, skip it // probably from a repair window request if pix < consumed { trace!( - "{:x}: received: {} but older than consumed: {} skipping..", - debug_id, + "{}: received: {} but older than consumed: {} skipping..", + id, pix, consumed ); @@ -389,8 +382,8 @@ fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64 if pix >= consumed + WINDOW_SIZE { trace!( - "{:x}: received: {} will overrun window: {} skipping..", - debug_id, + "{}: received: {} will overrun window: {} skipping..", + id, pix, consumed + WINDOW_SIZE ); @@ -403,7 +396,7 @@ fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64 #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] fn recv_window( - debug_id: u64, + id: &Pubkey, window: &SharedWindow, crdt: &Arc>, recycler: &BlobRecycler, @@ -428,8 +421,8 @@ fn recv_window( let now = Instant::now(); inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100); trace!( - "{:x}: RECV_WINDOW {} {}: got packets {}", - debug_id, + "{}: RECV_WINDOW {} {}: got packets {}", + id, *consumed, *received, dq.len(), @@ -438,7 +431,7 @@ fn recv_window( retransmit_all_leader_blocks( maybe_leader, &dq, - debug_id, + id, recycler, *consumed, *received, @@ -457,15 +450,15 @@ fn recv_window( }; pixs.push(pix); - if !blob_idx_in_window(debug_id, pix, *consumed, received) { + if !blob_idx_in_window(&id, pix, *consumed, received) { recycler.recycle(b, "recv_window"); continue; } - trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size); + trace!("{} window pix: {} size: {}", id, pix, meta_size); process_blob( - debug_id, + id, b, pix, &mut consume_queue, @@ -477,10 +470,10 @@ fn recv_window( ); } if log_enabled!(Level::Trace) { - trace!("{}", print_window(debug_id, window, *consumed)); + trace!("{}", print_window(id, window, *consumed)); trace!( - "{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms", - debug_id, + "{}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms", + id, *consumed, *received, consume_queue.len(), @@ -495,7 +488,7 @@ fn recv_window( Ok(()) } -pub fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> String { +pub fn print_window(id: &Pubkey, window: &SharedWindow, consumed: u64) -> String { let pointer: Vec<_> = window .read() .unwrap() @@ -529,11 +522,11 @@ pub fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> Stri }) .collect(); format!( - "\n{:x}: WINDOW ({}): {}\n{:x}: WINDOW ({}): {}", - debug_id, + "\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}", + id, consumed, pointer.join(""), - debug_id, + id, consumed, buf.join("") ) @@ -552,7 +545,7 @@ pub fn index_blobs( receive_index: &mut u64, ) -> Result<()> { // enumerate all the blobs, those are the indices - trace!("{:x}: INDEX_BLOBS {}", node_info.debug_id(), blobs.len()); + trace!("{}: INDEX_BLOBS {}", node_info.id, blobs.len()); for (i, b) in blobs.iter().enumerate() { // only leader should be broadcasting let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs"); @@ -576,14 +569,14 @@ pub fn initialized_window( entry_height: u64, ) -> SharedWindow { let window = default_window(); - let debug_id = node_info.debug_id(); + let id = node_info.id; { let mut win = window.write().unwrap(); trace!( - "{:x} initialized window entry_height:{} blobs_len:{}", - debug_id, + "{} initialized window entry_height:{} blobs_len:{}", + id, entry_height, blobs.len() ); @@ -597,7 +590,7 @@ pub fn initialized_window( for b in blobs.into_iter().skip(diff) { let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; - trace!("{:x} caching {} at {}", debug_id, ix, pos); + trace!("{} caching {} at {}", id, ix, pos); assert!(win[pos].data.is_none()); win[pos].data = Some(b); } @@ -634,12 +627,12 @@ pub fn window( let mut received = entry_height; let mut last = entry_height; let mut times = 0; - let debug_id = crdt.read().unwrap().debug_id(); + let id = crdt.read().unwrap().id; let mut pending_retransmits = false; - trace!("{:x}: RECV_WINDOW started", debug_id); + trace!("{}: RECV_WINDOW started", id); loop { if let Err(e) = recv_window( - debug_id, + &id, &window, &crdt, &recycler, @@ -660,11 +653,11 @@ pub fn window( } } if let Some(reqs) = repair_window( - debug_id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received, + &id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received, ) { for (to, req) in reqs { repair_socket.send_to(&req, to).unwrap_or_else(|e| { - info!("{:x} repair req send_to({}) error {:?}", debug_id, to, e); + info!("{} repair req send_to({}) error {:?}", id, to, e); 0 }); } @@ -679,6 +672,7 @@ mod test { use crdt::{Crdt, Node}; use logger; use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; + use signature::Pubkey; use std::io; use std::io::Write; use std::net::UdpSocket; @@ -1023,30 +1017,26 @@ mod test { ); } - fn wrap_blob_idx_in_window( - debug_id: u64, - pix: u64, - consumed: u64, - received: u64, - ) -> (bool, u64) { + fn wrap_blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: u64) -> (bool, u64) { let mut received = received; - let is_in_window = blob_idx_in_window(debug_id, pix, consumed, &mut received); + let is_in_window = blob_idx_in_window(&id, pix, consumed, &mut received); (is_in_window, received) } #[test] pub fn blob_idx_in_window_test() { + let id = Pubkey::default(); assert_eq!( - wrap_blob_idx_in_window(0, 90 + WINDOW_SIZE, 90, 100), + wrap_blob_idx_in_window(&id, 90 + WINDOW_SIZE, 90, 100), (false, 90 + WINDOW_SIZE) ); assert_eq!( - wrap_blob_idx_in_window(0, 91 + WINDOW_SIZE, 90, 100), + wrap_blob_idx_in_window(&id, 91 + WINDOW_SIZE, 90, 100), (false, 91 + WINDOW_SIZE) ); - assert_eq!(wrap_blob_idx_in_window(0, 89, 90, 100), (false, 100)); + assert_eq!(wrap_blob_idx_in_window(&id, 89, 90, 100), (false, 100)); - assert_eq!(wrap_blob_idx_in_window(0, 91, 90, 100), (true, 100)); - assert_eq!(wrap_blob_idx_in_window(0, 101, 90, 100), (true, 101)); + assert_eq!(wrap_blob_idx_in_window(&id, 91, 90, 100), (true, 100)); + assert_eq!(wrap_blob_idx_in_window(&id, 101, 90, 100), (true, 101)); } #[test] pub fn test_repair_backoff() { diff --git a/src/write_stage.rs b/src/write_stage.rs index 8041e0414def63..7f3efc439fb08d 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -90,7 +90,7 @@ impl WriteStage { .spawn(move || { let mut last_vote = 0; let mut last_valid_validator_timestamp = 0; - let debug_id = crdt.read().unwrap().debug_id(); + let id = crdt.read().unwrap().id; loop { if let Err(e) = Self::write_and_send_entries( &crdt, @@ -113,7 +113,7 @@ impl WriteStage { } }; if let Err(e) = send_leader_vote( - debug_id, + &id, &keypair, &bank, &crdt, diff --git a/tests/multinode.rs b/tests/multinode.rs index 0da5d9cc271f09..76aa4063280a40 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -247,7 +247,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { //verify validator has the same balance let mut success = 0usize; for server in servers.iter() { - info!("0server: {:x}", server.debug_id()); + info!("0server: {}", server.id); let mut client = mk_client(server); if let Ok(bal) = client.poll_get_balance(&bob_pubkey) { info!("validator balance {}", bal); @@ -289,7 +289,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { for server in servers.iter() { let mut client = mk_client(server); - info!("1server: {:x}", server.debug_id()); + info!("1server: {}", server.id); for _ in 0..15 { if let Ok(bal) = client.poll_get_balance(&bob_pubkey) { info!("validator balance {}", bal); @@ -548,7 +548,7 @@ fn test_multi_node_dynamic_network() { ).unwrap(); info!("leader balance {}", leader_balance); - info!("{:x} LEADER", leader_data.debug_id()); + info!("{} LEADER", leader_data.id); let leader_balance = retry_send_tx_and_retry_get_balance( &leader_data, &alice_arc.read().unwrap(), @@ -604,7 +604,7 @@ fn test_multi_node_dynamic_network() { .spawn(move || { let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let rd = validator.info.clone(); - info!("starting {} {:x}", keypair.pubkey(), rd.debug_id()); + info!("starting {} {}", keypair.pubkey(), rd.id); let val = Fullnode::new( validator, &ledger_path, @@ -676,7 +676,7 @@ fn test_multi_node_dynamic_network() { let mut num_nodes_behind = 0i64; validators.retain(|server| { let mut client = mk_client(&server.0); - trace!("{:x} checking signature", server.0.debug_id()); + trace!("{} checking signature", server.0.id); num_nodes_behind += if client.check_signature(&sig) { 0 } else { 1 }; server.1.exit(); true