Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedup bloom filter is too slow #22607

Merged
merged 7 commits into from
Jan 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 11 additions & 21 deletions core/src/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ use {
core::time::Duration,
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
itertools::Itertools,
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_perf::sigverify::dedup_packets,
solana_perf::sigverify::Deduper,
solana_sdk::timing,
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
std::{
Expand Down Expand Up @@ -214,7 +213,7 @@ impl SigVerifyStage {
}

fn verifier<T: SigVerifier>(
bloom: &AtomicBloom<&[u8]>,
deduper: &Deduper,
recvr: &PacketBatchReceiver,
sendr: &Sender<Vec<PacketBatch>>,
verifier: &T,
Expand All @@ -230,15 +229,15 @@ impl SigVerifyStage {
);

let mut dedup_time = Measure::start("sigverify_dedup_time");
let dedup_fail = dedup_packets(bloom, &mut batches) as usize;
let dedup_fail = deduper.dedup_packets(&mut batches) as usize;
dedup_time.stop();
let valid_packets = num_packets.saturating_sub(dedup_fail);
let num_unique = num_packets.saturating_sub(dedup_fail);

let mut discard_time = Measure::start("sigverify_discard_time");
if valid_packets > MAX_SIGVERIFY_BATCH {
if num_unique > MAX_SIGVERIFY_BATCH {
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH)
aeyakovenko marked this conversation as resolved.
Show resolved Hide resolved
};
let excess_fail = valid_packets.saturating_sub(MAX_SIGVERIFY_BATCH);
let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH);
discard_time.stop();

let mut verify_batch_time = Measure::start("sigverify_batch_time");
Expand Down Expand Up @@ -289,25 +288,16 @@ impl SigVerifyStage {
let verifier = verifier.clone();
let mut stats = SigVerifierStats::default();
let mut last_print = Instant::now();
const MAX_BLOOM_AGE: Duration = Duration::from_millis(2_000);
const MAX_BLOOM_ITEMS: usize = 1_000_000;
const MAX_BLOOM_FAIL: f64 = 0.0001;
const MAX_BLOOM_BITS: usize = 8 << 22;
const MAX_DEDUPER_AGE: Duration = Duration::from_secs(2);
const MAX_DEDUPER_ITEMS: u32 = 1_000_000;
Builder::new()
.name("solana-verifier".to_string())
.spawn(move || {
let mut bloom =
Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
let mut bloom_age = Instant::now();
let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE);
loop {
let now = Instant::now();
if now.duration_since(bloom_age) > MAX_BLOOM_AGE {
bloom =
Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
bloom_age = now;
}
deduper.reset();
if let Err(e) = Self::verifier(
&bloom,
&deduper,
&packet_receiver,
&verified_sender,
&verifier,
Expand Down
1 change: 1 addition & 0 deletions perf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-perf"
edition = "2021"

[dependencies]
ahash = "0.7.6"
bincode = "1.3.3"
curve25519-dalek = { version = "3" }
dlopen = "0.1.8"
Expand Down
56 changes: 38 additions & 18 deletions perf/benches/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ extern crate test;

use {
rand::prelude::*,
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_perf::{
packet::{to_packet_batches, PacketBatch},
sigverify,
},
std::time::Duration,
test::Bencher,
};

const NUM: usize = 4096;

fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
// subtract 8 bytes because the length will get serialized as well
(0..size.checked_sub(8).unwrap())
Expand All @@ -22,20 +24,14 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {

fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
// verify packets
let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(2_000));
bencher.iter(|| {
// bench
sigverify::dedup_packets(&bloom, &mut batches);

// reset
bloom.clear_for_tests();
batches.iter_mut().for_each(|batch| {
batch
.packets
.iter_mut()
.for_each(|p| p.meta.set_discard(false))
});
})
let _ans = deduper.dedup_packets(&mut batches);
deduper.reset();
aeyakovenko marked this conversation as resolved.
Show resolved Hide resolved
batches
.iter_mut()
.for_each(|b| b.packets.iter_mut().for_each(|p| p.meta.set_discard(false)));
});
}

#[bench]
Expand All @@ -46,7 +42,7 @@ fn bench_dedup_same_small_packets(bencher: &mut Bencher) {

let batches = to_packet_batches(
&std::iter::repeat(small_packet)
.take(4096)
.take(NUM)
.collect::<Vec<_>>(),
128,
);
Expand All @@ -61,7 +57,7 @@ fn bench_dedup_same_big_packets(bencher: &mut Bencher) {
let big_packet = test_packet_with_size(1024, &mut rng);

let batches = to_packet_batches(
&std::iter::repeat(big_packet).take(4096).collect::<Vec<_>>(),
&std::iter::repeat(big_packet).take(NUM).collect::<Vec<_>>(),
128,
);

Expand All @@ -74,7 +70,7 @@ fn bench_dedup_diff_small_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

let batches = to_packet_batches(
&(0..4096)
&(0..NUM)
.map(|_| test_packet_with_size(128, &mut rng))
.collect::<Vec<_>>(),
128,
Expand All @@ -89,11 +85,35 @@ fn bench_dedup_diff_big_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

let batches = to_packet_batches(
&(0..4096)
&(0..NUM)
.map(|_| test_packet_with_size(1024, &mut rng))
.collect::<Vec<_>>(),
128,
);

do_bench_dedup_packets(bencher, batches);
}

#[bench]
#[ignore]
fn bench_dedup_baseline(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

let batches = to_packet_batches(
&(0..0)
.map(|_| test_packet_with_size(128, &mut rng))
.collect::<Vec<_>>(),
128,
);

do_bench_dedup_packets(bencher, batches);
}

#[bench]
#[ignore]
fn bench_dedup_reset(bencher: &mut Bencher) {
let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(0));
bencher.iter(|| {
deduper.reset();
});
}
Loading