Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Fix for repair dos #9056

Merged
merged 1 commit into from
Apr 1, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
};
use bincode::serialize;
use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug};
use solana_perf::packet::{limited_deserialize, Packet, Packets, PacketsRecycler};
Expand Down Expand Up @@ -50,6 +51,7 @@ impl RepairType {
#[derive(Default)]
pub struct ServeRepairStats {
pub total_packets: usize,
pub dropped_packets: usize,
pub processed: usize,
pub self_repair: usize,
pub window_index: usize,
Expand Down Expand Up @@ -196,13 +198,39 @@ impl ServeRepair {
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
stats: &mut ServeRepairStats,
max_packets: &mut usize,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
let reqs = requests_receiver.recv_timeout(timeout)?;
stats.total_packets += reqs.packets.len();
let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?];
let mut total_packets = reqs_v[0].packets.len();

let mut dropped_packets = 0;
while let Ok(more) = requests_receiver.try_recv() {
sakridge marked this conversation as resolved.
Show resolved Hide resolved
total_packets += more.packets.len();
if total_packets < *max_packets {
// Drop the rest in the channel in case of dos
reqs_v.push(more);
} else {
dropped_packets += more.packets.len();
}
}

Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
stats.dropped_packets += dropped_packets;
stats.total_packets += total_packets;

let mut time = Measure::start("repair::handle_packets");
for reqs in reqs_v {
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
}
time.stop();
if total_packets >= *max_packets {
if time.as_ms() > 1000 {
*max_packets = (*max_packets * 9) / 10;
} else {
*max_packets = (*max_packets * 10) / 9;
}
}
Ok(())
}

Expand All @@ -216,6 +244,9 @@ impl ServeRepair {
inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
}

inc_new_counter_info!("serve_repair-total_packets", stats.total_packets);
inc_new_counter_info!("serve_repair-dropped_packets", stats.dropped_packets);

debug!(
"repair_listener: total_packets: {} passed: {}",
stats.total_packets, stats.processed
Expand Down Expand Up @@ -245,6 +276,7 @@ impl ServeRepair {
.spawn(move || {
let mut last_print = Instant::now();
let mut stats = ServeRepairStats::default();
let mut max_packets = 1024;
loop {
let result = Self::run_listen(
&me,
Expand All @@ -253,6 +285,7 @@ impl ServeRepair {
&requests_receiver,
&response_sender,
&mut stats,
&mut max_packets,
);
match result {
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
Expand Down