diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs index ce4d83e3500f7f..ed3e41273135c6 100644 --- a/archiver-lib/src/archiver.rs +++ b/archiver-lib/src/archiver.rs @@ -14,7 +14,7 @@ use solana_core::{ gossip_service::GossipService, packet::{limited_deserialize, PACKET_DATA_SIZE}, repair_service, - repair_service::{RepairService, RepairSlotRange, RepairStrategy}, + repair_service::{RepairService, RepairSlotRange, RepairStats, RepairStrategy}, serve_repair::ServeRepair, shred_fetch_stage::ShredFetchStage, sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, @@ -839,13 +839,14 @@ impl Archiver { repair_service::MAX_REPAIR_LENGTH, &repair_slot_range, ); + let mut repair_stats = RepairStats::default(); //iter over the repairs and send them if let Ok(repairs) = repairs { let reqs: Vec<_> = repairs .into_iter() .filter_map(|repair_request| { serve_repair - .map_repair_request(&repair_request) + .map_repair_request(&repair_request, &mut repair_stats) .map(|result| ((archiver_info.gossip, result), repair_request)) .ok() }) diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index a1a37c8a0eb706..da223a3233c3c4 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -20,9 +20,31 @@ use std::{ sync::{Arc, RwLock}, thread::sleep, thread::{self, Builder, JoinHandle}, - time::Duration, + time::{Duration, Instant}, }; +#[derive(Default)] +pub struct RepairStatsGroup { + pub count: u64, + pub min: u64, + pub max: u64, +} + +impl RepairStatsGroup { + pub fn update(&mut self, slot: u64) { + self.count += 1; + self.min = std::cmp::min(self.min, slot); + self.max = std::cmp::max(self.max, slot); + } +} + +#[derive(Default)] +pub struct RepairStats { + pub shred: RepairStatsGroup, + pub highest_shred: RepairStatsGroup, + pub orphan: RepairStatsGroup, +} + pub const MAX_REPAIR_LENGTH: usize = 512; pub const REPAIR_MS: u64 = 100; pub const MAX_ORPHANS: usize = 5; @@ -107,6 +129,8 @@ impl RepairService { cluster_info, ); } + let mut repair_stats = RepairStats::default(); + let mut last_stats = Instant::now(); loop { if exit.load(Ordering::Relaxed) { break; @@ -148,7 +172,7 @@ impl RepairService { .into_iter() .filter_map(|repair_request| { serve_repair - .repair_request(&repair_request) + .repair_request(&repair_request, &mut repair_stats) .map(|result| (result, repair_request)) .ok() }) @@ -161,6 +185,24 @@ impl RepairService { }); } } + if last_stats.elapsed().as_secs() > 1 { + let repair_total = repair_stats.shred.count + + repair_stats.highest_shred.count + + repair_stats.orphan.count; + if repair_total > 0 { + datapoint_info!( + "serve_repair-repair", + ("repair-total", repair_total, i64), + ("shred-count", repair_stats.shred.count, i64), + ("highest-shred-count", repair_stats.highest_shred.count, i64), + ("orphan-count", repair_stats.orphan.count, i64), + ("repair-highest-slot", repair_stats.highest_shred.max, i64), + ("repair-orphan", repair_stats.orphan.max, i64), + ); + } + repair_stats = RepairStats::default(); + last_stats = Instant::now(); + } sleep(Duration::from_millis(REPAIR_MS)); } } diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 721e05cc600618..b81104804f0241 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -4,6 +4,7 @@ use crate::{ cluster_info::{ClusterInfo, ClusterInfoError}, contact_info::ContactInfo, packet::Packet, + repair_service::RepairStats, result::{Error, Result}, }; use bincode::serialize; @@ -46,6 +47,16 @@ impl RepairType { } } +#[derive(Default)] +pub struct ServeRepairStats { + pub total_packets: usize, + pub processed: usize, + pub self_repair: usize, + pub window_index: usize, + pub highest_window_index: usize, + pub orphan: usize, +} + /// Window protocol messages #[derive(Serialize, Deserialize, Debug)] enum RepairProtocol { @@ -104,6 +115,7 @@ impl ServeRepair { from_addr: &SocketAddr, blockstore: Option<&Arc>, request: RepairProtocol, + stats: &mut ServeRepairStats, ) -> Option { let now = Instant::now(); @@ -111,18 +123,14 @@ impl ServeRepair { let my_id = me.read().unwrap().keypair.pubkey(); let from = Self::get_repair_sender(&request); if from.id == my_id { - warn!( - "{}: Ignored received repair request from ME {}", - my_id, from.id, - ); - inc_new_counter_debug!("serve_repair-handle-repair--eq", 1); + stats.self_repair += 1; return None; } let (res, label) = { match &request { RepairProtocol::WindowIndex(from, slot, shred_index) => { - inc_new_counter_debug!("serve_repair-request-window-index", 1); + stats.window_index += 1; ( Self::run_window_request( recycler, @@ -138,7 +146,7 @@ impl ServeRepair { } RepairProtocol::HighestWindowIndex(_, slot, highest_index) => { - inc_new_counter_debug!("serve_repair-request-highest-window-index", 1); + stats.highest_window_index += 1; ( Self::run_highest_window_request( recycler, @@ -151,7 +159,7 @@ impl ServeRepair { ) } RepairProtocol::Orphan(_, slot) => { - inc_new_counter_debug!("serve_repair-request-orphan", 1); + stats.orphan += 1; ( Self::run_orphan( recycler, @@ -186,6 +194,7 @@ impl ServeRepair { requests_receiver: &PacketReceiver, response_sender: &PacketSender, max_packets: &mut usize, + stats: &mut ServeRepairStats, ) -> Result<()> { //TODO cache connections let timeout = Duration::new(1, 0); @@ -202,7 +211,7 @@ impl ServeRepair { let mut time = Measure::start("repair::handle_packets"); for reqs in reqs_v { - Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender); + Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats); } time.stop(); if total_packets >= *max_packets { @@ -215,6 +224,31 @@ impl ServeRepair { Ok(()) } + fn report_reset_stats(me: &Arc>, stats: &mut ServeRepairStats) { + if stats.self_repair > 0 { + let my_id = me.read().unwrap().keypair.pubkey(); + warn!( + "{}: Ignored received repair requests from ME: {}", + my_id, stats.self_repair, + ); + inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair); + } + + debug!( + "repair_listener: total_packets: {} passed: {}", + stats.total_packets, stats.processed + ); + + inc_new_counter_debug!("serve_repair-request-window-index", stats.window_index); + inc_new_counter_debug!( + "serve_repair-request-highest-window-index", + stats.highest_window_index + ); + inc_new_counter_debug!("serve_repair-request-orphan", stats.orphan); + + *stats = ServeRepairStats::default(); + } + pub fn listen( me: Arc>, blockstore: Option>, @@ -228,6 +262,8 @@ impl ServeRepair { .name("solana-repair-listen".to_string()) .spawn(move || { let mut max_packets = 1024; + let mut last_print = Instant::now(); + let mut stats = ServeRepairStats::default(); loop { let result = Self::run_listen( &me, @@ -236,6 +272,7 @@ impl ServeRepair { &requests_receiver, &response_sender, &mut max_packets, + &mut stats, ); match result { Err(Error::RecvTimeoutError(_)) | Ok(_) => {} @@ -244,6 +281,10 @@ impl ServeRepair { if exit.load(Ordering::Relaxed) { return; } + if last_print.elapsed().as_secs() > 2 { + Self::report_reset_stats(&me, &mut stats); + last_print = Instant::now(); + } thread_mem_usage::datapoint("solana-repair-listen"); } }) @@ -256,6 +297,7 @@ impl ServeRepair { blockstore: Option<&Arc>, packets: Packets, response_sender: &PacketSender, + stats: &mut ServeRepairStats, ) { // iter over the packets, collect pulls separately and process everything else let allocated = thread_mem_usage::Allocatedp::default(); @@ -265,7 +307,9 @@ impl ServeRepair { limited_deserialize(&packet.data[..packet.meta.size]) .into_iter() .for_each(|request| { - let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request); + stats.processed += 1; + let rsp = + Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats); if let Some(rsp) = rsp { let _ignore_disconnect = response_sender.send(rsp); } @@ -295,7 +339,11 @@ impl ServeRepair { Ok(out) } - pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec)> { + pub fn repair_request( + &self, + repair_request: &RepairType, + repair_stats: &mut RepairStats, + ) -> Result<(SocketAddr, Vec)> { // find a peer that appears to be accepting replication and has the desired slot, as indicated // by a valid tvu port location let valid: Vec<_> = self @@ -308,31 +356,27 @@ impl ServeRepair { } let n = thread_rng().gen::() % valid.len(); let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port - let out = self.map_repair_request(repair_request)?; + let out = self.map_repair_request(repair_request, repair_stats)?; Ok((addr, out)) } - pub fn map_repair_request(&self, repair_request: &RepairType) -> Result> { + pub fn map_repair_request( + &self, + repair_request: &RepairType, + repair_stats: &mut RepairStats, + ) -> Result> { match repair_request { RepairType::Shred(slot, shred_index) => { - datapoint_debug!( - "serve_repair-repair", - ("repair-slot", *slot, i64), - ("repair-ix", *shred_index, i64) - ); + repair_stats.shred.update(*slot); Ok(self.window_index_request_bytes(*slot, *shred_index)?) } RepairType::HighestShred(slot, shred_index) => { - datapoint_info!( - "serve_repair-repair_highest", - ("repair-highest-slot", *slot, i64), - ("repair-highest-ix", *shred_index, i64) - ); + repair_stats.highest_shred.update(*slot); Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?) } RepairType::Orphan(slot) => { - datapoint_info!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64)); + repair_stats.orphan.update(*slot); Ok(self.orphan_bytes(*slot)?) } } @@ -592,7 +636,7 @@ mod tests { let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me))); let serve_repair = ServeRepair::new(cluster_info.clone()); - let rv = serve_repair.repair_request(&RepairType::Shred(0, 0)); + let rv = serve_repair.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default()); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243); @@ -613,7 +657,7 @@ mod tests { }; cluster_info.write().unwrap().insert_info(nxt.clone()); let rv = serve_repair - .repair_request(&RepairType::Shred(0, 0)) + .repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default()) .unwrap(); assert_eq!(nxt.serve_repair, serve_repair_addr); assert_eq!(rv.0, nxt.serve_repair); @@ -640,7 +684,7 @@ mod tests { while !one || !two { //this randomly picks an option, so eventually it should pick both let rv = serve_repair - .repair_request(&RepairType::Shred(0, 0)) + .repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default()) .unwrap(); if rv.0 == serve_repair_addr { one = true;