Repair support in dos tool and fix for repair dos
Repair dos can easily cause memory exhaustion.
sakridge committed Mar 24, 2020
1 parent 216b01b commit 3a73349
Showing 6 changed files with 181 additions and 51 deletions.
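The fix has two parts, visible in core/src/serve_repair.rs below: run_listen now drains everything still queued in the request channel after taking one batch, so a flood of repair requests can no longer accumulate unbounded memory, and per-request log/metric calls are replaced by counters flushed on a timer. A minimal standalone sketch of the drain pattern, assuming an std::sync::mpsc-style receiver; the function name and the Vec<u8> payload are illustrative, not the actual solana-core types:

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

// Block for one batch, then discard whatever else is queued so a request
// flood cannot pile up in the channel; discarded items are only counted.
fn recv_and_drain(rx: &Receiver<Vec<u8>>) -> Result<(Vec<u8>, usize), RecvTimeoutError> {
    let batch = rx.recv_timeout(Duration::from_secs(1))?;
    let mut dropped = 0;
    while let Ok(extra) = rx.try_recv() {
        dropped += extra.len();
    }
    Ok((batch, dropped))
}
```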
1 change: 1 addition & 0 deletions Cargo.lock


5 changes: 3 additions & 2 deletions archiver-lib/src/archiver.rs
@@ -13,7 +13,7 @@ use solana_core::{
     contact_info::ContactInfo,
     gossip_service::GossipService,
     repair_service,
-    repair_service::{RepairService, RepairSlotRange, RepairStrategy},
+    repair_service::{RepairService, RepairSlotRange, RepairStats, RepairStrategy},
     serve_repair::ServeRepair,
     shred_fetch_stage::ShredFetchStage,
     sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
@@ -839,13 +839,14 @@ impl Archiver {
                 repair_service::MAX_REPAIR_LENGTH,
                 &repair_slot_range,
             );
+            let mut repair_stats = RepairStats::default();
             //iter over the repairs and send them
             if let Ok(repairs) = repairs {
                 let reqs: Vec<_> = repairs
                     .into_iter()
                     .filter_map(|repair_request| {
                         serve_repair
-                            .map_repair_request(&repair_request)
+                            .map_repair_request(&repair_request, &mut repair_stats)
                            .map(|result| ((archiver_info.gossip, result), repair_request))
                            .ok()
                    })
49 changes: 47 additions & 2 deletions core/src/repair_service.rs
@@ -20,9 +20,31 @@ use std::{
     sync::{Arc, RwLock},
     thread::sleep,
     thread::{self, Builder, JoinHandle},
-    time::Duration,
+    time::{Duration, Instant},
 };
 
+#[derive(Default)]
+pub struct RepairStatsGroup {
+    pub count: u64,
+    pub min: u64,
+    pub max: u64,
+}
+
+impl RepairStatsGroup {
+    pub fn update(&mut self, slot: u64) {
+        self.count += 1;
+        self.min = if self.count == 1 { slot } else { std::cmp::min(self.min, slot) };
+        self.max = std::cmp::max(self.max, slot);
+    }
+}

+#[derive(Default)]
+pub struct RepairStats {
+    pub shred: RepairStatsGroup,
+    pub highest_shred: RepairStatsGroup,
+    pub orphan: RepairStatsGroup,
+}
+
 pub const MAX_REPAIR_LENGTH: usize = 512;
 pub const REPAIR_MS: u64 = 100;
 pub const MAX_ORPHANS: usize = 5;
@@ -91,6 +113,8 @@ impl RepairService {
             if let RepairStrategy::RepairAll { .. } = repair_strategy {
                 Self::initialize_lowest_slot(id, blockstore, cluster_info);
             }
+            let mut repair_stats = RepairStats::default();
+            let mut last_stats = Instant::now();
             loop {
                 if exit.load(Ordering::Relaxed) {
                     break;
@@ -135,7 +159,12 @@
                     .into_iter()
                     .filter_map(|repair_request| {
                         serve_repair
-                            .repair_request(&cluster_slots, &repair_request, &mut cache)
+                            .repair_request(
+                                &cluster_slots,
+                                &repair_request,
+                                &mut cache,
+                                &mut repair_stats,
+                            )
                             .map(|result| (result, repair_request))
                             .ok()
                     })
@@ -148,6 +177,22 @@
                     });
                 }
             }
+            if last_stats.elapsed().as_secs() > 1 {
+                let repair_total = repair_stats.shred.count
+                    + repair_stats.highest_shred.count
+                    + repair_stats.orphan.count;
+                if repair_total > 0 {
+                    datapoint_info!(
+                        "serve_repair-repair",
+                        ("repair-total", repair_total, i64),
+                        ("shred-count", repair_stats.shred.count, i64),
+                        ("highest-shred-count", repair_stats.highest_shred.count, i64),
+                        ("orphan", repair_stats.orphan.count, i64),
+                    );
+                }
+                repair_stats = RepairStats::default();
+                last_stats = Instant::now();
+            }
             sleep(Duration::from_millis(REPAIR_MS));
         }
     }
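The service loop above applies the same idea on the send side: repair_stats accumulates per-type counters cheaply, and a single datapoint_info! fires at most once per second. A self-contained sketch of that throttle, with println! standing in for datapoint_info! and all names illustrative:

```rust
use std::time::{Duration, Instant};

// Accumulate cheap counters per event; emit one aggregated report per
// interval instead of a metric per event.
struct ThrottledReporter {
    count: u64,
    last_flush: Instant,
    interval: Duration,
}

impl ThrottledReporter {
    fn new(interval: Duration) -> Self {
        Self { count: 0, last_flush: Instant::now(), interval }
    }

    fn record(&mut self) {
        self.count += 1;
    }

    fn maybe_flush(&mut self) {
        if self.last_flush.elapsed() > self.interval {
            if self.count > 0 {
                println!("repair-total: {}", self.count); // stand-in for datapoint_info!
            }
            self.count = 0;
            self.last_flush = Instant::now();
        }
    }
}
```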
129 changes: 90 additions & 39 deletions core/src/serve_repair.rs
@@ -2,6 +2,7 @@ use crate::{
     cluster_info::{ClusterInfo, ClusterInfoError},
     cluster_slots::ClusterSlots,
     contact_info::ContactInfo,
+    repair_service::RepairStats,
     result::{Error, Result},
     weighted_shuffle::weighted_best,
 };
@@ -46,9 +47,19 @@ impl RepairType {
     }
 }
 
+#[derive(Default)]
+pub struct ServeRepairStats {
+    pub total_packets: usize,
+    pub processed: usize,
+    pub self_repair: usize,
+    pub window_index: usize,
+    pub highest_window_index: usize,
+    pub orphan: usize,
+}
+
 /// Window protocol messages
 #[derive(Serialize, Deserialize, Debug)]
-enum RepairProtocol {
+pub enum RepairProtocol {
     WindowIndex(ContactInfo, u64, u64),
     HighestWindowIndex(ContactInfo, u64, u64),
     Orphan(ContactInfo, u64),
@@ -106,25 +117,22 @@ impl ServeRepair {
         from_addr: &SocketAddr,
         blockstore: Option<&Arc<Blockstore>>,
         request: RepairProtocol,
+        stats: &mut ServeRepairStats,
     ) -> Option<Packets> {
         let now = Instant::now();
 
         //TODO verify from is signed
         let my_id = me.read().unwrap().keypair.pubkey();
         let from = Self::get_repair_sender(&request);
         if from.id == my_id {
-            warn!(
-                "{}: Ignored received repair request from ME {}",
-                my_id, from.id,
-            );
-            inc_new_counter_debug!("serve_repair-handle-repair--eq", 1);
+            stats.self_repair += 1;
             return None;
         }
 
         let (res, label) = {
             match &request {
                 RepairProtocol::WindowIndex(from, slot, shred_index) => {
-                    inc_new_counter_debug!("serve_repair-request-window-index", 1);
+                    stats.window_index += 1;
                     (
                         Self::run_window_request(
                             recycler,
@@ -140,7 +148,7 @@
                 }
 
                 RepairProtocol::HighestWindowIndex(_, slot, highest_index) => {
-                    inc_new_counter_debug!("serve_repair-request-highest-window-index", 1);
+                    stats.highest_window_index += 1;
                     (
                         Self::run_highest_window_request(
                             recycler,
@@ -153,7 +161,7 @@
                     )
                 }
                 RepairProtocol::Orphan(_, slot) => {
-                    inc_new_counter_debug!("serve_repair-request-orphan", 1);
+                    stats.orphan += 1;
                     (
                         Self::run_orphan(
                             recycler,
@@ -187,15 +195,46 @@
         blockstore: Option<&Arc<Blockstore>>,
         requests_receiver: &PacketReceiver,
         response_sender: &PacketSender,
+        stats: &mut ServeRepairStats,
     ) -> Result<()> {
         //TODO cache connections
         let timeout = Duration::new(1, 0);
         let reqs = requests_receiver.recv_timeout(timeout)?;
+        stats.total_packets += reqs.packets.len();
+        // Drop the rest in the channel in case of dos
+        while let Ok(more) = requests_receiver.try_recv() {
+            stats.total_packets += more.packets.len();
+        }
 
-        Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender);
+        Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
         Ok(())
     }
 
+    fn report_reset_stats(me: &Arc<RwLock<Self>>, stats: &mut ServeRepairStats) {
+        if stats.self_repair > 0 {
+            let my_id = me.read().unwrap().keypair.pubkey();
+            warn!(
+                "{}: Ignored received repair requests from ME: {}",
+                my_id, stats.self_repair,
+            );
+            inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
+        }
+
+        info!(
+            "repair_listener: total_packets: {} passed: {}",
+            stats.total_packets, stats.processed
+        );
+
+        inc_new_counter_debug!("serve_repair-request-window-index", stats.window_index);
+        inc_new_counter_debug!(
+            "serve_repair-request-highest-window-index",
+            stats.highest_window_index
+        );
+        inc_new_counter_debug!("serve_repair-request-orphan", stats.orphan);
+
+        *stats = ServeRepairStats::default();
+    }
+
     pub fn listen(
         me: Arc<RwLock<Self>>,
         blockstore: Option<Arc<Blockstore>>,
@@ -207,22 +246,31 @@
         let recycler = PacketsRecycler::default();
         Builder::new()
             .name("solana-repair-listen".to_string())
-            .spawn(move || loop {
-                let result = Self::run_listen(
-                    &me,
-                    &recycler,
-                    blockstore.as_ref(),
-                    &requests_receiver,
-                    &response_sender,
-                );
-                match result {
-                    Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
-                    Err(err) => info!("repair listener error: {:?}", err),
-                };
-                if exit.load(Ordering::Relaxed) {
-                    return;
-                }
-                thread_mem_usage::datapoint("solana-repair-listen");
+            .spawn(move || {
+                let mut last_print = Instant::now();
+                let mut stats = ServeRepairStats::default();
+                loop {
+                    let result = Self::run_listen(
+                        &me,
+                        &recycler,
+                        blockstore.as_ref(),
+                        &requests_receiver,
+                        &response_sender,
+                        &mut stats,
+                    );
+                    match result {
+                        Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
+                        Err(err) => info!("repair listener error: {:?}", err),
+                    };
+                    if exit.load(Ordering::Relaxed) {
+                        return;
+                    }
+                    if last_print.elapsed().as_secs() > 2 {
+                        Self::report_reset_stats(&me, &mut stats);
+                        last_print = Instant::now();
+                    }
+                    thread_mem_usage::datapoint("solana-repair-listen");
+                }
             })
             .unwrap()
     }
@@ -233,6 +281,7 @@
         blockstore: Option<&Arc<Blockstore>>,
         packets: Packets,
         response_sender: &PacketSender,
+        stats: &mut ServeRepairStats,
     ) {
         // iter over the packets, collect pulls separately and process everything else
         let allocated = thread_mem_usage::Allocatedp::default();
@@ -242,7 +291,9 @@
             limited_deserialize(&packet.data[..packet.meta.size])
                 .into_iter()
                 .for_each(|request| {
-                    let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request);
+                    stats.processed += 1;
+                    let rsp =
+                        Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
                     if let Some(rsp) = rsp {
                         let _ignore_disconnect = response_sender.send(rsp);
                     }
@@ -277,6 +328,7 @@
         cluster_slots: &ClusterSlots,
         repair_request: &RepairType,
         cache: &mut RepairCache,
+        repair_stats: &mut RepairStats,
     ) -> Result<(SocketAddr, Vec<u8>)> {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
@@ -295,30 +347,26 @@
         let (repair_peers, weights) = cache.get(&repair_request.slot()).unwrap();
         let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
         let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
-        let out = self.map_repair_request(repair_request)?;
+        let out = self.map_repair_request(repair_request, repair_stats)?;
         Ok((addr, out))
     }
 
-    pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
+    pub fn map_repair_request(
+        &self,
+        repair_request: &RepairType,
+        repair_stats: &mut RepairStats,
+    ) -> Result<Vec<u8>> {
         match repair_request {
             RepairType::Shred(slot, shred_index) => {
-                datapoint_debug!(
-                    "serve_repair-repair",
-                    ("repair-slot", *slot, i64),
-                    ("repair-ix", *shred_index, i64)
-                );
+                repair_stats.shred.update(*slot);
                 Ok(self.window_index_request_bytes(*slot, *shred_index)?)
             }
             RepairType::HighestShred(slot, shred_index) => {
-                datapoint_info!(
-                    "serve_repair-repair_highest",
-                    ("repair-highest-slot", *slot, i64),
-                    ("repair-highest-ix", *shred_index, i64)
-                );
+                repair_stats.highest_shred.update(*slot);
                 Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
             }
             RepairType::Orphan(slot) => {
-                datapoint_info!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64));
+                repair_stats.orphan.update(*slot);
                 Ok(self.orphan_bytes(*slot)?)
             }
         }
@@ -583,6 +631,7 @@ mod tests {
             &cluster_slots,
             &RepairType::Shred(0, 0),
             &mut HashMap::new(),
+            &mut RepairStats::default(),
         );
         assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));

@@ -608,6 +657,7 @@
                 &cluster_slots,
                 &RepairType::Shred(0, 0),
                 &mut HashMap::new(),
+                &mut RepairStats::default(),
             )
             .unwrap();
         assert_eq!(nxt.serve_repair, serve_repair_addr);
@@ -639,6 +689,7 @@
                     &cluster_slots,
                     &RepairType::Shred(0, 0),
                     &mut HashMap::new(),
+                    &mut RepairStats::default(),
                 )
                 .unwrap();
             if rv.0 == serve_repair_addr {
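Taken together, the serve_repair.rs changes reshape the listener into a loop that owns its counters and reports them at most every two seconds through report_reset_stats, so even a hostile request stream produces one aggregated warning rather than a log line per packet. A rough skeleton of that loop shape, assuming an mpsc receiver and with println! standing in for the aggregated report; the names here are illustrative:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::time::{Duration, Instant};

// Skeleton of the reshaped listen loop: stats live with the loop, the
// receive path only bumps counters, and reporting is flushed on a timer.
fn listen_loop(rx: Receiver<Vec<u8>>, exit: Arc<AtomicBool>) {
    let mut processed: usize = 0;
    let mut last_print = Instant::now();
    loop {
        if let Ok(batch) = rx.recv_timeout(Duration::from_secs(1)) {
            processed += batch.len(); // stand-in for run_listen + handle_packets
        }
        if exit.load(Ordering::Relaxed) {
            return;
        }
        if last_print.elapsed().as_secs() > 2 {
            println!("repair_listener: processed: {}", processed); // stand-in for report_reset_stats
            processed = 0;
            last_print = Instant::now();
        }
    }
}
```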
1 change: 1 addition & 0 deletions dos/Cargo.toml
@@ -17,3 +17,4 @@ solana-core = { path = "../core", version = "1.1.0" }
 solana-logger = { path = "../logger", version = "1.1.0" }
 solana-net-utils = { path = "../net-utils", version = "1.1.0" }
 solana-runtime = { path = "../runtime", version = "1.1.0" }
+solana-sdk = { path = "../sdk", version = "1.1.0" }
