Skip to content

Commit

Permalink
shrink batches when over 80% of the space is wasted (backport #23066) (
Browse files Browse the repository at this point in the history
…#23189)

* shrink batches when over 80% of the space is wasted (#23066)

* shrink batches when over 80% of the space is wasted

(cherry picked from commit 83d31c9)

# Conflicts:
#	core/benches/sigverify_stage.rs
#	core/src/sigverify_stage.rs
#	perf/src/sigverify.rs

* fixup!

Co-authored-by: anatoly yakovenko <[email protected]>
  • Loading branch information
mergify[bot] and aeyakovenko authored Feb 16, 2022
1 parent 41bbc11 commit 3fd78ac
Show file tree
Hide file tree
Showing 4 changed files with 458 additions and 22 deletions.
43 changes: 24 additions & 19 deletions core/benches/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_core::{sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage},
solana_perf::{packet::to_packet_batches, test_tx::test_tx},
solana_perf::{packet::to_packet_batches, packet::PacketBatch, test_tx::test_tx},
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
Expand Down Expand Up @@ -112,19 +112,10 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
});
}

#[bench]
fn bench_sigverify_stage(bencher: &mut Bencher) {
solana_logger::setup();
let (packet_s, packet_r) = channel();
let (verified_s, verified_r) = unbounded();
let verifier = TransactionSigVerifier::default();
let stage = SigVerifyStage::new(packet_r, verified_s, verifier);

let now = Instant::now();
fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> {
let len = 4096;
let use_same_tx = true;
let chunk_size = 1024;
let mut batches = if use_same_tx {
if use_same_tx {
let tx = test_tx();
to_packet_batches(&vec![tx; len], chunk_size)
} else {
Expand All @@ -142,14 +133,28 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
})
.collect();
to_packet_batches(&txs, chunk_size)
};
}
}

