Skip to content

Commit

Permalink
enforces hash domain for ping-pong protocol (backport #28433) (#28961)
Browse files Browse the repository at this point in the history
* enforces hash domain for ping-pong protocol (#28433)

#27193
added hash domain to ping-pong protocol.
For backward compatibility responses both with and without domain were
generated and accepted.
Now that all clusters are upgraded, this commit enforces the hash domain
by removing the response without the domain.

(cherry picked from commit e283461)

# Conflicts:
#	gossip/src/cluster_info.rs

* removes mergify merge conflicts

Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
mergify[bot] and behzadnouri authored Nov 28, 2022
1 parent 6ee0963 commit c351c2d
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 65 deletions.
13 changes: 4 additions & 9 deletions core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,15 +430,10 @@ impl AncestorHashesService {
return None;
}
stats.ping_count += 1;
// Respond both with and without domain so that the other node
// will accept the response regardless of its upgrade status.
// TODO: remove domain = false once cluster is upgraded.
for domain in [false, true] {
if let Ok(pong) = Pong::new(domain, &ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr);
}
if let Ok(pong) = Pong::new(&ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr);
}
}
None
Expand Down
15 changes: 5 additions & 10 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,16 +946,11 @@ impl ServeRepair {
}
packet.meta.set_discard(true);
stats.ping_count += 1;
// Respond both with and without domain so that the other node
// will accept the response regardless of its upgrade status.
// TODO: remove domain = false once cluster is upgraded.
for domain in [false, true] {
if let Ok(pong) = Pong::new(domain, &ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let from_addr = packet.meta.socket_addr();
pending_pongs.push((pong_bytes, from_addr));
}
if let Ok(pong) = Pong::new(&ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let from_addr = packet.meta.socket_addr();
pending_pongs.push((pong_bytes, from_addr));
}
}
}
Expand Down
37 changes: 16 additions & 21 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2163,23 +2163,20 @@ impl ClusterInfo {
I: IntoIterator<Item = (SocketAddr, Ping)>,
{
let keypair = self.keypair();
let mut packets = Vec::new();
for (addr, ping) in pings {
// Respond both with and without domain so that the other node will
// accept the response regardless of its upgrade status.
// TODO: remove domain = false once cluster is upgraded.
for domain in [false, true] {
if let Ok(pong) = Pong::new(domain, &ping, &keypair) {
let pong = Protocol::PongMessage(pong);
match Packet::from_data(Some(&addr), pong) {
Ok(packet) => packets.push(packet),
Err(err) => {
error!("failed to write pong packet: {:?}", err);
}
let packets: Vec<_> = pings
.into_iter()
.filter_map(|(addr, ping)| {
let pong = Pong::new(&ping, &keypair).ok()?;
let pong = Protocol::PongMessage(pong);
match Packet::from_data(Some(&addr), pong) {
Ok(packet) => Some(packet),
Err(err) => {
error!("failed to write pong packet: {:?}", err);
None
}
}
}
}
})
.collect();
if packets.is_empty() {
None
} else {
Expand Down Expand Up @@ -3214,9 +3211,7 @@ mod tests {
let pongs: Vec<(SocketAddr, Pong)> = pings
.iter()
.zip(&remote_nodes)
.map(|(ping, (keypair, socket))| {
(*socket, Pong::new(/*domain:*/ true, ping, keypair).unwrap())
})
.map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap()))
.collect();
let now = now + Duration::from_millis(1);
cluster_info.handle_batch_pong_messages(pongs, now);
Expand Down Expand Up @@ -3259,7 +3254,7 @@ mod tests {
.collect();
let pongs: Vec<_> = pings
.iter()
.map(|ping| Pong::new(/*domain:*/ false, ping, &this_node).unwrap())
.map(|ping| Pong::new(ping, &this_node).unwrap())
.collect();
let recycler = PacketBatchRecycler::default();
let packets = cluster_info
Expand All @@ -3271,9 +3266,9 @@ mod tests {
&recycler,
)
.unwrap();
assert_eq!(remote_nodes.len() * 2, packets.len());
assert_eq!(remote_nodes.len(), packets.len());
for (packet, (_, socket), pong) in izip!(
packets.into_iter().step_by(2),
packets.into_iter(),
remote_nodes.into_iter(),
pongs.into_iter()
) {
Expand Down
29 changes: 4 additions & 25 deletions gossip/src/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,9 @@ impl<T: Serialize> Signable for Ping<T> {
}

impl Pong {
pub fn new<T: Serialize>(
domain: bool,
ping: &Ping<T>,
keypair: &Keypair,
) -> Result<Self, Error> {
pub fn new<T: Serialize>(ping: &Ping<T>, keypair: &Keypair) -> Result<Self, Error> {
let token = serialize(&ping.token)?;
let hash = if domain {
hash::hashv(&[PING_PONG_HASH_PREFIX, &token])
} else {
hash::hash(&token)
};
let hash = hash::hashv(&[PING_PONG_HASH_PREFIX, &token]);
let pong = Pong {
from: keypair.pubkey(),
hash,
Expand Down Expand Up @@ -203,11 +195,6 @@ impl PingCache {
_ => {
let ping = pingf()?;
let token = serialize(&ping.token).ok()?;
// For backward compatibility, for now responses both with and
// without domain are accepted.
// TODO: remove no domain case once cluster is upgraded.
let hash = hash::hash(&token);
self.pending_cache.put(hash, node);
let hash = hash::hashv(&[PING_PONG_HASH_PREFIX, &token]);
self.pending_cache.put(hash, node);
self.pings.put(node, now);
Expand Down Expand Up @@ -303,12 +290,7 @@ mod tests {
assert!(ping.verify());
assert!(ping.sanitize().is_ok());

let pong = Pong::new(/*domain:*/ false, &ping, &keypair).unwrap();
assert!(pong.verify());
assert!(pong.sanitize().is_ok());
assert_eq!(hash::hash(&ping.token), pong.hash);

let pong = Pong::new(/*domian:*/ true, &ping, &keypair).unwrap();
let pong = Pong::new(&ping, &keypair).unwrap();
assert!(pong.verify());
assert!(pong.sanitize().is_ok());
assert_eq!(
Expand Down Expand Up @@ -370,10 +352,7 @@ mod tests {
assert!(ping.is_none());
}
Some(ping) => {
let domain = rng.gen_ratio(1, 2);
let pong = Pong::new(domain, ping, keypair).unwrap();
assert!(cache.add(&pong, *socket, now));
let pong = Pong::new(!domain, ping, keypair).unwrap();
let pong = Pong::new(ping, keypair).unwrap();
assert!(cache.add(&pong, *socket, now));
}
}
Expand Down

0 comments on commit c351c2d

Please sign in to comment.