From 784b766a25be6f4f8bbc4d158ba6bcf0d517e747 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 10 Feb 2022 15:37:30 -0800 Subject: [PATCH 1/9] shrink batches when over 80% of the space is wasted --- core/src/sigverify_stage.rs | 17 +++++++- perf/benches/shrink.rs | 86 +++++++++++++++++++++++++++++++++++++ perf/src/sigverify.rs | 68 ++++++++++++++++++++++++++++- 3 files changed, 168 insertions(+), 3 deletions(-) create mode 100644 perf/benches/shrink.rs diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index ec9dc692833b14..c15d7c92530c64 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -12,7 +12,7 @@ use { itertools::Itertools, solana_measure::measure::Measure, solana_perf::packet::PacketBatch, - solana_perf::sigverify::Deduper, + solana_perf::sigverify::{count_valid_packets, shrink_batches, Deduper}, solana_sdk::timing, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, std::{ @@ -58,6 +58,7 @@ struct SigVerifierStats { total_packets: usize, total_dedup: usize, total_excess_fail: usize, + total_shrink_time: usize, } impl SigVerifierStats { @@ -166,6 +167,7 @@ impl SigVerifierStats { ("total_packets", self.total_packets, i64), ("total_dedup", self.total_dedup, i64), ("total_excess_fail", self.total_excess_fail, i64), + ("total_shrink_time", self.total_shrink_time, i64), ); } } @@ -248,7 +250,17 @@ impl SigVerifyStage { discard_time.stop(); let mut verify_batch_time = Measure::start("sigverify_batch_time"); - let batches = verifier.verify_batches(batches, num_valid_packets); + let mut batches = verifier.verify_batches(batches, num_valid_packets); + verify_batch_time.stop(); + + let mut shrink_time = Measure::start("sigverify_shrink_time"); + let num_valid_packets = count_valid_packets(&batches); + if num_packets > num_valid_packets.saturating_mul(4) { + let valid = shrink_batches(&mut batches); + batches.truncate(valid); + } + shrink_time.stop(); + sendr.send(batches)?; verify_batch_time.stop(); @@ -283,6 +295,7 @@ impl SigVerifyStage { stats.total_packets += num_packets; stats.total_dedup += dedup_fail; stats.total_excess_fail += excess_fail; + stats.total_shrink_time += shrink_time.as_us() as usize; Ok(()) } diff --git a/perf/benches/shrink.rs b/perf/benches/shrink.rs new file mode 100644 index 00000000000000..cf60fd90e53e9a --- /dev/null +++ b/perf/benches/shrink.rs @@ -0,0 +1,86 @@ +#![allow(clippy::integer_arithmetic)] +#![feature(test)] + +extern crate test; + +use { + rand::prelude::*, + solana_perf::{ + packet::{to_packet_batches, PacketBatch, PACKETS_PER_BATCH}, + sigverify, + }, + test::Bencher, +}; + +const NUM_PACKETS: usize = 1024 * 4; + +fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec { + // subtract 8 bytes because the length will get serialized as well + (0..size.checked_sub(8).unwrap()) + .map(|_| rng.gen()) + .collect() +} + +fn do_bench_shrink_packets(bencher: &mut Bencher, mut batches: Vec) { + // verify packets + bencher.iter(|| { + let _ans = sigverify::shrink_batches(&mut batches); + batches.iter_mut().for_each(|b| { + b.packets + .iter_mut() + .for_each(|p| p.meta.set_discard(thread_rng().gen())) + }); + }); +} + +#[bench] +#[ignore] +fn bench_shrink_diff_small_packets(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + + let batches = to_packet_batches( + &(0..NUM_PACKETS) + .map(|_| test_packet_with_size(128, &mut rng)) + .collect::>(), + PACKETS_PER_BATCH, + ); + + do_bench_shrink_packets(bencher, batches); +} + +#[bench] +#[ignore] +fn 
bench_shrink_diff_big_packets(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + + let batches = to_packet_batches( + &(0..NUM_PACKETS) + .map(|_| test_packet_with_size(1024, &mut rng)) + .collect::>(), + PACKETS_PER_BATCH, + ); + + do_bench_shrink_packets(bencher, batches); +} + +#[bench] +#[ignore] +fn bench_shrink_count_packets(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + + let mut batches = to_packet_batches( + &(0..NUM_PACKETS) + .map(|_| test_packet_with_size(128, &mut rng)) + .collect::>(), + PACKETS_PER_BATCH, + ); + batches.iter_mut().for_each(|b| { + b.packets + .iter_mut() + .for_each(|p| p.meta.set_discard(thread_rng().gen())) + }); + + bencher.iter(|| { + let _ = sigverify::count_valid_packets(&batches); + }); +} diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 83a895d79086a6..e6c42c0881c6cd 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -167,6 +167,13 @@ pub fn count_packets_in_batches(batches: &[PacketBatch]) -> usize { batches.iter().map(|batch| batch.packets.len()).sum() } +pub fn count_valid_packets(batches: &[PacketBatch]) -> usize { + batches + .iter() + .map(|batch| batch.packets.iter().filter(|p| !p.meta.discard()).count()) + .sum() +} + // internal function to be unit-tested; should be used only by get_packet_offsets fn do_get_packet_offsets( packet: &Packet, @@ -496,6 +503,45 @@ impl Deduper { } } +//inplace shrink a batch of packets +pub fn shrink_batches(batches: &mut Vec) -> usize { + let mut i = 0; + let mut ii = 0; + let mut j = 1; + let mut jj = 0; + let mut last = 0; + while j < batches.len() { + while jj < batches[j].packets.len() { + if batches[j].packets[jj].meta.discard() { + jj = jj.saturating_add(1); + continue; + } + last = j; + let mut done = false; + while i < j && !done { + while ii < batches[i].packets.len() { + if batches[i].packets[ii].meta.discard() { + batches[i].packets[ii] = batches[j].packets[jj].clone(); + batches[j].packets[jj].meta.set_discard(true); + last = i; + done = true; + break; + } + ii = ii.saturating_add(1); + } + if ii >= batches[i].packets.len() { + ii = 0; + i = i.saturating_add(1); + } + } + jj = jj.saturating_add(1); + } + jj = 0; + j = j.saturating_add(1); + } + last.saturating_add(1) +} + pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) { use rayon::prelude::*; debug!("CPU ECDSA for {}", packet_count); @@ -681,7 +727,7 @@ mod tests { use { super::*, crate::{ - packet::{to_packet_batches, Packet, PacketBatch}, + packet::{to_packet_batches, Packet, PacketBatch, PACKETS_PER_BATCH}, sigverify::{self, PacketOffsets}, test_tx::{new_test_vote_tx, test_multisig_tx, test_tx}, }, @@ -1433,4 +1479,24 @@ mod tests { //allow for 1 false positive even if extremely unlikely assert!(discard < 2); } + + #[test] + fn test_shrink() { + for _ in 0..10 { + let mut batches = to_packet_batches( + &(0..1024).map(|_| test_tx()).collect::>(), + PACKETS_PER_BATCH, + ); + batches.iter_mut().for_each(|b| { + b.packets + .iter_mut() + .for_each(|p| p.meta.set_discard(thread_rng().gen())) + }); + let packet_count = count_valid_packets(&batches); + let res = shrink_batches(&mut batches); + batches.truncate(res); + let packet_count2 = count_valid_packets(&batches); + assert_eq!(packet_count, packet_count2); + } + } } From d7d73ebcef5c08e34505745668d923661a1805cb Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 12 Feb 2022 09:31:58 -0800 Subject: [PATCH 2/9] more stats --- core/src/sigverify_stage.rs | 11 ++++++++++- 1 file 
changed, 10 insertions(+), 1 deletion(-)

diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs
index c15d7c92530c64..4eef853a7824be 100644
--- a/core/src/sigverify_stage.rs
+++ b/core/src/sigverify_stage.rs
@@ -59,6 +59,8 @@ struct SigVerifierStats {
     total_dedup: usize,
     total_excess_fail: usize,
     total_shrink_time: usize,
+    total_shrinks: usize,
+    total_valid_packets: usize,
 }

 impl SigVerifierStats {
@@ -168,6 +170,8 @@ impl SigVerifierStats {
             ("total_dedup", self.total_dedup, i64),
             ("total_excess_fail", self.total_excess_fail, i64),
             ("total_shrink_time", self.total_shrink_time, i64),
+            ("total_shrinks", self.total_shrinks, i64),
+            ("total_valid_packets", self.total_valid_packets, i64),
         );
     }
 }
@@ -255,10 +259,13 @@ impl SigVerifyStage {

         let mut shrink_time = Measure::start("sigverify_shrink_time");
         let num_valid_packets = count_valid_packets(&batches);
-        if num_packets > num_valid_packets.saturating_mul(4) {
+        let start_len = batches.len();
+        const MAX_EMPTY_BATCH_RATIO: usize = 4;
+        if num_packets > num_valid_packets.saturating_mul(MAX_EMPTY_BATCH_RATIO) {
             let valid = shrink_batches(&mut batches);
             batches.truncate(valid);
         }
+        let total_shrinks = start_len.saturating_sub(batches.len());
         shrink_time.stop();

         sendr.send(batches)?;
@@ -294,8 +301,10 @@ impl SigVerifyStage {
         stats.total_batches += batches_len;
         stats.total_packets += num_packets;
         stats.total_dedup += dedup_fail;
+        stats.total_valid_packets += num_valid_packets;
         stats.total_excess_fail += excess_fail;
         stats.total_shrink_time += shrink_time.as_us() as usize;
+        stats.total_shrinks += total_shrinks;

         Ok(())
     }

From afa75ad6224e5e2ace3307c8f579006fd4c425f8 Mon Sep 17 00:00:00 2001
From: Anatoly Yakovenko
Date: Sun, 13 Feb 2022 11:57:57 -0800
Subject: [PATCH 3/9] better variable names, use for instead of while where it makes sense

---
 perf/src/sigverify.rs | 52 +++++++++++++++++++++----------------------
 1 file changed, 26 insertions(+), 26 deletions(-)

diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs
index e6c42c0881c6cd..fc65ef3e58ceac 100644
--- a/perf/src/sigverify.rs
+++ b/perf/src/sigverify.rs
@@ -505,41 +505,39 @@ impl Deduper {

 //inplace shrink a batch of packets
 pub fn shrink_batches(batches: &mut Vec<PacketBatch>) -> usize {
-    let mut i = 0;
-    let mut ii = 0;
-    let mut j = 1;
-    let mut jj = 0;
-    let mut last = 0;
-    while j < batches.len() {
-        while jj < batches[j].packets.len() {
+    let mut valid_batch_ix = 0;
+    let mut valid_packet_ix = 0;
+    let mut last_valid_batch = 0;
+    for j in 1..batches.len() {
+        for jj in 0..batches[j].packets.len() {
             if batches[j].packets[jj].meta.discard() {
-                jj = jj.saturating_add(1);
                 continue;
             }
-            last = j;
-            let mut done = false;
-            while i < j && !done {
-                while ii < batches[i].packets.len() {
-                    if batches[i].packets[ii].meta.discard() {
-                        batches[i].packets[ii] = batches[j].packets[jj].clone();
+            last_valid_batch = j;
+            let mut found_spot = false;
+            while valid_batch_ix < j && !found_spot {
+                while valid_packet_ix < batches[valid_batch_ix].packets.len() {
+                    if batches[valid_batch_ix].packets[valid_packet_ix]
+                        .meta
+                        .discard()
+                    {
+                        batches[valid_batch_ix].packets[valid_packet_ix] =
+                            batches[j].packets[jj].clone();
                         batches[j].packets[jj].meta.set_discard(true);
-                        last = i;
-                        done = true;
+                        last_valid_batch = valid_batch_ix;
+                        found_spot = true;
                         break;
                     }
-                    ii = ii.saturating_add(1);
+                    valid_packet_ix = valid_packet_ix.saturating_add(1);
                 }
-                if ii >= batches[i].packets.len() {
-                    ii = 0;
-                    i = i.saturating_add(1);
+                if valid_packet_ix >= batches[valid_batch_ix].packets.len() {
+                    valid_packet_ix = 0;
+                    valid_batch_ix = valid_batch_ix.saturating_add(1);
                 }
             }
-            jj = jj.saturating_add(1);
         }
-        jj = 0;
-        j = j.saturating_add(1);
     }
-    last.saturating_add(1)
+    last_valid_batch.saturating_add(1)
 }

 pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) {
@@ -1482,9 +1480,11 @@ mod tests {

     #[test]
     fn test_shrink() {
-        for _ in 0..10 {
+        for _ in 0..5 {
             let mut batches = to_packet_batches(
-                &(0..1024).map(|_| test_tx()).collect::<Vec<_>>(),
+                &(0..PACKETS_PER_BATCH * 3)
+                    .map(|_| test_tx())
+                    .collect::<Vec<_>>(),
                 PACKETS_PER_BATCH,
             );
             batches.iter_mut().for_each(|b| {

From 6dc0bea8b030acfe2ef5d7e34772fc320712831b Mon Sep 17 00:00:00 2001
From: Anatoly Yakovenko
Date: Mon, 14 Feb 2022 12:13:23 -0800
Subject: [PATCH 4/9] test that pre/post shrink txs are the same

---
 perf/src/sigverify.rs | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs
index fc65ef3e58ceac..815886e9a029ba 100644
--- a/perf/src/sigverify.rs
+++ b/perf/src/sigverify.rs
@@ -1492,11 +1492,32 @@ mod tests {
                     .iter_mut()
                     .for_each(|p| p.meta.set_discard(thread_rng().gen()))
             });
+            //find all the non discarded packets
+            let mut start = vec![];
+            batches.iter_mut().for_each(|b| {
+                b.packets
+                    .iter_mut()
+                    .filter(|p| !p.meta.discard())
+                    .for_each(|p| start.push(p.clone()))
+            });
+            start.sort_by_key(|p| p.data);
+
             let packet_count = count_valid_packets(&batches);
             let res = shrink_batches(&mut batches);
             batches.truncate(res);
+
+            //make sure all the non discarded packets are the same
+            let mut end = vec![];
+            batches.iter_mut().for_each(|b| {
+                b.packets
+                    .iter_mut()
+                    .filter(|p| !p.meta.discard())
+                    .for_each(|p| end.push(p.clone()))
+            });
+            end.sort_by_key(|p| p.data);
             let packet_count2 = count_valid_packets(&batches);
             assert_eq!(packet_count, packet_count2);
+            assert_eq!(start, end);
         }
     }
 }

From 0464e2604dde9113cd32d663b5dfbc8f77946b1e Mon Sep 17 00:00:00 2001
From: Anatoly Yakovenko
Date: Mon, 14 Feb 2022 15:32:37 -0800
Subject: [PATCH 5/9] update

---
 perf/src/sigverify.rs | 186 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 181 insertions(+), 5 deletions(-)

diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs
index 815886e9a029ba..83692974386d94 100644
--- a/perf/src/sigverify.rs
+++ b/perf/src/sigverify.rs
@@ -508,12 +508,12 @@ pub fn shrink_batches(batches: &mut Vec<PacketBatch>) -> usize {
     let mut valid_batch_ix = 0;
     let mut valid_packet_ix = 0;
     let mut last_valid_batch = 0;
-    for j in 1..batches.len() {
+    for j in 0..batches.len() {
         for jj in 0..batches[j].packets.len() {
             if batches[j].packets[jj].meta.discard() {
                 continue;
             }
-            last_valid_batch = j;
+            last_valid_batch = j.saturating_add(1);
             let mut found_spot = false;
             while valid_batch_ix < j && !found_spot {
                 while valid_packet_ix < batches[valid_batch_ix].packets.len() {
                     if batches[valid_batch_ix].packets[valid_packet_ix]
                         .meta
                         .discard()
                     {
                         batches[valid_batch_ix].packets[valid_packet_ix] =
                             batches[j].packets[jj].clone();
                         batches[j].packets[jj].meta.set_discard(true);
-                        last_valid_batch = valid_batch_ix;
+                        last_valid_batch = valid_batch_ix.saturating_add(1);
                         found_spot = true;
                         break;
                     }
                     valid_packet_ix = valid_packet_ix.saturating_add(1);
                 }
                 if valid_packet_ix >= batches[valid_batch_ix].packets.len() {
                     valid_packet_ix = 0;
                     valid_batch_ix = valid_batch_ix.saturating_add(1);
                 }
             }
         }
     }
-    last_valid_batch.saturating_add(1)
+    last_valid_batch
 }

 pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) {
@@ -1479,7 +1479,7 @@ mod tests {
     }

     #[test]
-    fn test_shrink() {
+    fn test_shrink_fuzz() {
         for _ in 0..5 {
             let mut batches =
to_packet_batches( &(0..PACKETS_PER_BATCH * 3) @@ -1520,4 +1520,180 @@ mod tests { assert_eq!(start, end); } } + + #[test] + fn test_shrink_empty() { + const PACKET_COUNT: usize = 1024; + const BATCH_COUNT: usize = PACKET_COUNT / PACKETS_PER_BATCH; + + // No batches + // truncate of 1 on len 0 is a noop + assert_eq!(shrink_batches(&mut vec![]), 0); + // One empty batch + assert_eq!(shrink_batches(&mut vec![PacketBatch::with_capacity(0)]), 0); + // Many empty batches + let mut batches = (0..BATCH_COUNT) + .map(|_| PacketBatch::with_capacity(0)) + .collect::>(); + assert_eq!(shrink_batches(&mut batches), 0); + } + + #[test] + fn test_shrink_vectors() { + const PACKET_COUNT: usize = 1024; + const BATCH_COUNT: usize = PACKET_COUNT / PACKETS_PER_BATCH; + + let set_discards = [ + // contiguous + // 0 + // No discards + |_, _| false, + // All discards + |_, _| true, + // single partitions + // discard last half of packets + |b, p| ((b * PACKETS_PER_BATCH) + p) >= (PACKET_COUNT / 2), + // discard first half of packets + |b, p| ((b * PACKETS_PER_BATCH) + p) < (PACKET_COUNT / 2), + // discard last half of each batch + |_, p| p >= (PACKETS_PER_BATCH / 2), + // 5 + // discard first half of each batch + |_, p| p < (PACKETS_PER_BATCH / 2), + // uniform sparse + // discard even packets + |b, p| ((b * PACKETS_PER_BATCH) + p) % 2 == 0, + // discard odd packets + |b, p| ((b * PACKETS_PER_BATCH) + p) % 2 == 1, + // discard even batches + |b, _| b % 2 == 0, + // discard odd batches + |b, _| b % 2 == 1, + // edges + // 10 + // discard first batch + |b, _| b == 0, + // discard last batch + |b, _| b == BATCH_COUNT - 1, + // discard first and last batches + |b, _| b == 0 || b == BATCH_COUNT - 1, + // discard all but first and last batches + |b, _| b != 0 && b != BATCH_COUNT - 1, + // discard first packet + |b, p| ((b * PACKETS_PER_BATCH) + p) == 0, + // 15 + // discard all but first packet + |b, p| ((b * PACKETS_PER_BATCH) + p) != 0, + // discard last packet + |b, p| ((b * PACKETS_PER_BATCH) + p) == PACKET_COUNT - 1, + // discard all but last packet + |b, p| ((b * PACKETS_PER_BATCH) + p) != PACKET_COUNT - 1, + // discard first packet of each batch + |_, p| p == 0, + // discard all but first packet of each batch + |_, p| p != 0, + // 20 + // discard last packet of each batch + |_, p| p == PACKETS_PER_BATCH - 1, + // discard all but last packet of each batch + |_, p| p != PACKETS_PER_BATCH - 1, + // discard first and last packet of each batch + |_, p| p == 0 || p == PACKETS_PER_BATCH - 1, + // discard all but first and last packet of each batch + |_, p| p != 0 && p != PACKETS_PER_BATCH - 1, + // discard all after first packet in second to last batch + |b, p| (b == BATCH_COUNT - 2 && p > 0) || b == BATCH_COUNT - 1, + // 25 + ]; + + let expect_valids = [ + // (expected_batches, expected_valid_packets) + // + // contiguous + // 0 + (BATCH_COUNT, PACKET_COUNT), + (0, 0), + // single partitions + (BATCH_COUNT / 2, PACKET_COUNT / 2), + (BATCH_COUNT / 2, PACKET_COUNT / 2), + (BATCH_COUNT / 2, PACKET_COUNT / 2), + // 5 + (BATCH_COUNT / 2, PACKET_COUNT / 2), + // uniform sparse + (BATCH_COUNT / 2, PACKET_COUNT / 2), + (BATCH_COUNT / 2, PACKET_COUNT / 2), + (BATCH_COUNT / 2, PACKET_COUNT / 2), + (BATCH_COUNT / 2, PACKET_COUNT / 2), + // edges + // 10 + (BATCH_COUNT - 1, PACKET_COUNT - PACKETS_PER_BATCH), + (BATCH_COUNT - 1, PACKET_COUNT - PACKETS_PER_BATCH), + (BATCH_COUNT - 2, PACKET_COUNT - 2 * PACKETS_PER_BATCH), + (2, 2 * PACKETS_PER_BATCH), + (BATCH_COUNT, PACKET_COUNT - 1), + // 15 + (1, 1), + (BATCH_COUNT, 
PACKET_COUNT - 1), + (1, 1), + ( + (BATCH_COUNT * (PACKETS_PER_BATCH - 1) + PACKETS_PER_BATCH) / PACKETS_PER_BATCH, + (PACKETS_PER_BATCH - 1) * BATCH_COUNT, + ), + ( + (BATCH_COUNT + PACKETS_PER_BATCH) / PACKETS_PER_BATCH, + BATCH_COUNT, + ), + // 20 + ( + (BATCH_COUNT * (PACKETS_PER_BATCH - 1) + PACKETS_PER_BATCH) / PACKETS_PER_BATCH, + (PACKETS_PER_BATCH - 1) * BATCH_COUNT, + ), + ( + (BATCH_COUNT + PACKETS_PER_BATCH) / PACKETS_PER_BATCH, + BATCH_COUNT, + ), + ( + (BATCH_COUNT * (PACKETS_PER_BATCH - 2) + PACKETS_PER_BATCH) / PACKETS_PER_BATCH, + (PACKETS_PER_BATCH - 2) * BATCH_COUNT, + ), + ( + (2 * BATCH_COUNT + PACKETS_PER_BATCH) / PACKETS_PER_BATCH, + PACKET_COUNT - (PACKETS_PER_BATCH - 2) * BATCH_COUNT, + ), + (BATCH_COUNT - 1, PACKET_COUNT - 2 * PACKETS_PER_BATCH + 1), + // 25 + ]; + + let test_cases = set_discards.iter().zip(&expect_valids).enumerate(); + for (i, (set_discard, (expect_batch_count, expect_valid_packets))) in test_cases { + println!("test_shrink case: {}", i); + let mut batches = to_packet_batches( + &(0..PACKET_COUNT).map(|_| test_tx()).collect::>(), + PACKETS_PER_BATCH, + ); + assert_eq!(batches.len(), BATCH_COUNT); + assert_eq!(count_valid_packets(&batches), PACKET_COUNT); + batches.iter_mut().enumerate().for_each(|(i, b)| { + b.packets + .iter_mut() + .enumerate() + .for_each(|(j, p)| p.meta.set_discard(set_discard(i, j))) + }); + assert_eq!(count_valid_packets(&batches), *expect_valid_packets); + println!("show valid packets for case {}", i); + batches.iter_mut().enumerate().for_each(|(i, b)| { + b.packets.iter_mut().enumerate().for_each(|(j, p)| { + if !p.meta.discard() { + println!("{} {}", i, j) + } + }) + }); + println!("done show valid packets for case {}", i); + let shrunken_batch_count = shrink_batches(&mut batches); + println!("shrunk batch test {} count: {}", i, shrunken_batch_count); + assert_eq!(shrunken_batch_count, *expect_batch_count); + batches.truncate(shrunken_batch_count); + assert_eq!(count_valid_packets(&batches), *expect_valid_packets); + } + } } From 1de5bf58b8ead6b4cea342019485ffff1e2705d2 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 15 Feb 2022 21:35:28 -0800 Subject: [PATCH 6/9] fix bench --- core/benches/sigverify_stage.rs | 45 +++++++++++++----------- core/src/sigverify_stage.rs | 62 +++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 20 deletions(-) diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index cde843a923cf1c..2eb7502ded388d 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -9,7 +9,7 @@ use { log::*, rand::{thread_rng, Rng}, solana_core::{sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage}, - solana_perf::{packet::to_packet_batches, test_tx::test_tx}, + solana_perf::{packet::to_packet_batches, packet::PacketBatch, test_tx::test_tx}, solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, @@ -109,19 +109,10 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { }); } -#[bench] -fn bench_sigverify_stage(bencher: &mut Bencher) { - solana_logger::setup(); - let (packet_s, packet_r) = unbounded(); - let (verified_s, verified_r) = unbounded(); - let verifier = TransactionSigVerifier::default(); - let stage = SigVerifyStage::new(packet_r, verified_s, verifier); - - let now = Instant::now(); +fn gen_batches(use_same_tx: bool) -> Vec { let len = 4096; - let use_same_tx = true; let chunk_size = 1024; - let mut batches = if use_same_tx { + if use_same_tx { let tx = test_tx(); to_packet_batches(&vec![tx; len], 
chunk_size) } else { @@ -139,14 +130,28 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { }) .collect(); to_packet_batches(&txs, chunk_size) - }; + } +} - trace!( - "starting... generation took: {} ms batches: {}", - duration_as_ms(&now.elapsed()), - batches.len() - ); +#[bench] +fn bench_sigverify_stage(bencher: &mut Bencher) { + solana_logger::setup(); + println!("start"); + let (packet_s, packet_r) = unbounded(); + let (verified_s, verified_r) = unbounded(); + let verifier = TransactionSigVerifier::default(); + let stage = SigVerifyStage::new(packet_r, verified_s, verifier); + + let use_same_tx = true; bencher.iter(move || { + let now = Instant::now(); + let mut batches = gen_batches(use_same_tx); + println!( + "starting... generation took: {} ms batches: {}", + duration_as_ms(&now.elapsed()), + batches.len() + ); + let mut sent_len = 0; for _ in 0..batches.len() { if let Some(batch) = batches.pop() { @@ -155,7 +160,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { } } let mut received = 0; - trace!("sent: {}", sent_len); + println!("sent: {}", sent_len); loop { if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { while let Some(v) = verifieds.pop() { @@ -167,7 +172,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { } } } - trace!("received: {}", received); + println!("received: {}", received); }); stage.join().unwrap(); } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 4eef853a7824be..7495112007ff6e 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -370,6 +370,11 @@ impl SigVerifyStage { #[cfg(test)] mod tests { + use crate::sigverify::TransactionSigVerifier; + use crate::sigverify_stage::timing::duration_as_ms; + use crossbeam_channel::unbounded; + use solana_perf::packet::to_packet_batches; + use solana_perf::test_tx::test_tx; use {super::*, solana_perf::packet::Packet}; fn count_non_discard(packet_batches: &[PacketBatch]) -> usize { @@ -401,4 +406,61 @@ mod tests { assert!(batches[0].packets[3].meta.discard()); assert!(!batches[0].packets[4].meta.discard()); } + fn gen_batches(use_same_tx: bool) -> Vec { + let len = 4096; + let chunk_size = 1024; + if use_same_tx { + let tx = test_tx(); + to_packet_batches(&vec![tx; len], chunk_size) + } else { + let txs: Vec<_> = (0..len) + .map(|_| { + test_tx() + }) + .collect(); + to_packet_batches(&txs, chunk_size) + } + } + + #[test] + fn test_sigverify_stage() { + solana_logger::setup(); + println!("start"); + let (packet_s, packet_r) = unbounded(); + let (verified_s, verified_r) = unbounded(); + let verifier = TransactionSigVerifier::default(); + let stage = SigVerifyStage::new(packet_r, verified_s, verifier); + + let use_same_tx = true; + let now = Instant::now(); + let mut batches = gen_batches(use_same_tx); + println!( + "starting... 
generation took: {} ms batches: {}", + duration_as_ms(&now.elapsed()), + batches.len() + ); + + let mut sent_len = 0; + for _ in 0..batches.len() { + if let Some(batch) = batches.pop() { + sent_len += batch.packets.len(); + packet_s.send(batch).unwrap(); + } + } + let mut received = 0; + println!("sent: {}", sent_len); + loop { + if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { + while let Some(v) = verifieds.pop() { + received += v.packets.len(); + batches.push(v); + } + if received >= sent_len { + break; + } + } + } + println!("received: {}", received); + stage.join().unwrap(); + } } From 75d335a5817d12b21c00e0777f7582113bce6344 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 15 Feb 2022 21:54:56 -0800 Subject: [PATCH 7/9] fixed bench --- core/benches/sigverify_stage.rs | 10 +++++----- core/src/sigverify_stage.rs | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 2eb7502ded388d..e8fadfee44c901 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -136,7 +136,7 @@ fn gen_batches(use_same_tx: bool) -> Vec { #[bench] fn bench_sigverify_stage(bencher: &mut Bencher) { solana_logger::setup(); - println!("start"); + trace!("start"); let (packet_s, packet_r) = unbounded(); let (verified_s, verified_r) = unbounded(); let verifier = TransactionSigVerifier::default(); @@ -146,7 +146,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { bencher.iter(move || { let now = Instant::now(); let mut batches = gen_batches(use_same_tx); - println!( + trace!( "starting... generation took: {} ms batches: {}", duration_as_ms(&now.elapsed()), batches.len() @@ -160,19 +160,19 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { } } let mut received = 0; - println!("sent: {}", sent_len); + trace!("sent: {}", sent_len); loop { if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { while let Some(v) = verifieds.pop() { received += v.packets.len(); batches.push(v); } - if received >= sent_len { + if use_same_tx || received >= sent_len { break; } } } - println!("received: {}", received); + trace!("received: {}", received); }); stage.join().unwrap(); } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 7495112007ff6e..823321fca6938c 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -455,12 +455,13 @@ mod tests { received += v.packets.len(); batches.push(v); } - if received >= sent_len { + if use_same_tx || received >= sent_len { break; } } } println!("received: {}", received); + drop(packet_s); stage.join().unwrap(); } } From 5e800f6a7a885730298e7ba17ea3f5568fafd8e2 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 15 Feb 2022 21:55:59 -0800 Subject: [PATCH 8/9] fmt --- core/src/sigverify_stage.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 823321fca6938c..47a27489c53e63 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -413,11 +413,7 @@ mod tests { let tx = test_tx(); to_packet_batches(&vec![tx; len], chunk_size) } else { - let txs: Vec<_> = (0..len) - .map(|_| { - test_tx() - }) - .collect(); + let txs: Vec<_> = (0..len).map(|_| test_tx()).collect(); to_packet_batches(&txs, chunk_size) } } From 66958f8aca8af65aec30e04527ba5e7397f5a891 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 15 Feb 2022 22:04:35 -0800 Subject: [PATCH 9/9] fixup 
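Rename the shrink_batches() loop counters from j/jj to batch_ix/packet_ix and
demote the remaining println! calls in the sigverify stage test to trace!.
No functional change intended.

For reference, a minimal sketch of the invariant the shrink path is expected
to preserve (it mirrors test_shrink_fuzz from this series; the 64-transaction /
16-packets-per-batch sizes below are illustrative, not taken from the code):

    // Compact non-discarded packets into the leading batches, then drop the
    // now-empty tail; the number of valid packets must not change.
    let mut batches =
        to_packet_batches(&(0..64).map(|_| test_tx()).collect::<Vec<_>>(), 16);
    batches[0]
        .packets
        .iter_mut()
        .for_each(|p| p.meta.set_discard(true));
    let valid_before = count_valid_packets(&batches);
    let keep = shrink_batches(&mut batches);
    batches.truncate(keep);
    assert_eq!(valid_before, count_valid_packets(&batches));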
---
 core/src/sigverify_stage.rs |  8 ++++----
 perf/src/sigverify.rs       | 14 +++++++-------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs
index 47a27489c53e63..baf4cd37d9bbd2 100644
--- a/core/src/sigverify_stage.rs
+++ b/core/src/sigverify_stage.rs
@@ -421,7 +421,7 @@ mod tests {
     #[test]
     fn test_sigverify_stage() {
         solana_logger::setup();
-        println!("start");
+        trace!("start");
         let (packet_s, packet_r) = unbounded();
         let (verified_s, verified_r) = unbounded();
         let verifier = TransactionSigVerifier::default();
@@ -430,7 +430,7 @@ mod tests {
         let use_same_tx = true;
         let now = Instant::now();
         let mut batches = gen_batches(use_same_tx);
-        println!(
+        trace!(
            "starting... generation took: {} ms batches: {}",
            duration_as_ms(&now.elapsed()),
            batches.len()
        );
@@ -444,7 +444,7 @@ mod tests {
             }
         }
         let mut received = 0;
-        println!("sent: {}", sent_len);
+        trace!("sent: {}", sent_len);
         loop {
             if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
                 while let Some(v) = verifieds.pop() {
@@ -456,7 +456,7 @@ mod tests {
                 }
             }
         }
-        println!("received: {}", received);
+        trace!("received: {}", received);
         drop(packet_s);
         stage.join().unwrap();
     }
diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs
index 83692974386d94..df330da128f933 100644
--- a/perf/src/sigverify.rs
+++ b/perf/src/sigverify.rs
@@ -508,22 +508,22 @@ pub fn shrink_batches(batches: &mut Vec<PacketBatch>) -> usize {
     let mut valid_batch_ix = 0;
     let mut valid_packet_ix = 0;
     let mut last_valid_batch = 0;
-    for j in 0..batches.len() {
-        for jj in 0..batches[j].packets.len() {
-            if batches[j].packets[jj].meta.discard() {
+    for batch_ix in 0..batches.len() {
+        for packet_ix in 0..batches[batch_ix].packets.len() {
+            if batches[batch_ix].packets[packet_ix].meta.discard() {
                 continue;
             }
-            last_valid_batch = j.saturating_add(1);
+            last_valid_batch = batch_ix.saturating_add(1);
             let mut found_spot = false;
-            while valid_batch_ix < j && !found_spot {
+            while valid_batch_ix < batch_ix && !found_spot {
                 while valid_packet_ix < batches[valid_batch_ix].packets.len() {
                     if batches[valid_batch_ix].packets[valid_packet_ix]
                         .meta
                         .discard()
                     {
                         batches[valid_batch_ix].packets[valid_packet_ix] =
-                            batches[j].packets[jj].clone();
-                        batches[j].packets[jj].meta.set_discard(true);
+                            batches[batch_ix].packets[packet_ix].clone();
+                        batches[batch_ix].packets[packet_ix].meta.set_discard(true);
                         last_valid_batch = valid_batch_ix.saturating_add(1);
                         found_spot = true;
                         break;