Apfitzge/feature/scheduler interface no tests #3

Draft
wants to merge 1 commit into base: apfitzge/feature/scheduler_interface_no_tests_base
37 changes: 33 additions & 4 deletions banking-bench/src/main.rs
@@ -1,12 +1,16 @@
#![allow(clippy::integer_arithmetic)]

use {
    clap::{crate_description, crate_name, Arg, ArgEnum, Command},
    crossbeam_channel::{unbounded, Receiver},
    log::*,
    rand::{thread_rng, Rng},
    rayon::prelude::*,
    solana_client::connection_cache::ConnectionCache,
    solana_core::banking_stage::BankingStage,
    solana_core::{
        banking_stage::BankingStage,
        scheduler_stage::{SchedulerKind, SchedulerStage},
    },
    solana_gossip::cluster_info::{ClusterInfo, Node},
    solana_ledger::{
        blockstore::Blockstore,
@@ -421,19 +425,43 @@ fn main() {
        true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE),
        false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE),
    };
    let banking_stage = BankingStage::new_num_threads(

    let (scheduler_stage, transactions_receivers, processed_transactions_sender) =
        SchedulerStage::new_num_threads(
            SchedulerKind::MultiIteratorScheduler,
            num_banking_threads as usize - 2,
            verified_receiver,
            bank_forks.clone(),
            poh_recorder.clone(),
            &cluster_info,
        );
    let banking_stage = BankingStage::new_external_scheduler(
        &cluster_info,
        &poh_recorder,
        verified_receiver,
        transactions_receivers.unwrap(),
        processed_transactions_sender.unwrap(),
        tpu_vote_receiver,
        vote_receiver,
        num_banking_threads,
        None,
        replay_vote_sender,
        None,
        Arc::new(connection_cache),
        bank_forks.clone(),
    );

    // let banking_stage = BankingStage::new_num_threads(
    //     &cluster_info,
    //     &poh_recorder,
    //     verified_receiver,
    //     tpu_vote_receiver,
    //     vote_receiver,
    //     num_banking_threads,
    //     None,
    //     replay_vote_sender,
    //     None,
    //     Arc::new(connection_cache),
    //     bank_forks.clone(),
    // );
    poh_recorder.write().unwrap().set_bank(&bank, false);

    // This is so that the signal_receiver does not go out of scope after the closure.
@@ -578,6 +606,7 @@ fn main() {
    drop(tpu_vote_sender);
    drop(vote_sender);
    exit.store(true, Ordering::Relaxed);
    scheduler_stage.join().unwrap();
    banking_stage.join().unwrap();
    debug!("waited for banking_stage");
    poh_service.join().unwrap();
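Note on the change above: the old single call to BankingStage::new_num_threads is replaced by a SchedulerStage that consumes verified_receiver and a BankingStage built with new_external_scheduler that receives scheduled work on transactions_receivers and reports completions on processed_transactions_sender. The following is a minimal, self-contained sketch of that channel handoff shape, not the PR's implementation: plain strings stand in for packet batches, round-robin stands in for the multi-iterator scheduling policy, and names such as work_senders and processed_receiver are illustrative only.

// Sketch: a scheduler thread fans work out to worker threads over per-worker
// channels and collects completions on a shared channel, mirroring the
// SchedulerStage -> BankingStage handoff wired up in main() above.
use {
    crossbeam_channel::{unbounded, Receiver, Sender},
    std::thread,
};

fn main() {
    let num_workers = 4;

    // One channel per worker for scheduled batches, one shared channel for results.
    let (work_senders, work_receivers): (Vec<Sender<String>>, Vec<Receiver<String>>) =
        (0..num_workers).map(|_| unbounded()).unzip();
    let (processed_sender, processed_receiver) = unbounded::<String>();

    // Scheduler thread: decides which worker gets which batch (round-robin here;
    // a real scheduler would pick batches so that workers do not conflict).
    let scheduler = thread::spawn(move || {
        for (i, batch) in (0..8).map(|i| (i, format!("batch-{i}"))) {
            work_senders[i % num_workers].send(batch).unwrap();
        }
        // Dropping work_senders when this closure ends closes the channels,
        // which lets the workers drain and exit.
    });

    // Worker threads: process batches and report completion back.
    let workers: Vec<_> = work_receivers
        .into_iter()
        .map(|rx| {
            let done = processed_sender.clone();
            thread::spawn(move || {
                for batch in rx {
                    done.send(format!("{batch} processed")).unwrap();
                }
            })
        })
        .collect();
    drop(processed_sender);

    scheduler.join().unwrap();
    for worker in workers {
        worker.join().unwrap();
    }
    for result in processed_receiver {
        println!("{result}");
    }
}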
33 changes: 32 additions & 1 deletion bench-tps/src/bench.rs
@@ -87,6 +87,30 @@ impl<'a> KeypairChunks<'a> {
            dest: dest_keypair_chunks,
        }
    }

    /// Split input vector of keypairs into conflict sets with chunks of given size
    fn new_with_conflict_groups(
        keypairs: &'a [Keypair],
        chunk_size: usize,
        num_conflict_groups: usize,
    ) -> Self {
        let mut source: Vec<Vec<&Keypair>> = Vec::new();
        let mut dest: Vec<VecDeque<&Keypair>> = Vec::new();

        // Transfer from accounts to the first account in each conflict group
        for chunk in keypairs.chunks_exact(2 * chunk_size) {
            source.push(chunk[..chunk_size].iter().collect());
            dest.push(
                chunk[..chunk_size]
                    .iter()
                    .enumerate()
                    .map(|(idx, _)| &chunk[chunk_size + idx % num_conflict_groups])
                    .collect(),
            );
        }

        Self { source, dest }
    }
}

struct TransactionChunkGenerator<'a, 'b, T: ?Sized> {
@@ -110,8 +134,13 @@ where
        chunk_size: usize,
        use_randomized_compute_unit_price: bool,
        instruction_padding_config: Option<InstructionPaddingConfig>,
        num_conflict_groups: Option<usize>,
    ) -> Self {
        let account_chunks = KeypairChunks::new(gen_keypairs, chunk_size);
        let account_chunks = if let Some(num_conflict_groups) = num_conflict_groups {
            KeypairChunks::new_with_conflict_groups(gen_keypairs, chunk_size, num_conflict_groups)
        } else {
            KeypairChunks::new(gen_keypairs, chunk_size)
        };
        let nonce_chunks =
            nonce_keypairs.map(|nonce_keypairs| KeypairChunks::new(nonce_keypairs, chunk_size));

@@ -353,6 +382,7 @@ where
        use_randomized_compute_unit_price,
        use_durable_nonce,
        instruction_padding_config,
        num_conflict_groups,
        ..
    } = config;

@@ -364,6 +394,7 @@
        tx_count,
        use_randomized_compute_unit_price,
        instruction_padding_config,
        num_conflict_groups,
    );

    let first_tx_count = loop {
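Note on new_with_conflict_groups above: within each chunk of 2 * chunk_size keypairs, the first chunk_size accounts are sources, and each source pays one of only num_conflict_groups accounts from the second half, so transfers that share a destination conflict with each other. Below is a standalone sketch of that index mapping with integers standing in for keypairs; the wrapping main() and the integer stand-ins are illustrative only, not code from this PR.

// Sketch: reproduce the source/dest pairing from new_with_conflict_groups
// using plain indices so the conflict pattern is easy to see.
fn main() {
    let chunk_size = 4;
    let num_conflict_groups = 2;
    let keypairs: Vec<usize> = (0..2 * chunk_size).collect(); // 0..=7 stand in for Keypairs

    for chunk in keypairs.chunks_exact(2 * chunk_size) {
        let source: Vec<usize> = chunk[..chunk_size].to_vec();
        let dest: Vec<usize> = chunk[..chunk_size]
            .iter()
            .enumerate()
            .map(|(idx, _)| chunk[chunk_size + idx % num_conflict_groups])
            .collect();
        // source = [0, 1, 2, 3]
        // dest   = [4, 5, 4, 5] -> sources 0 and 2 conflict on account 4,
        //                          sources 1 and 3 conflict on account 5
        println!("source: {source:?} -> dest: {dest:?}");
    }
}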
16 changes: 16 additions & 0 deletions bench-tps/src/cli.rs
@@ -65,6 +65,7 @@ pub struct Config {
    pub use_randomized_compute_unit_price: bool,
    pub use_durable_nonce: bool,
    pub instruction_padding_config: Option<InstructionPaddingConfig>,
    pub num_conflict_groups: Option<usize>,
}

impl Default for Config {
@@ -95,6 +96,7 @@ impl Default for Config {
            use_randomized_compute_unit_price: false,
            use_durable_nonce: false,
            instruction_padding_config: None,
            num_conflict_groups: None,
        }
    }
}
@@ -342,6 +344,12 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
                .takes_value(true)
                .help("If set, wraps all instructions in the instruction padding program, with the given amount of padding bytes in instruction data."),
        )
        .arg(
            Arg::with_name("num_conflict_groups")
                .long("num-conflict-groups")
                .takes_value(true)
                .help("If set, creates different sets of conflicting transactions")
        )
}

/// Parses a clap `ArgMatches` structure into a `Config`
@@ -494,5 +502,13 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
        });
    }

    if let Some(num_conflict_groups) = matches.value_of("num_conflict_groups") {
        args.num_conflict_groups = Some(
            num_conflict_groups
                .parse()
                .expect("Can't parse conflict groups"),
        );
    }

    args
}
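Note on the parsing above: extract_args unwraps the --num-conflict-groups value with expect, so a non-numeric value panics with a generic message, and a value of 0 is accepted even though it would later panic on idx % num_conflict_groups in new_with_conflict_groups. The helper below is a hypothetical sketch, not part of this PR, showing the same parse returning an error and rejecting zero.

// Sketch: validate --num-conflict-groups instead of panicking on bad input.
fn parse_num_conflict_groups(raw: Option<&str>) -> Result<Option<usize>, String> {
    match raw {
        None => Ok(None),
        Some(value) => {
            let n: usize = value
                .parse()
                .map_err(|e| format!("--num-conflict-groups: {e}"))?;
            if n == 0 {
                return Err("--num-conflict-groups must be greater than zero".to_string());
            }
            Ok(Some(n))
        }
    }
}

fn main() {
    // Mirrors feeding matches.value_of("num_conflict_groups") into Config.
    assert_eq!(parse_num_conflict_groups(None), Ok(None));
    assert_eq!(parse_num_conflict_groups(Some("4")), Ok(Some(4)));
    assert!(parse_num_conflict_groups(Some("0")).is_err());
    assert!(parse_num_conflict_groups(Some("abc")).is_err());
}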