Skip to content

Commit

Permalink
adds hash domain to ping-pong protocol (#27193)
Browse files Browse the repository at this point in the history
In order to maintain backward compatibility, for now the responding node
will hash the token both with and without domain so that the other node
will accept the response regardless of its upgrade status.
Once the cluster has upgraded to the new code, we will remove the legacy
domain = false case.

(cherry picked from commit 6928b2a)

# Conflicts:
#	gossip/src/cluster_info.rs
  • Loading branch information
behzadnouri authored and mergify[bot] committed Aug 18, 2022
1 parent 9d93aa7 commit 02d1368
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 20 deletions.
15 changes: 10 additions & 5 deletions core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,16 +425,21 @@ impl AncestorHashesService {
stats.invalid_packets += 1;
return None;
}
if ping.verify() {
stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) {
if !ping.verify() {
stats.ping_err_verify_count += 1;
return None;
}
stats.ping_count += 1;
// Respond both with and without domain so that the other node
// will accept the response regardless of its upgrade status.
// TODO: remove domain = false once cluster is upgraded.
for domain in [false, true] {
if let Ok(pong) = Pong::new(domain, &ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr);
}
}
} else {
stats.ping_err_verify_count += 1;
}
None
}
Expand Down
15 changes: 10 additions & 5 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,11 +1031,16 @@ impl ServeRepair {
}
packet.meta.set_discard(true);
stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let from_addr = packet.meta.socket_addr();
pending_pongs.push((pong_bytes, from_addr));
// Respond both with and without domain so that the other node
// will accept the response regardless of its upgrade status.
// TODO: remove domain = false once cluster is upgraded.
for domain in [false, true] {
if let Ok(pong) = Pong::new(domain, &ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let from_addr = packet.meta.socket_addr();
pending_pongs.push((pong_bytes, from_addr));
}
}
}
}
Expand Down
26 changes: 22 additions & 4 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2160,6 +2160,7 @@ impl ClusterInfo {
I: IntoIterator<Item = (SocketAddr, Ping)>,
{
let keypair = self.keypair();
<<<<<<< HEAD
let packets: Vec<_> = pings
.into_iter()
.filter_map(|(addr, ping)| {
Expand All @@ -2175,6 +2176,21 @@ impl ClusterInfo {
})
.collect();
if packets.is_empty() {
=======
let mut pongs_and_dests = Vec::new();
for (addr, ping) in pings {
// Respond both with and without domain so that the other node will
// accept the response regardless of its upgrade status.
// TODO: remove domain = false once cluster is upgraded.
for domain in [false, true] {
if let Ok(pong) = Pong::new(domain, &ping, &keypair) {
let pong = Protocol::PongMessage(pong);
pongs_and_dests.push((addr, pong));
}
}
}
if pongs_and_dests.is_empty() {
>>>>>>> 6928b2a5a (adds hash domain to ping-pong protocol (#27193))
None
} else {
let packet_batch = PacketBatch::new_unpinned_with_recycler_data(
Expand Down Expand Up @@ -3181,7 +3197,9 @@ mod tests {
let pongs: Vec<(SocketAddr, Pong)> = pings
.iter()
.zip(&remote_nodes)
.map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap()))
.map(|(ping, (keypair, socket))| {
(*socket, Pong::new(/*domain:*/ true, ping, keypair).unwrap())
})
.collect();
let now = now + Duration::from_millis(1);
cluster_info.handle_batch_pong_messages(pongs, now);
Expand Down Expand Up @@ -3224,7 +3242,7 @@ mod tests {
.collect();
let pongs: Vec<_> = pings
.iter()
.map(|ping| Pong::new(ping, &this_node).unwrap())
.map(|ping| Pong::new(/*domain:*/ false, ping, &this_node).unwrap())
.collect();
let recycler = PacketBatchRecycler::default();
let packets = cluster_info
Expand All @@ -3236,9 +3254,9 @@ mod tests {
&recycler,
)
.unwrap();
assert_eq!(remote_nodes.len(), packets.len());
assert_eq!(remote_nodes.len() * 2, packets.len());
for (packet, (_, socket), pong) in izip!(
packets.into_iter(),
packets.into_iter().step_by(2),
remote_nodes.into_iter(),
pongs.into_iter()
) {
Expand Down
40 changes: 34 additions & 6 deletions gossip/src/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use {
},
};

const PING_PONG_HASH_PREFIX: &[u8] = "SOLANA_PING_PONG".as_bytes();

#[derive(AbiExample, Debug, Deserialize, Serialize)]
pub struct Ping<T> {
from: Pubkey,
Expand Down Expand Up @@ -100,8 +102,17 @@ impl<T: Serialize> Signable for Ping<T> {
}

impl Pong {
pub fn new<T: Serialize>(ping: &Ping<T>, keypair: &Keypair) -> Result<Self, Error> {
let hash = hash::hash(&serialize(&ping.token)?);
pub fn new<T: Serialize>(
domain: bool,
ping: &Ping<T>,
keypair: &Keypair,
) -> Result<Self, Error> {
let token = serialize(&ping.token)?;
let hash = if domain {
hash::hashv(&[PING_PONG_HASH_PREFIX, &token])
} else {
hash::hash(&token)
};
let pong = Pong {
from: keypair.pubkey(),
hash,
Expand Down Expand Up @@ -187,9 +198,15 @@ impl PingCache {
Some(t) if now.saturating_duration_since(*t) < delay => None,
_ => {
let ping = pingf()?;
let hash = hash::hash(&serialize(&ping.token).ok()?);
self.pings.put(node, now);
let token = serialize(&ping.token).ok()?;
// For backward compatibility, for now responses both with and
// without domain are accepted.
// TODO: remove no domain case once cluster is upgraded.
let hash = hash::hash(&token);
self.pending_cache.put(hash, node);
let hash = hash::hashv(&[PING_PONG_HASH_PREFIX, &token]);
self.pending_cache.put(hash, node);
self.pings.put(node, now);
Some(ping)
}
}
Expand Down Expand Up @@ -281,10 +298,18 @@ mod tests {
assert!(ping.verify());
assert!(ping.sanitize().is_ok());

let pong = Pong::new(&ping, &keypair).unwrap();
let pong = Pong::new(/*domain:*/ false, &ping, &keypair).unwrap();
assert!(pong.verify());
assert!(pong.sanitize().is_ok());
assert_eq!(hash::hash(&ping.token), pong.hash);

let pong = Pong::new(/*domian:*/ true, &ping, &keypair).unwrap();
assert!(pong.verify());
assert!(pong.sanitize().is_ok());
assert_eq!(
hash::hashv(&[PING_PONG_HASH_PREFIX, &ping.token]),
pong.hash
);
}

#[test]
Expand Down Expand Up @@ -339,7 +364,10 @@ mod tests {
assert!(ping.is_none());
}
Some(ping) => {
let pong = Pong::new(ping, keypair).unwrap();
let domain = rng.gen_ratio(1, 2);
let pong = Pong::new(domain, ping, keypair).unwrap();
assert!(cache.add(&pong, *socket, now));
let pong = Pong::new(!domain, ping, keypair).unwrap();
assert!(cache.add(&pong, *socket, now));
}
}
Expand Down

0 comments on commit 02d1368

Please sign in to comment.