Skip to content

Commit

Permalink
ancestor hashes socket ping/pong support (#26866)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbiseda authored Aug 10, 2022
1 parent ccfbc54 commit 370de81
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 156 deletions.
204 changes: 140 additions & 64 deletions core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,34 @@ use {
duplicate_repair_status::{DeadSlotAncestorRequestStatus, DuplicateAncestorDecision},
outstanding_requests::OutstandingRequests,
packet_threshold::DynamicPacketToProcessThreshold,
repair_response::{self},
repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup},
replay_stage::DUPLICATE_THRESHOLD,
result::{Error, Result},
serve_repair::{AncestorHashesRepairType, ServeRepair},
serve_repair::{
AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair,
},
},
bincode::serialize,
crossbeam_channel::{unbounded, Receiver, Sender},
dashmap::{mapref::entry::Entry::Occupied, DashMap},
solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE},
solana_gossip::{cluster_info::ClusterInfo, ping_pong::Pong},
solana_ledger::blockstore::Blockstore,
solana_perf::{
packet::{Packet, PacketBatch},
packet::{deserialize_from_with_limit, Packet, PacketBatch},
recycler::Recycler,
},
solana_runtime::bank::Bank,
solana_sdk::{
clock::{Slot, SLOT_MS},
pubkey::Pubkey,
signature::Signable,
signer::keypair::Keypair,
timing::timestamp,
},
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
collections::HashSet,
io::{Cursor, Read},
net::UdpSocket,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -62,27 +67,25 @@ type RetryableSlotsReceiver = Receiver<Slot>;
type OutstandingAncestorHashesRepairs = OutstandingRequests<AncestorHashesRepairType>;

