Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for repair dos (bp #9056) #9221

Merged
merged 1 commit into from
Apr 1, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
};
use bincode::serialize;
use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug};
use solana_perf::packet::{limited_deserialize, Packet, Packets, PacketsRecycler};
Expand Down Expand Up @@ -50,6 +51,7 @@ impl RepairType {
#[derive(Default)]
pub struct ServeRepairStats {
pub total_packets: usize,
pub dropped_packets: usize,
pub processed: usize,
pub self_repair: usize,
pub window_index: usize,
Expand Down Expand Up @@ -196,13 +198,39 @@ impl ServeRepair {
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
stats: &mut ServeRepairStats,
max_packets: &mut usize,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
let reqs = requests_receiver.recv_timeout(timeout)?;
stats.total_packets += reqs.packets.len();
let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?];
let mut total_packets = reqs_v[0].packets.len();

let mut dropped_packets = 0;
while let Ok(more) = requests_receiver.try_recv() {
total_packets += more.packets.len();
if total_packets < *max_packets {
// Drop the rest in the channel in case of dos
reqs_v.push(more);
} else {
dropped_packets += more.packets.len();
}
}

Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
stats.dropped_packets += dropped_packets;
stats.total_packets += total_packets;

let mut time = Measure::start("repair::handle_packets");
for reqs in reqs_v {
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
}
time.stop();
if total_packets >= *max_packets {
if time.as_ms() > 1000 {
*max_packets = (*max_packets * 9) / 10;
} else {
*max_packets = (*max_packets * 10) / 9;
}
}
Ok(())
}

Expand All @@ -216,6 +244,9 @@ impl ServeRepair {
inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
}

inc_new_counter_info!("serve_repair-total_packets", stats.total_packets);
inc_new_counter_info!("serve_repair-dropped_packets", stats.dropped_packets);

debug!(
"repair_listener: total_packets: {} passed: {}",
stats.total_packets, stats.processed
Expand Down Expand Up @@ -245,6 +276,7 @@ impl ServeRepair {
.spawn(move || {
let mut last_print = Instant::now();
let mut stats = ServeRepairStats::default();
let mut max_packets = 1024;
loop {
let result = Self::run_listen(
&me,
Expand All @@ -253,6 +285,7 @@ impl ServeRepair {
&requests_receiver,
&response_sender,
&mut stats,
&mut max_packets,
);
match result {
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
Expand Down