Skip to content

Commit

Permalink
uses SipHasher24 for gossip ping tokens (#3974)
Browse files Browse the repository at this point in the history
The commit uses SipHasher24 for gossip ping tokens where keys are
refreshed every 1 minute.
  • Loading branch information
behzadnouri authored Dec 12, 2024
1 parent 9379fbc commit 9f70456
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 201 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions core/src/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ pub struct BankingTracer {
active_tracer: Option<ActiveTracer>,
}

#[cfg_attr(
feature = "frozen-abi",
derive(AbiExample),
frozen_abi(digest = "6PCDw6YSEivfbwhbPmE4NAsXb88ZX6hkFnruP8B38nma")
)]
#[derive(Serialize, Deserialize, Debug)]
pub struct TimedTracedEvent(pub std::time::SystemTime, pub TracedEvent);

Expand Down
9 changes: 3 additions & 6 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use {
replay_stage::DUPLICATE_THRESHOLD,
shred_fetch_stage::receive_quic_datagrams,
},
bincode::serialize,
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
dashmap::{mapref::entry::Entry::Occupied, DashMap},
Expand Down Expand Up @@ -454,11 +453,9 @@ impl AncestorHashesService {
return None;
}
stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr);
}
let pong = RepairProtocol::Pong(Pong::new(&ping, keypair));
if let Ok(pong) = bincode::serialize(&pong) {
let _ = ancestor_socket.send_to(&pong, from_addr);
}
None
}
Expand Down
45 changes: 21 additions & 24 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::{ContactInfo, Protocol},
ping_pong::{self, PingCache, Pong},
ping_pong::{self, Pong},
weighted_shuffle::WeightedShuffle,
},
solana_ledger::{
Expand Down Expand Up @@ -81,7 +81,7 @@ pub const MAX_ANCESTOR_BYTES_IN_PACKET: usize =
pub const MAX_ANCESTOR_RESPONSES: usize =
MAX_ANCESTOR_BYTES_IN_PACKET / std::mem::size_of::<(Slot, Hash)>();
/// Number of bytes in the randomly generated token sent with ping messages.
pub(crate) const REPAIR_PING_TOKEN_SIZE: usize = HASH_BYTES;
const REPAIR_PING_TOKEN_SIZE: usize = HASH_BYTES;
pub const REPAIR_PING_CACHE_CAPACITY: usize = 65536;
pub const REPAIR_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
const REPAIR_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(2);
Expand Down Expand Up @@ -141,11 +141,6 @@ impl AncestorHashesRepairType {
}
}

#[cfg_attr(
feature = "frozen-abi",
derive(AbiEnumVisitor, AbiExample),
frozen_abi(digest = "GPS6e6pgUdbXLwXN6XHTqrUVMwAL2YKLPDawgMi5hHzi")
)]
#[derive(Debug, Deserialize, Serialize)]
pub enum AncestorHashesResponse {
Hashes(Vec<(Slot, Hash)>),
Expand Down Expand Up @@ -219,7 +214,8 @@ impl RepairRequestHeader {
}
}

pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>;
type Ping = ping_pong::Ping<REPAIR_PING_TOKEN_SIZE>;
type PingCache = ping_pong::PingCache<REPAIR_PING_TOKEN_SIZE>;

/// Window protocol messages
#[cfg_attr(
Expand Down Expand Up @@ -270,11 +266,6 @@ fn discard_malformed_repair_requests(
requests.len()
}