#[derive(Default)]
pub struct AncestorHashesResponsesStats {
pub total_packets: usize,
pub dropped_packets: usize,
pub invalid_packets: usize,
pub processed: usize,
struct AncestorHashesResponsesStats {
total_packets: usize,
processed: usize,
dropped_packets: usize,
invalid_packets: usize,
ping_count: usize,
ping_err_verify_count: usize,
}

impl AncestorHashesResponsesStats {
fn report(&mut self) {
inc_new_counter_info!(
"ancestor_hashes_responses-total_packets",
self.total_packets
);
inc_new_counter_info!("ancestor_hashes_responses-processed", self.processed);
inc_new_counter_info!(
"ancestor_hashes_responses-dropped_packets",
self.dropped_packets
);
inc_new_counter_info!(
"ancestor_hashes_responses-invalid_packets",
self.invalid_packets
datapoint_info!(
"ancestor_hashes_responses",
("total_packets", self.total_packets, i64),
("processed", self.processed, i64),
("dropped_packets", self.dropped_packets, i64),
("invalid_packets", self.invalid_packets, i64),
("ping_count", self.ping_count, i64),
("ping_err_verify_count", self.ping_err_verify_count, i64),
);
*self = AncestorHashesResponsesStats::default();
}
Expand Down Expand Up @@ -174,6 +177,8 @@ impl AncestorHashesService {
exit.clone(),
repair_info.duplicate_slots_reset_sender.clone(),
retryable_slots_sender,
repair_info.cluster_info.clone(),
ancestor_hashes_request_socket.clone(),
);

// Generate ancestor requests for dead slots that are repairable
Expand Down Expand Up @@ -206,6 +211,8 @@ impl AncestorHashesService {
exit: Arc<AtomicBool>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
retryable_slots_sender: RetryableSlotsSender,
cluster_info: Arc<ClusterInfo>,
ancestor_socket: Arc<UdpSocket>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-ancestor-hashes-responses-service".to_string())
Expand All @@ -214,6 +221,7 @@ impl AncestorHashesService {
let mut stats = AncestorHashesResponsesStats::default();
let mut packet_threshold = DynamicPacketToProcessThreshold::default();
loop {
let keypair = cluster_info.keypair().clone();
let result = Self::process_new_packets_from_channel(
&ancestor_hashes_request_statuses,
&response_receiver,
Expand All @@ -223,6 +231,8 @@ impl AncestorHashesService {
&mut packet_threshold,
&duplicate_slots_reset_sender,
&retryable_slots_sender,
&keypair,
&ancestor_socket,
);
match result {
Err(Error::RecvTimeout(_)) | Ok(_) => {}
Expand All @@ -241,6 +251,7 @@ impl AncestorHashesService {
}

/// Process messages from the network
#[allow(clippy::too_many_arguments)]
fn process_new_packets_from_channel(
ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
response_receiver: &PacketBatchReceiver,
Expand All @@ -250,6 +261,8 @@ impl AncestorHashesService {
packet_threshold: &mut DynamicPacketToProcessThreshold,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
retryable_slots_sender: &RetryableSlotsSender,
keypair: &Keypair,
ancestor_socket: &UdpSocket,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let mut packet_batches = vec![response_receiver.recv_timeout(timeout)?];
Expand Down Expand Up @@ -278,6 +291,8 @@ impl AncestorHashesService {
blockstore,
duplicate_slots_reset_sender,
retryable_slots_sender,
keypair,
ancestor_socket,
);
}
packet_threshold.update(total_packets, timer.elapsed());
Expand All @@ -292,6 +307,8 @@ impl AncestorHashesService {
blockstore: &Blockstore,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
retryable_slots_sender: &RetryableSlotsSender,
keypair: &Keypair,
ancestor_socket: &UdpSocket,
) {
packet_batch.iter().for_each(|packet| {
let decision = Self::verify_and_process_ancestor_response(
Expand All @@ -300,6 +317,8 @@ impl AncestorHashesService {
stats,
outstanding_requests,
blockstore,
keypair,
ancestor_socket,
);
if let Some((slot, decision)) = decision {
Self::handle_ancestor_request_decision(
Expand All @@ -321,55 +340,104 @@ impl AncestorHashesService {
stats: &mut AncestorHashesResponsesStats,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
blockstore: &Blockstore,
keypair: &Keypair,
ancestor_socket: &UdpSocket,
) -> Option<(Slot, DuplicateAncestorDecision)> {
let from_addr = packet.meta.socket_addr();
let ancestor_hashes_response = packet
.deserialize_slice(..packet.meta.size.saturating_sub(SIZE_OF_NONCE))
.ok()?;

// Verify the response
let request_slot = repair_response::nonce(packet).and_then(|nonce| {
outstanding_requests.write().unwrap().register_response(
nonce,
&ancestor_hashes_response,
timestamp(),
// If the response is valid, return the slot the request
// was for
|ancestor_hashes_request| ancestor_hashes_request.0,
)
});
let packet_data = match packet.data(..) {
Some(data) => data,
None => {
stats.invalid_packets += 1;
return None;
}
};
let mut cursor = Cursor::new(packet_data);
let response = match deserialize_from_with_limit(&mut cursor) {
Ok(response) => response,
Err(_) => {
stats.invalid_packets += 1;
return None;
}
};

if request_slot.is_none() {
stats.invalid_packets += 1;
return None;
}
match response {
AncestorHashesResponse::Hashes(ref hashes) => {
// deserialize trailing nonce
let nonce = match deserialize_from_with_limit(&mut cursor) {
Ok(nonce) => nonce,
Err(_) => {
stats.invalid_packets += 1;
return None;
}
};

// If was a valid response, there must be a valid `request_slot`
let request_slot = request_slot.unwrap();
stats.processed += 1;
// verify that packet does not contain extraneous data
if cursor.bytes().next().is_some() {
stats.invalid_packets += 1;
return None;
}

if let Occupied(mut ancestor_hashes_status_ref) =
ancestor_hashes_request_statuses.entry(request_slot)
{
let decision = ancestor_hashes_status_ref.get_mut().add_response(
&from_addr,
ancestor_hashes_response.into_slot_hashes(),
blockstore,
);
if decision.is_some() {
// Once a request is completed, remove it from the map so that new
// requests for the same slot can be made again if necessary. It's
// important to hold the `write` lock here via
// `ancestor_hashes_status_ref` so that we don't race with deletion +
// insertion from the `t_ancestor_requests` thread, which may
// 1) Remove expired statuses from `ancestor_hashes_request_statuses`
// 2) Insert another new one via `manage_ancestor_requests()`.
// In which case we wouldn't want to delete the newly inserted entry here.
ancestor_hashes_status_ref.remove();
let request_slot = outstanding_requests.write().unwrap().register_response(
nonce,
&response,
timestamp(),
// If the response is valid, return the slot the request
// was for
|ancestor_hashes_request| ancestor_hashes_request.0,
);

if request_slot.is_none() {
stats.invalid_packets += 1;
return None;
}

// If was a valid response, there must be a valid `request_slot`
let request_slot = request_slot.unwrap();
stats.processed += 1;

if let Occupied(mut ancestor_hashes_status_ref) =
ancestor_hashes_request_statuses.entry(request_slot)
{
let decision = ancestor_hashes_status_ref.get_mut().add_response(
&from_addr,
hashes.clone(),
blockstore,
);
if decision.is_some() {
// Once a request is completed, remove it from the map so that new
// requests for the same slot can be made again if necessary. It's
// important to hold the `write` lock here via
// `ancestor_hashes_status_ref` so that we don't race with deletion +
// insertion from the `t_ancestor_requests` thread, which may
// 1) Remove expired statuses from `ancestor_hashes_request_statuses`
// 2) Insert another new one via `manage_ancestor_requests()`.
// In which case we wouldn't want to delete the newly inserted entry here.
ancestor_hashes_status_ref.remove();
}
decision.map(|decision| (request_slot, decision))
} else {
None
}
}
AncestorHashesResponse::Ping(ping) => {
// verify that packet does not contain extraneous data
if cursor.bytes().next().is_some() {
stats.invalid_packets += 1;
return None;
}
if ping.verify() {
stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr);
}
}
} else {
stats.ping_err_verify_count += 1;
}
None
}
decision.map(|decision| (request_slot, decision))
} else {
None
}
}

Expand Down Expand Up @@ -1145,6 +1213,8 @@ mod test {
&mut AncestorHashesResponsesStats::default(),
&outstanding_requests,
&requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
)
.unwrap();

Expand Down Expand Up @@ -1385,7 +1455,9 @@ mod test {

let ManageAncestorHashesState {
ancestor_hashes_request_statuses,
ancestor_hashes_request_socket,
outstanding_requests,
repair_info,
..
} = ManageAncestorHashesState::new(bank_forks);

Expand All @@ -1402,6 +1474,8 @@ mod test {
&mut AncestorHashesResponsesStats::default(),
&outstanding_requests,
&blockstore,
&repair_info.cluster_info.keypair(),
&ancestor_hashes_request_socket,
)
.is_none());
}
Expand Down Expand Up @@ -1506,6 +1580,8 @@ mod test {
&mut AncestorHashesResponsesStats::default(),
&outstanding_requests,
&requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
)
.unwrap();

Expand Down
Loading

0 comments on commit 370de81

Please sign in to comment.