From 7a7b020580ebe26533bea14d64a20753ef601cf8 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 16 Mar 2023 16:45:42 +0000 Subject: [PATCH] dedups packets using an atomic bloom filter (#30726) Current Deduper implementation uses many bits per entry: https://github.com/solana-labs/solana/blob/65cd55261/perf/src/deduper.rs#L70-L73 and may be saturated quickly. It also lacks api to specify desired false positive rate. The commit instead uses an atomic bloom filter with K hash functions. The false positive rate is obtained by tracking popcount of bits. --- Cargo.lock | 2 + core/src/sigverify_stage.rs | 11 +- perf/Cargo.toml | 2 + perf/benches/dedup.rs | 14 ++- perf/src/deduper.rs | 217 +++++++++++++++++++++++------------- 5 files changed, 158 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 99cf3120b25c56..8bd0488d94dbab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6026,6 +6026,7 @@ dependencies = [ "matches", "nix", "rand 0.7.3", + "rand_chacha 0.2.2", "rayon", "serde", "solana-logger 1.16.0", @@ -6033,6 +6034,7 @@ dependencies = [ "solana-rayon-threadlimit", "solana-sdk 1.16.0", "solana-vote-program", + "test-case", ] [[package]] diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index bb157144cf740a..ffe9613a79bce0 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -290,7 +290,7 @@ impl SigVerifyStage { } fn verifier( - deduper: &Deduper, + deduper: &Deduper<2>, recvr: &FindPacketSenderStakeReceiver, verifier: &mut T, stats: &mut SigVerifierStats, @@ -410,13 +410,16 @@ impl SigVerifyStage { let mut stats = SigVerifierStats::default(); let mut last_print = Instant::now(); const MAX_DEDUPER_AGE: Duration = Duration::from_secs(2); - const MAX_DEDUPER_ITEMS: u32 = 1_000_000; + const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001; + const DEDUPER_NUM_BITS: u64 = 63_999_979; Builder::new() .name("solSigVerifier".to_string()) .spawn(move || { - let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE); + let mut rng = rand::thread_rng(); + let mut deduper = + Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS); loop { - deduper.reset(); + deduper.maybe_reset(&mut rng, &MAX_DEDUPER_AGE); if let Err(e) = Self::verifier(&deduper, &packet_receiver, &mut verifier, &mut stats) { diff --git a/perf/Cargo.toml b/perf/Cargo.toml index 1d0bad068ec1f6..dc123d9f506708 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -37,7 +37,9 @@ name = "solana_perf" [dev-dependencies] matches = { workspace = true } +rand_chacha = "0.2.2" solana-logger = { workspace = true } +test-case = { workspace = true } [[bench]] name = "sigverify" diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index 20eae904928b84..b2a601bc77045a 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -24,10 +24,13 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec { fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) { // verify packets - let mut deduper = Deduper::new(1_000_000, Duration::from_millis(2_000)); + let mut rng = rand::thread_rng(); + let mut deduper = Deduper::<2>::new( + &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, + ); bencher.iter(|| { let _ans = deduper.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()); - deduper.reset(); + deduper.maybe_reset(&mut rng, /*reset_cycle:*/ &Duration::from_secs(2)); batches .iter_mut() .for_each(|b| b.iter_mut().for_each(|p| p.meta_mut().set_discard(false))); @@ -112,8 +115,11 @@ fn bench_dedup_baseline(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_dedup_reset(bencher: &mut Bencher) { - let mut deduper = Deduper::new(1_000_000, Duration::from_millis(0)); + let mut rng = rand::thread_rng(); + let mut deduper = Deduper::<2>::new( + &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, + ); bencher.iter(|| { - deduper.reset(); + deduper.maybe_reset(&mut rng, /*reset_cycle:*/ &Duration::from_millis(0)); }); } diff --git a/perf/src/deduper.rs b/perf/src/deduper.rs index 661145db14e942..777535a6bc28e4 100644 --- a/perf/src/deduper.rs +++ b/perf/src/deduper.rs @@ -3,82 +3,71 @@ use { crate::packet::{Packet, PacketBatch}, ahash::AHasher, - rand::{thread_rng, Rng}, - solana_sdk::saturating_add_assign, + rand::Rng, std::{ - convert::TryFrom, - hash::Hasher, - sync::atomic::{AtomicBool, AtomicU64, Ordering}, + hash::{Hash, Hasher}, + iter::repeat_with, + sync::atomic::{AtomicU64, Ordering}, time::{Duration, Instant}, }, }; -pub struct Deduper { - filter: Vec, - seed: (u128, u128), - age: Instant, - max_age: Duration, - pub saturated: AtomicBool, +pub struct Deduper { + num_bits: u64, + bits: Vec, + seeds: [(u128, u128); K], + clock: Instant, + // Maximum number of one bits before the false positive + // rate exceeds the specified threshold. + capacity: u64, + popcount: AtomicU64, // Number of one bits in self.bits. } -impl Deduper { - pub fn new(size: u32, max_age: Duration) -> Self { - let mut filter: Vec = Vec::with_capacity(size as usize); - filter.resize_with(size as usize, Default::default); - let seed = thread_rng().gen(); +impl Deduper { + pub fn new(rng: &mut R, false_positive_rate: f64, num_bits: u64) -> Self { + assert!(0.0 < false_positive_rate && false_positive_rate < 1.0); + let size = usize::try_from(num_bits.checked_add(63).unwrap() / 64).unwrap(); + let capacity = num_bits as f64 * false_positive_rate.powf(1f64 / K as f64); Self { - filter, - seed, - age: Instant::now(), - max_age, - saturated: AtomicBool::new(false), + num_bits, + seeds: std::array::from_fn(|_| rng.gen()), + clock: Instant::now(), + bits: repeat_with(AtomicU64::default).take(size).collect(), + capacity: capacity as u64, + popcount: AtomicU64::default(), } } - pub fn reset(&mut self) { - let now = Instant::now(); - //this should reset every 500k unique packets per 1m sized deduper - //false positive rate is 1/1000 at that point - let saturated = self.saturated.load(Ordering::Relaxed); - if saturated || now.duration_since(self.age) > self.max_age { - let len = self.filter.len(); - self.filter.clear(); - self.filter.resize_with(len, AtomicU64::default); - self.seed = thread_rng().gen(); - self.age = now; - self.saturated.store(false, Ordering::Relaxed); + pub fn maybe_reset(&mut self, rng: &mut R, reset_cycle: &Duration) { + let popcount = self.popcount.load(Ordering::Relaxed); + if popcount >= self.capacity || &self.clock.elapsed() >= reset_cycle { + self.seeds = std::array::from_fn(|_| rng.gen()); + self.clock = Instant::now(); + self.bits.fill_with(AtomicU64::default); + self.popcount = AtomicU64::default(); } } - /// Compute hash from packet data, returns (hash, bin_pos). - fn compute_hash(&self, packet: &Packet) -> (u64, usize) { - let mut hasher = AHasher::new_with_keys(self.seed.0, self.seed.1); - hasher.write(packet.data(..).unwrap_or_default()); - let h = hasher.finish(); - let len = self.filter.len(); - let pos = (usize::try_from(h).unwrap()).wrapping_rem(len); - (h, pos) - } - - // Deduplicates packets and returns 1 if packet is to be discarded. Else, 0. - fn dedup_packet(&self, packet: &mut Packet) -> u64 { - // If this packet was already marked as discard, drop it - if packet.meta().discard() { - return 1; - } - let (hash, pos) = self.compute_hash(packet); - // saturate each position with or - let prev = self.filter[pos].fetch_or(hash, Ordering::Relaxed); - if prev == u64::MAX { - self.saturated.store(true, Ordering::Relaxed); - //reset this value - self.filter[pos].store(hash, Ordering::Relaxed); - } - if hash == prev & hash { - packet.meta_mut().set_discard(true); - return 1; + // Returns true if the packet is duplicate. + #[must_use] + #[allow(clippy::integer_arithmetic)] + fn dedup_packet(&self, packet: &Packet) -> bool { + // Should not dedup packet if already discarded. + debug_assert!(!packet.meta().discard()); + let mut out = true; + for seed in self.seeds { + let mut hasher = AHasher::new_with_keys(seed.0, seed.1); + packet.data(..).unwrap_or_default().hash(&mut hasher); + let hash: u64 = hasher.finish() % self.num_bits; + let index = (hash >> 6) as usize; + let mask: u64 = 1u64 << (hash & 63); + let old = self.bits[index].fetch_or(mask, Ordering::Relaxed); + if old & mask == 0u64 { + self.popcount.fetch_add(1, Ordering::Relaxed); + out = false; + } } - 0 + out } pub fn dedup_packets_and_count_discards( @@ -86,18 +75,21 @@ impl Deduper { batches: &mut [PacketBatch], mut process_received_packet: impl FnMut(&mut Packet, bool, bool), ) -> u64 { - let mut num_removed: u64 = 0; - batches.iter_mut().for_each(|batch| { - batch.iter_mut().for_each(|p| { - let removed_before_sigverify = p.meta().discard(); - let is_duplicate = self.dedup_packet(p); - if is_duplicate == 1 { - saturating_add_assign!(num_removed, 1); + batches + .iter_mut() + .flat_map(PacketBatch::iter_mut) + .map(|packet| { + if packet.meta().discard() { + process_received_packet(packet, true, false); + } else if self.dedup_packet(packet) { + packet.meta_mut().set_discard(true); + process_received_packet(packet, false, true); + } else { + process_received_packet(packet, false, false); } - process_received_packet(p, removed_before_sigverify, is_duplicate == 1); + u64::from(packet.meta().discard()) }) - }); - num_removed + .sum() } } @@ -107,6 +99,10 @@ mod tests { use { super::*, crate::{packet::to_packet_batches, sigverify, test_tx::test_tx}, + rand::SeedableRng, + rand_chacha::ChaChaRng, + solana_sdk::packet::{Meta, PACKET_DATA_SIZE}, + test_case::test_case, }; #[test] @@ -116,7 +112,10 @@ mod tests { let mut batches = to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 128); let packet_count = sigverify::count_packets_in_batches(&batches); - let filter = Deduper::new(1_000_000, Duration::from_millis(0)); + let mut rng = rand::thread_rng(); + let filter = Deduper::<2>::new( + &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, + ); let mut num_deduped = 0; let discard = filter.dedup_packets_and_count_discards( &mut batches, @@ -130,13 +129,16 @@ mod tests { #[test] fn test_dedup_diff() { - let mut filter = Deduper::new(1_000_000, Duration::from_millis(0)); + let mut rng = rand::thread_rng(); + let mut filter = Deduper::<2>::new( + &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, + ); let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); let discard = filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize; // because dedup uses a threadpool, there maybe up to N threads of txs that go through assert_eq!(discard, 0); - filter.reset(); - for i in filter.filter { + filter.maybe_reset(&mut rng, /*reset_cycle:*/ &Duration::from_millis(0)); + for i in filter.bits { assert_eq!(i.load(Ordering::Relaxed), 0); } } @@ -144,24 +146,30 @@ mod tests { #[test] #[ignore] fn test_dedup_saturated() { - let filter = Deduper::new(1_000_000, Duration::from_millis(0)); + let mut rng = rand::thread_rng(); + let filter = Deduper::<2>::new( + &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, + ); let mut discard = 0; - assert!(!filter.saturated.load(Ordering::Relaxed)); + assert!(filter.popcount.load(Ordering::Relaxed) < filter.capacity); for i in 0..1000 { let mut batches = to_packet_batches(&(0..1000).map(|_| test_tx()).collect::>(), 128); discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize; trace!("{} {}", i, discard); - if filter.saturated.load(Ordering::Relaxed) { + if filter.popcount.load(Ordering::Relaxed) >= filter.capacity { break; } } - assert!(filter.saturated.load(Ordering::Relaxed)); + assert!(filter.popcount.load(Ordering::Relaxed) >= filter.capacity); } #[test] fn test_dedup_false_positive() { - let filter = Deduper::new(1_000_000, Duration::from_millis(0)); + let mut rng = rand::thread_rng(); + let filter = Deduper::<2>::new( + &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, + ); let mut discard = 0; for i in 0..10 { let mut batches = @@ -172,4 +180,53 @@ mod tests { //allow for 1 false positive even if extremely unlikely assert!(discard < 2); } + + #[test_case(63_999_979, 0.001, 2_023_857)] + #[test_case(622_401_961, 0.001, 19_682_078)] + #[test_case(622_401_979, 0.001, 19_682_078)] + #[test_case(629_145_593, 0.001, 19_895_330)] + #[test_case(632_455_543, 0.001, 20_000_000)] + #[test_case(637_534_199, 0.001, 20_160_601)] + #[test_case(622_401_961, 0.0001, 6_224_019)] + #[test_case(622_401_979, 0.0001, 6_224_019)] + #[test_case(629_145_593, 0.0001, 6_291_455)] + #[test_case(632_455_543, 0.0001, 6_324_555)] + #[test_case(637_534_199, 0.0001, 6_375_341)] + fn test_dedup_capacity(num_bits: u64, false_positive_rate: f64, capacity: u64) { + let mut rng = rand::thread_rng(); + let deduper = Deduper::<2>::new(&mut rng, false_positive_rate, num_bits); + assert_eq!(deduper.capacity, capacity); + } + + #[test_case([0xf9; 32], 3_199_997, 101_192, 51_414, 70, 101_125)] + #[test_case([0xdc; 32], 3_200_003, 101_192, 51_414, 71, 101_132)] + #[test_case([0xa5; 32], 6_399_971, 202_384, 102_828, 127, 202_157)] + #[test_case([0xdb; 32], 6_400_013, 202_386, 102_828, 145, 202_277)] + #[test_case([0xcd; 32], 12_799_987, 404_771, 205_655, 289, 404_434)] + #[test_case([0xc3; 32], 12_800_009, 404_771, 205_656, 309, 404_278)] + fn test_dedup_seeded( + seed: [u8; 32], + num_bits: u64, + capacity: u64, + num_packets: usize, + num_dups: usize, + popcount: u64, + ) { + let mut rng = ChaChaRng::from_seed(seed); + let deduper = Deduper::<2>::new(&mut rng, /*false_positive_rate:*/ 0.001, num_bits); + assert_eq!(deduper.capacity, capacity); + let mut packet = Packet::new([0u8; PACKET_DATA_SIZE], Meta::default()); + let mut dup_count = 0usize; + for _ in 0..num_packets { + let size = rng.gen_range(0, PACKET_DATA_SIZE); + packet.meta_mut().size = size; + rng.fill(&mut packet.buffer_mut()[0..size]); + if deduper.dedup_packet(&packet) { + dup_count += 1; + } + assert!(deduper.dedup_packet(&packet)); + } + assert_eq!(dup_count, num_dups); + assert_eq!(deduper.popcount.load(Ordering::Relaxed), popcount); + } }