trace!(
"starting... generation took: {} ms batches: {}",
duration_as_ms(&now.elapsed()),
batches.len()
);
#[bench]
fn bench_sigverify_stage(bencher: &mut Bencher) {
solana_logger::setup();
trace!("start");
let (packet_s, packet_r) = channel();
let (verified_s, verified_r) = unbounded();
let verifier = TransactionSigVerifier::default();
let stage = SigVerifyStage::new(packet_r, verified_s, verifier);

let use_same_tx = true;
bencher.iter(move || {
let now = Instant::now();
let mut batches = gen_batches(use_same_tx);
trace!(
"starting... generation took: {} ms batches: {}",
duration_as_ms(&now.elapsed()),
batches.len()
);

let mut sent_len = 0;
for _ in 0..batches.len() {
if let Some(batch) = batches.pop() {
Expand All @@ -165,7 +170,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
received += v.packets.len();
batches.push(v);
}
if received >= sent_len {
if use_same_tx || received >= sent_len {
break;
}
}
Expand Down
86 changes: 84 additions & 2 deletions core/src/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
itertools::Itertools,
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_perf::sigverify::Deduper,
solana_perf::sigverify::{count_valid_packets, shrink_batches, Deduper},
solana_sdk::timing,
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
std::{
Expand Down Expand Up @@ -59,6 +59,9 @@ struct SigVerifierStats {
total_packets: usize,
total_dedup: usize,
total_excess_fail: usize,
total_shrink_time: usize,
total_shrinks: usize,
total_valid_packets: usize,
}

impl SigVerifierStats {
Expand Down Expand Up @@ -167,6 +170,9 @@ impl SigVerifierStats {
("total_packets", self.total_packets, i64),
("total_dedup", self.total_dedup, i64),
("total_excess_fail", self.total_excess_fail, i64),
("total_shrink_time", self.total_shrink_time, i64),
("total_shrinks", self.total_shrinks, i64),
("total_valid_packets", self.total_valid_packets, i64),
);
}
}
Expand Down Expand Up @@ -242,7 +248,20 @@ impl SigVerifyStage {
discard_time.stop();

let mut verify_batch_time = Measure::start("sigverify_batch_time");
let batches = verifier.verify_batches(batches);
let mut batches = verifier.verify_batches(batches);
verify_batch_time.stop();

let mut shrink_time = Measure::start("sigverify_shrink_time");
let num_valid_packets = count_valid_packets(&batches);
let start_len = batches.len();
const MAX_EMPTY_BATCH_RATIO: usize = 4;
if num_packets > num_valid_packets.saturating_mul(MAX_EMPTY_BATCH_RATIO) {
let valid = shrink_batches(&mut batches);
batches.truncate(valid);
}
let total_shrinks = start_len.saturating_sub(batches.len());
shrink_time.stop();

sendr.send(batches)?;
verify_batch_time.stop();

Expand Down Expand Up @@ -276,7 +295,10 @@ impl SigVerifyStage {
stats.total_batches += batches_len;
stats.total_packets += num_packets;
stats.total_dedup += dedup_fail;
stats.total_valid_packets += num_valid_packets;
stats.total_excess_fail += excess_fail;
stats.total_shrink_time += shrink_time.as_us() as usize;
stats.total_shrinks += total_shrinks;

Ok(())
}
Expand Down Expand Up @@ -342,6 +364,12 @@ impl SigVerifyStage {

#[cfg(test)]
mod tests {
use crate::sigverify::TransactionSigVerifier;
use crate::sigverify_stage::timing::duration_as_ms;
use crossbeam_channel::unbounded;
use solana_perf::packet::to_packet_batches;
use solana_perf::test_tx::test_tx;
use std::sync::mpsc::channel;
use {super::*, solana_perf::packet::Packet};

fn count_non_discard(packet_batches: &[PacketBatch]) -> usize {
Expand Down Expand Up @@ -370,4 +398,58 @@ mod tests {
assert!(!batches[0].packets[0].meta.discard());
assert!(!batches[0].packets[3].meta.discard());
}
fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> {
let len = 4096;
let chunk_size = 1024;
if use_same_tx {
let tx = test_tx();
to_packet_batches(&vec![tx; len], chunk_size)
} else {
let txs: Vec<_> = (0..len).map(|_| test_tx()).collect();
to_packet_batches(&txs, chunk_size)
}
}

#[test]
fn test_sigverify_stage() {
solana_logger::setup();
trace!("start");
let (packet_s, packet_r) = channel();
let (verified_s, verified_r) = unbounded();
let verifier = TransactionSigVerifier::default();
let stage = SigVerifyStage::new(packet_r, verified_s, verifier);

let use_same_tx = true;
let now = Instant::now();
let mut batches = gen_batches(use_same_tx);
trace!(
"starting... generation took: {} ms batches: {}",
duration_as_ms(&now.elapsed()),
batches.len()
);

let mut sent_len = 0;
for _ in 0..batches.len() {
if let Some(batch) = batches.pop() {
sent_len += batch.packets.len();
packet_s.send(batch).unwrap();
}
}
let mut received = 0;
trace!("sent: {}", sent_len);
loop {
if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
while let Some(v) = verifieds.pop() {
received += v.packets.len();
batches.push(v);
}
if use_same_tx || received >= sent_len {
break;
}
}
}
trace!("received: {}", received);
drop(packet_s);
stage.join().unwrap();
}
}
86 changes: 86 additions & 0 deletions perf/benches/shrink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#![allow(clippy::integer_arithmetic)]
#![feature(test)]

extern crate test;

use {
rand::prelude::*,
solana_perf::{
packet::{to_packet_batches, PacketBatch, PACKETS_PER_BATCH},
sigverify,
},
test::Bencher,
};

const NUM_PACKETS: usize = 1024 * 4;

fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
// subtract 8 bytes because the length will get serialized as well
(0..size.checked_sub(8).unwrap())
.map(|_| rng.gen())
.collect()
}

fn do_bench_shrink_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
// verify packets
bencher.iter(|| {
let _ans = sigverify::shrink_batches(&mut batches);
batches.iter_mut().for_each(|b| {
b.packets
.iter_mut()
.for_each(|p| p.meta.set_discard(thread_rng().gen()))
});
});
}

#[bench]
#[ignore]
fn bench_shrink_diff_small_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

let batches = to_packet_batches(
&(0..NUM_PACKETS)
.map(|_| test_packet_with_size(128, &mut rng))
.collect::<Vec<_>>(),
PACKETS_PER_BATCH,
);

do_bench_shrink_packets(bencher, batches);
}

#[bench]
#[ignore]
fn bench_shrink_diff_big_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

let batches = to_packet_batches(
&(0..NUM_PACKETS)
.map(|_| test_packet_with_size(1024, &mut rng))
.collect::<Vec<_>>(),
PACKETS_PER_BATCH,
);

do_bench_shrink_packets(bencher, batches);
}

#[bench]
#[ignore]
fn bench_shrink_count_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

let mut batches = to_packet_batches(
&(0..NUM_PACKETS)
.map(|_| test_packet_with_size(128, &mut rng))
.collect::<Vec<_>>(),
PACKETS_PER_BATCH,
);
batches.iter_mut().for_each(|b| {
b.packets
.iter_mut()
.for_each(|p| p.meta.set_discard(thread_rng().gen()))
});

bencher.iter(|| {
let _ = sigverify::count_valid_packets(&batches);
});
}
Loading

0 comments on commit 3fd78ac

Please sign in to comment.