From 1cc99c09719d335bcaa81b74432cda53ec76c511 Mon Sep 17 00:00:00 2001
From: Anatoly Yakovenko
Date: Wed, 19 Jan 2022 22:38:33 -0800
Subject: [PATCH] Faster dedup

---
 core/src/sigverify_stage.rs |  27 ++-----
 perf/benches/dedup.rs       |  25 ++----
 perf/src/sigverify.rs       | 147 +++++++++++++++++++++++++++---------
 3 files changed, 126 insertions(+), 73 deletions(-)

diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs
index 1d3101da44f6fe..76dd73472871b1 100644
--- a/core/src/sigverify_stage.rs
+++ b/core/src/sigverify_stage.rs
@@ -7,13 +7,11 @@
 use {
     crate::sigverify,
-    core::time::Duration,
     crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
     itertools::Itertools,
-    solana_bloom::bloom::{AtomicBloom, Bloom},
     solana_measure::measure::Measure,
     solana_perf::packet::PacketBatch,
-    solana_perf::sigverify::dedup_packets,
+    solana_perf::sigverify::Deduper,
     solana_sdk::timing,
     solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
     std::{
@@ -214,7 +212,7 @@ impl SigVerifyStage {
     }
 
     fn verifier<T: SigVerifier>(
-        bloom: &AtomicBloom<&[u8]>,
+        deduper: &Deduper,
         recvr: &PacketBatchReceiver,
         sendr: &Sender<Vec<PacketBatch>>,
         verifier: &T,
@@ -230,7 +228,7 @@ impl SigVerifyStage {
         );
 
         let mut dedup_time = Measure::start("sigverify_dedup_time");
-        let dedup_fail = dedup_packets(bloom, &mut batches) as usize;
+        let dedup_fail = deduper.dedup_packets(&mut batches) as usize;
         dedup_time.stop();
 
         let valid_packets = num_packets.saturating_sub(dedup_fail);
@@ -289,25 +287,16 @@ impl SigVerifyStage {
         let verifier = verifier.clone();
         let mut stats = SigVerifierStats::default();
         let mut last_print = Instant::now();
-        const MAX_BLOOM_AGE: Duration = Duration::from_millis(2_000);
-        const MAX_BLOOM_ITEMS: usize = 1_000_000;
-        const MAX_BLOOM_FAIL: f64 = 0.0001;
-        const MAX_BLOOM_BITS: usize = 8 << 22;
+        const MAX_DEDUPER_AGE_MS: u64 = 2_000;
+        const MAX_DEDUPER_ITEMS: u32 = 1_000_000;
         Builder::new()
             .name("solana-verifier".to_string())
             .spawn(move || {
-                let mut bloom =
-                    Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
-                let mut bloom_age = Instant::now();
+                let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE_MS);
                 loop {
-                    let now = Instant::now();
-                    if now.duration_since(bloom_age) > MAX_BLOOM_AGE {
-                        bloom =
-                            Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
-                        bloom_age = now;
-                    }
+                    deduper.reset();
                     if let Err(e) = Self::verifier(
-                        &bloom,
+                        &deduper,
                         &packet_receiver,
                         &verified_sender,
                         &verifier,
diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs
index 8542da9b3f7a12..45ae1682a1dc82 100644
--- a/perf/benches/dedup.rs
+++ b/perf/benches/dedup.rs
@@ -5,7 +5,6 @@ extern crate test;
 
 use {
     rand::prelude::*,
-    solana_bloom::bloom::{AtomicBloom, Bloom},
     solana_perf::{
         packet::{to_packet_batches, PacketBatch},
         sigverify,
@@ -22,24 +21,17 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
 
 fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
     // verify packets
-    let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
+    let mut deduper = sigverify::Deduper::new(1_000_000, 2_000);
     bencher.iter(|| {
-        // bench
-        sigverify::dedup_packets(&bloom, &mut batches);
-
-        // reset
-        bloom.clear_for_tests();
-        batches.iter_mut().for_each(|batch| {
-            batch
-                .packets
-                .iter_mut()
-                .for_each(|p| p.meta.set_discard(false))
-        });
-    })
+        let _ans = deduper.dedup_packets(&mut batches);
+        deduper.reset();
+        batches
+            .iter_mut()
+            .for_each(|b| b.packets.iter_mut().for_each(|p| p.meta.set_discard(false)));
+    });
 }
 
 #[bench]
-#[ignore]
 fn bench_dedup_same_small_packets(bencher: &mut Bencher) {
     let mut rng = rand::thread_rng();
     let small_packet = test_packet_with_size(128, &mut rng);
@@ -55,7 +47,6 @@ fn bench_dedup_same_small_packets(bencher: &mut Bencher) {
 }
 
 #[bench]
-#[ignore]
 fn bench_dedup_same_big_packets(bencher: &mut Bencher) {
     let mut rng = rand::thread_rng();
     let big_packet = test_packet_with_size(1024, &mut rng);
@@ -69,7 +60,6 @@ fn bench_dedup_same_big_packets(bencher: &mut Bencher) {
 }
 
 #[bench]
-#[ignore]
 fn bench_dedup_diff_small_packets(bencher: &mut Bencher) {
     let mut rng = rand::thread_rng();
 
@@ -84,7 +74,6 @@ fn bench_dedup_diff_small_packets(bencher: &mut Bencher) {
 }
 
 #[bench]
-#[ignore]
 fn bench_dedup_diff_big_packets(bencher: &mut Bencher) {
     let mut rng = rand::thread_rng();
 
diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs
index fa9f831d252fea..adf11915c04862 100644
--- a/perf/src/sigverify.rs
+++ b/perf/src/sigverify.rs
@@ -4,7 +4,6 @@
 //! to the GPU.
 //!
 
-use solana_bloom::bloom::AtomicBloom;
 #[cfg(test)]
 use solana_sdk::transaction::Transaction;
 use {
@@ -24,7 +23,11 @@ use {
         short_vec::decode_shortu16_len,
         signature::Signature,
     },
-    std::sync::atomic::{AtomicU64, Ordering},
+    std::collections::hash_map::RandomState,
+    std::hash::BuildHasher,
+    std::hash::Hasher,
+    std::sync::atomic::{AtomicBool, AtomicU64, Ordering},
+    std::time::{Duration, Instant},
     std::{convert::TryFrom, mem::size_of},
 };
 
@@ -420,34 +423,78 @@ pub fn generate_offsets(
     )
 }
 
-fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8]>) {
-    // If this packet was already marked as discard, drop it
-    if packet.meta.discard() {
-        return;
+pub struct Deduper {
+    filter: Vec<AtomicU64>,
+    seed: RandomState,
+    age: Instant,
+    max_age: Duration,
+    saturated: AtomicBool,
+}
+
+impl Deduper {
+    pub fn new(size: u32, max_age_ms: u64) -> Self {
+        let mut filter: Vec<AtomicU64> = Vec::with_capacity(size as usize);
+        filter.resize_with(size as usize, Default::default);
+        let seed = RandomState::new();
+        Self {
+            filter,
+            seed,
+            age: Instant::now(),
+            max_age: Duration::from_millis(max_age_ms),
+            saturated: AtomicBool::new(false),
+        }
     }
-    // If this packet was not newly added, it's a dup and should be discarded
-    if !bloom.add(&&packet.data.as_slice()[0..packet.meta.size]) {
-        packet.meta.set_discard(true);
-        count.fetch_add(1, Ordering::Relaxed);
+    pub fn reset(&mut self) {
+        let now = Instant::now();
+        let saturated = self.saturated.load(Ordering::Relaxed);
+        if saturated || now.duration_since(self.age) > self.max_age {
+            for i in &self.filter {
+                i.store(0, Ordering::Relaxed);
+            }
+            self.seed = RandomState::new();
+            self.age = now;
+            self.saturated.store(false, Ordering::Relaxed);
+        }
     }
-}
-pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 {
-    use rayon::prelude::*;
-    let packet_count = count_packets_in_batches(batches);
-    // machine specific random offset to read the u64 from the packet signature
-    let count = AtomicU64::new(0);
-    PAR_THREAD_POOL.install(|| {
-        batches.into_par_iter().for_each(|batch| {
-            batch
-                .packets
-                .par_iter_mut()
-                .for_each(|p| dedup_packet(&count, p, bloom))
-        })
-    });
-    inc_new_counter_debug!("dedup_packets_total", packet_count);
-    count.load(Ordering::Relaxed)
+    fn dedup_packet(&self, count: &AtomicU64, packet: &mut Packet) {
+        // If this packet was already marked as discard, drop it
+        if packet.meta.discard() {
+            return;
+        }
+        let mut hasher = self.seed.build_hasher();
+        hasher.write(&packet.data[0..packet.meta.size]);
+        let hash = hasher.finish();
+        let len = self.filter.len();
+        let pos = (usize::try_from(hash).unwrap()).wrapping_rem(len);
+        // saturate each position with or
+        let prev = self.filter[pos].fetch_or(hash, Ordering::Relaxed);
+        if prev == u64::MAX {
+            self.saturated.store(true, Ordering::Relaxed);
+            //reset this value
+            self.filter[pos].store(0, Ordering::Relaxed);
+        }
+        if hash == prev & hash {
+            packet.meta.set_discard(true);
+            count.fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn dedup_packets(&self, batches: &mut [PacketBatch]) -> u64 {
+        use rayon::prelude::*;
+        // machine specific random offset to read the u64 from the packet signature
+        let count = AtomicU64::new(0);
+        PAR_THREAD_POOL.install(|| {
+            batches.into_par_iter().for_each(|batch| {
+                batch
+                    .packets
+                    .par_iter_mut()
+                    .for_each(|p| self.dedup_packet(&count, p))
+            })
+        });
+        count.load(Ordering::Relaxed)
+    }
 }
 
 pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) {
@@ -634,7 +681,6 @@ mod tests {
             test_tx::{new_test_vote_tx, test_multisig_tx, test_tx},
         },
         bincode::{deserialize, serialize},
-        solana_bloom::bloom::{AtomicBloom, Bloom},
        solana_sdk::{
            instruction::CompiledInstruction,
            message::{Message, MessageHeader},
@@ -1303,22 +1349,51 @@
         let mut batches =
             to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128);
         let packet_count = sigverify::count_packets_in_batches(&batches);
-        let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into();
-        let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize;
-        // because dedup uses a threadpool, there maybe up to N threads of txs that go through
-        let n = get_thread_count();
-        assert!(packet_count < discard + n * 2);
+        let filter = Deduper::new(1_000_000, 0);
+        let discard = filter.dedup_packets(&mut batches) as usize;
+        assert_eq!(packet_count, discard + 1);
     }
 
     #[test]
     fn test_dedup_diff() {
         // generate packet vector
+        let mut filter = Deduper::new(1_000_000, 0);
         let mut batches =
             to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
-        let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into();
-        let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize;
+        let discard = filter.dedup_packets(&mut batches) as usize;
         // because dedup uses a threadpool, there maybe up to N threads of txs that go through
-        let n = get_thread_count();
-        assert!(discard < n * 2);
+        assert_eq!(discard, 0);
+        filter.reset();
+        for i in filter.filter {
+            assert_eq!(i.load(Ordering::Relaxed), 0);
+        }
+    }
+    #[test]
+    fn test_dedup_saturated() {
+        // generate packet vector
+        let filter = Deduper::new(1_000, 0);
+        assert!(!filter.saturated.load(Ordering::Relaxed));
+        for _ in 0..10 {
+            let mut batches =
+                to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
+            let _discard = filter.dedup_packets(&mut batches) as usize;
+            if filter.saturated.load(Ordering::Relaxed) {
+                break;
+            }
+        }
+        assert!(filter.saturated.load(Ordering::Relaxed));
+    }
+    #[test]
+    fn test_dedup_false_positive() {
+        // generate packet vector
+        let filter = Deduper::new(1_000_000, 0);
+        let mut discard = 0;
+        for i in 0..10 {
+            let mut batches =
+                to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
+            discard += filter.dedup_packets(&mut batches) as usize;
+            println!("false positive rate: {}/{}", discard, i * 1024);
+        }
+        assert!(discard < 1);
     }
 }
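
Note (not part of the patch): below is a minimal sketch of how the new Deduper API is driven end to end, mirroring the benchmark and tests above. It assumes the perf crate's existing helpers solana_perf::packet::to_packet_batches and solana_perf::test_tx::test_tx are available outside the crate, and it reuses the 1_000_000-slot / 2_000 ms sizing that sigverify_stage.rs picks; treat it as an illustration, not code from this commit.

    use solana_perf::{packet::to_packet_batches, sigverify::Deduper, test_tx::test_tx};

    fn main() {
        // Same sizing as sigverify_stage.rs: ~1M AtomicU64 slots, reset at most every 2s.
        let mut deduper = Deduper::new(1_000_000, 2_000);

        // Build batches of distinct transactions, 128 packets per batch.
        let mut batches =
            to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);

        // First pass: the packets are new, so (almost) none are marked discard.
        let first = deduper.dedup_packets(&mut batches);

        // Second pass over the same, still-undiscarded packets: each hash is already
        // OR'd into its filter slot, so every packet is flagged as a duplicate.
        let second = deduper.dedup_packets(&mut batches);
        println!("first pass dups: {}, second pass dups: {}", first, second);

        // reset() is a no-op until max_age elapses or the filter saturates; when it
        // fires it zeroes the slots and re-seeds the hasher.
        deduper.reset();
    }

The design trade-off versus the removed bloom filter: dedup_packets only fetch_or's each packet's hash into one AtomicU64 slot, and instead of tracking an item count it lets slots saturate toward u64::MAX, relying on the periodic reset (age- or saturation-triggered) to keep the false-positive rate low.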