From cded4d5335fcfce5a6ef85fb2ab88760720f6388 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 26 Mar 2020 12:42:17 -0700 Subject: [PATCH] Fix repair dos --- core/src/serve_repair.rs | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index f4d56ebedeb378..2cb873da0d559f 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -8,6 +8,7 @@ use crate::{ }; use bincode::serialize; use solana_ledger::blockstore::Blockstore; +use solana_measure::measure::Measure; use solana_measure::thread_mem_usage; use solana_metrics::{datapoint_debug, inc_new_counter_debug}; use solana_perf::packet::{limited_deserialize, Packet, Packets, PacketsRecycler}; @@ -50,6 +51,7 @@ impl RepairType { #[derive(Default)] pub struct ServeRepairStats { pub total_packets: usize, + pub dropped_packets: usize, pub processed: usize, pub self_repair: usize, pub window_index: usize, @@ -196,13 +198,39 @@ impl ServeRepair { requests_receiver: &PacketReceiver, response_sender: &PacketSender, stats: &mut ServeRepairStats, + max_packets: &mut usize, ) -> Result<()> { //TODO cache connections let timeout = Duration::new(1, 0); - let reqs = requests_receiver.recv_timeout(timeout)?; - stats.total_packets += reqs.packets.len(); + let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?]; + let mut total_packets = reqs_v[0].packets.len(); + + let mut dropped_packets = 0; + while let Ok(more) = requests_receiver.try_recv() { + total_packets += more.packets.len(); + if total_packets < *max_packets { + // Drop the rest in the channel in case of dos + reqs_v.push(more); + } else { + dropped_packets += more.packets.len(); + } + } - Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats); + stats.dropped_packets += dropped_packets; + stats.total_packets += total_packets; + + let mut time = Measure::start("repair::handle_packets"); + for reqs in reqs_v { + Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats); + } + time.stop(); + if total_packets >= *max_packets { + if time.as_ms() > 1000 { + *max_packets = (*max_packets * 9) / 10; + } else { + *max_packets = (*max_packets * 10) / 9; + } + } Ok(()) } @@ -216,6 +244,9 @@ impl ServeRepair { inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair); } + inc_new_counter_info!("serve_repair-total_packets", stats.total_packets); + inc_new_counter_info!("serve_repair-dropped_packets", stats.dropped_packets); + debug!( "repair_listener: total_packets: {} passed: {}", stats.total_packets, stats.processed @@ -245,6 +276,7 @@ impl ServeRepair { .spawn(move || { let mut last_print = Instant::now(); let mut stats = ServeRepairStats::default(); + let mut max_packets = 1024; loop { let result = Self::run_listen( &me, @@ -253,6 +285,7 @@ impl ServeRepair { &requests_receiver, &response_sender, &mut stats, + &mut max_packets, ); match result { Err(Error::RecvTimeoutError(_)) | Ok(_) => {}