#[cfg_attr(
feature = "frozen-abi",
derive(AbiEnumVisitor, AbiExample),
frozen_abi(digest = "9A6ae44qpdT7PaxiDZbybMM2mewnSnPs3C4CxhpbbYuV")
)]
#[derive(Debug, Deserialize, Serialize)]
pub(crate) enum RepairResponse {
Ping(Ping),
Expand Down Expand Up @@ -824,6 +815,8 @@ impl ServeRepair {
assert!(REPAIR_PING_CACHE_RATE_LIMIT_DELAY > Duration::from_millis(REPAIR_MS));

let mut ping_cache = PingCache::new(
&mut rand::thread_rng(),
Instant::now(),
REPAIR_PING_CACHE_TTL,
REPAIR_PING_CACHE_RATE_LIMIT_DELAY,
REPAIR_PING_CACHE_CAPACITY,
Expand Down Expand Up @@ -924,10 +917,16 @@ impl ServeRepair {
identity_keypair: &Keypair,
) -> (bool, Option<Packet>) {
let mut rng = rand::thread_rng();
let mut pingf = move || Ping::new_rand(&mut rng, identity_keypair).ok();
let (check, ping) = request
.sender()
.map(|&sender| ping_cache.check(Instant::now(), (sender, *from_addr), &mut pingf))
.map(|&sender| {
ping_cache.check(
&mut rng,
identity_keypair,
Instant::now(),
(sender, *from_addr),
)
})
.unwrap_or_default();
let ping_pkt = if let Some(ping) = ping {
match request {
Expand Down Expand Up @@ -1232,12 +1231,10 @@ impl ServeRepair {
}
packet.meta_mut().set_discard(true);
stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let from_addr = packet.meta().socket_addr();
pending_pongs.push((pong_bytes, from_addr));
}
let pong = RepairProtocol::Pong(Pong::new(&ping, keypair));
if let Ok(pong) = bincode::serialize(&pong) {
let from_addr = packet.meta().socket_addr();
pending_pongs.push((pong, from_addr));
}
}
}
Expand Down Expand Up @@ -1462,7 +1459,7 @@ mod tests {
fn test_serialized_ping_size() {
let mut rng = rand::thread_rng();
let keypair = Keypair::new();
let ping = Ping::new_rand(&mut rng, &keypair).unwrap();
let ping = Ping::new(rng.gen(), &keypair);
let ping = RepairResponse::Ping(ping);
let pkt = Packet::from_data(None, ping).unwrap();
assert_eq!(pkt.meta().size, REPAIR_RESPONSE_SERIALIZED_PING_BYTES);
Expand Down Expand Up @@ -1516,8 +1513,8 @@ mod tests {
fn test_check_well_formed_repair_request() {
let mut rng = rand::thread_rng();
let keypair = Keypair::new();
let ping = ping_pong::Ping::<[u8; 32]>::new_rand(&mut rng, &keypair).unwrap();
let pong = Pong::new(&ping, &keypair).unwrap();
let ping = Ping::new(rng.gen(), &keypair);
let pong = Pong::new(&ping, &keypair);
let request = RepairProtocol::Pong(pong);
let mut pkt = Packet::from_data(None, request).unwrap();
let mut batch = vec![make_remote_request(&pkt)];
Expand Down
2 changes: 2 additions & 0 deletions gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ rand = { workspace = true }
rand_chacha = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde-big-array = { workspace = true }
serde_bytes = { workspace = true }
serde_derive = { workspace = true }
siphasher = { workspace = true }
solana-bloom = { workspace = true }
solana-clap-utils = { workspace = true }
solana-client = { workspace = true }
Expand Down
42 changes: 18 additions & 24 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ use {
epoch_slots::EpochSlots,
gossip_error::GossipError,
legacy_contact_info::LegacyContactInfo,
ping_pong::{PingCache, Pong},
ping_pong::Pong,
protocol::{
split_gossip_messages, Ping, Protocol, PruneData, DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_PRUNE_DATA_NODES,
PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE,
split_gossip_messages, Ping, PingCache, Protocol, PruneData,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, MAX_INCREMENTAL_SNAPSHOT_HASHES,
MAX_PRUNE_DATA_NODES, PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE,
},
restart_crds_values::{
RestartHeaviestFork, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError,
Expand Down Expand Up @@ -217,6 +217,8 @@ impl ClusterInfo {
outbound_budget: DataBudget::default(),
my_contact_info: RwLock::new(contact_info),
ping_cache: Mutex::new(PingCache::new(
&mut rand::thread_rng(),
Instant::now(),
GOSSIP_PING_CACHE_TTL,
GOSSIP_PING_CACHE_RATE_LIMIT_DELAY,
GOSSIP_PING_CACHE_CAPACITY,
Expand Down Expand Up @@ -1729,24 +1731,19 @@ impl ClusterInfo {
// Returns a predicate checking if the pull request is from a valid
// address, and if the address have responded to a ping request. Also
// appends ping packets for the addresses which need to be (re)verified.
//
// allow lint false positive trait bound requirement (`CryptoRng` only
// implemented on `&'a mut T`
#[allow(clippy::needless_pass_by_ref_mut)]
fn check_pull_request<'a, R>(
&'a self,
now: Instant,
mut rng: &'a mut R,
rng: &'a mut R,
packet_batch: &'a mut PacketBatch,
) -> impl FnMut(&PullData) -> bool + 'a
where
R: Rng + CryptoRng,
{
let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new();
let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair()).ok();
let mut ping_cache = self.ping_cache.lock().unwrap();
let mut hard_check = move |node| {
let (check, ping) = ping_cache.check(now, node, &mut pingf);
let (check, ping) = ping_cache.check(rng, &self.keypair(), now, node);
if let Some(ping) = ping {
let ping = Protocol::PingMessage(ping);
match Packet::from_data(Some(&node.1), ping) {
Expand Down Expand Up @@ -1964,10 +1961,9 @@ impl ClusterInfo {
let keypair = self.keypair();
let pongs_and_dests: Vec<_> = pings
.into_iter()
.filter_map(|(addr, ping)| {
let pong = Pong::new(&ping, &keypair).ok()?;
let pong = Protocol::PongMessage(pong);
Some((addr, pong))
.map(|(addr, ping)| {
let pong = Pong::new(&ping, &keypair);
(addr, Protocol::PongMessage(pong))
})
.collect();
if pongs_and_dests.is_empty() {
Expand Down Expand Up @@ -3110,9 +3106,8 @@ fn verify_gossip_addr<R: Rng + CryptoRng>(
};
let (out, ping) = {
let node = (*pubkey, addr);
let mut pingf = move || Ping::new_rand(rng, keypair).ok();
let mut ping_cache = ping_cache.lock().unwrap();
ping_cache.check(Instant::now(), node, &mut pingf)
ping_cache.check(rng, keypair, Instant::now(), node)
};
if let Some(ping) = ping {
pings.push((addr, Protocol::PingMessage(ping)));
Expand Down Expand Up @@ -3209,12 +3204,11 @@ mod tests {
.collect();
let pings: Vec<_> = {
let mut ping_cache = cluster_info.ping_cache.lock().unwrap();
let mut pingf = || Ping::new_rand(&mut rng, &this_node).ok();
remote_nodes
.iter()
.map(|(keypair, socket)| {
let node = (keypair.pubkey(), *socket);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
let (check, ping) = ping_cache.check(&mut rng, &this_node, now, node);
// Assert that initially remote nodes will not pass the
// ping/pong check.
assert!(!check);
Expand All @@ -3225,7 +3219,7 @@ mod tests {
let pongs: Vec<(SocketAddr, Pong)> = pings
.iter()
.zip(&remote_nodes)
.map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap()))
.map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair)))
.collect();
let now = now + Duration::from_millis(1);
cluster_info.handle_batch_pong_messages(pongs, now);
Expand All @@ -3234,7 +3228,7 @@ mod tests {
let mut ping_cache = cluster_info.ping_cache.lock().unwrap();
for (keypair, socket) in &remote_nodes {
let node = (keypair.pubkey(), *socket);
let (check, _) = ping_cache.check(now, node, || -> Option<Ping> { None });
let (check, _) = ping_cache.check(&mut rng, &this_node, now, node);
assert!(check);
}
}
Expand All @@ -3243,7 +3237,7 @@ mod tests {
let mut ping_cache = cluster_info.ping_cache.lock().unwrap();
let (keypair, socket) = new_rand_remote_node(&mut rng);
let node = (keypair.pubkey(), socket);
let (check, _) = ping_cache.check(now, node, || -> Option<Ping> { None });
let (check, _) = ping_cache.check(&mut rng, &this_node, now, node);
assert!(!check);
}
}
Expand All @@ -3263,11 +3257,11 @@ mod tests {
.collect();
let pings: Vec<_> = remote_nodes
.iter()
.map(|(keypair, _)| Ping::new_rand(&mut rng, keypair).unwrap())
.map(|(keypair, _)| Ping::new(rng.gen(), keypair))
.collect();
let pongs: Vec<_> = pings
.iter()
.map(|ping| Pong::new(ping, &this_node).unwrap())
.map(|ping| Pong::new(ping, &this_node))
.collect();
let recycler = PacketBatchRecycler::default();
let packets = cluster_info
Expand Down
8 changes: 4 additions & 4 deletions gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use {
crds_gossip_push::CrdsGossipPush,
crds_value::CrdsValue,
duplicate_shred::{self, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
ping_pong::PingCache,
protocol::Ping,
protocol::{Ping, PingCache},
},
itertools::Itertools,
rand::{CryptoRng, Rng},
Expand Down Expand Up @@ -386,7 +385,6 @@ pub(crate) fn maybe_ping_gossip_addresses<R: Rng + CryptoRng>(
pings: &mut Vec<(SocketAddr, Ping)>,
) -> Vec<ContactInfo> {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(rng, keypair).ok();
let now = Instant::now();
nodes
.into_iter()
Expand All @@ -396,7 +394,7 @@ pub(crate) fn maybe_ping_gossip_addresses<R: Rng + CryptoRng>(
};
let (check, ping) = {
let node = (*node.pubkey(), node_gossip);
ping_cache.check(now, node, &mut pingf)
ping_cache.check(rng, keypair, now, node)
};
if let Some(ping) = ping {
pings.push((node_gossip, ping));
Expand Down Expand Up @@ -431,6 +429,8 @@ mod test {
)
.unwrap();
let ping_cache = PingCache::new(
&mut rand::thread_rng(),
Instant::now(),
Duration::from_secs(20 * 60), // ttl
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
128, // capacity
Expand Down
Loading

0 comments on commit 9f70456

Please sign in to comment.