PrioGraphSchedulerConfig #4064

Merged · 4 commits · Dec 13, 2024
core/src/banking_stage.rs (6 additions, 1 deletion)
@@ -52,6 +52,7 @@ use {
time::{Duration, Instant},
},
transaction_scheduler::{
prio_graph_scheduler::PrioGraphSchedulerConfig,
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
transaction_state_container::TransactionStateContainer,
},
@@ -621,7 +622,11 @@ impl BankingStage {
bank_forks.clone(),
forwarder.is_some(),
);
let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver);
let scheduler = PrioGraphScheduler::new(
work_senders,
finished_work_receiver,
PrioGraphSchedulerConfig::default(),
);
let scheduler_controller = SchedulerController::new(
decision_maker.clone(),
receive_and_buffer,
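This call site now passes `PrioGraphSchedulerConfig::default()`, so the scheduler keeps the limits it previously had hard-coded. For orientation, a minimal sketch of what an explicitly spelled-out, equivalent config would look like (field names and default values are taken from the `Default` impl in the scheduler diff below; `work_senders` and `finished_work_receiver` are the channels already constructed at this call site):

```rust
// Sketch only, not part of this PR: the defaults written out field by field.
// MAX_BLOCK_UNITS and TARGET_NUM_TRANSACTIONS_PER_BATCH are the existing
// banking-stage constants referenced by the Default impl.
let config = PrioGraphSchedulerConfig {
    max_scheduled_cus: MAX_BLOCK_UNITS, // CU budget split evenly across worker threads
    max_transactions_per_scheduling_pass: 100_000, // hard stop for a single scheduling pass
    look_ahead_window_size: 2048, // how many txs are popped into the prio-graph ahead of scheduling
    target_transactions_per_batch: TARGET_NUM_TRANSACTIONS_PER_BATCH, // flush a thread's batch at this size
};
let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver, config);
```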
@@ -41,28 +41,47 @@ type SchedulerPrioGraph = PrioGraph<
fn(&TransactionPriorityId, &GraphNode<TransactionPriorityId>) -> TransactionPriorityId,
>;

pub(crate) struct PrioGraphSchedulerConfig {
pub max_scheduled_cus: u64,
pub max_transactions_per_scheduling_pass: usize,
pub look_ahead_window_size: usize,
pub target_transactions_per_batch: usize,
}

impl Default for PrioGraphSchedulerConfig {
fn default() -> Self {
Self {
max_scheduled_cus: MAX_BLOCK_UNITS,
max_transactions_per_scheduling_pass: 100_000,
look_ahead_window_size: 2048,
target_transactions_per_batch: TARGET_NUM_TRANSACTIONS_PER_BATCH,
}
}
}

pub(crate) struct PrioGraphScheduler<Tx> {
in_flight_tracker: InFlightTracker,
account_locks: ThreadAwareAccountLocks,
consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
look_ahead_window_size: usize,
prio_graph: SchedulerPrioGraph,
config: PrioGraphSchedulerConfig,
}

impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
pub(crate) fn new(
consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
config: PrioGraphSchedulerConfig,
) -> Self {
let num_threads = consume_work_senders.len();
Self {
in_flight_tracker: InFlightTracker::new(num_threads),
account_locks: ThreadAwareAccountLocks::new(num_threads),
consume_work_senders,
finished_consume_work_receiver,
look_ahead_window_size: 2048,
prio_graph: PrioGraph::new(passthrough_priority),
config,
}
}

@@ -89,7 +108,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
pre_lock_filter: impl Fn(&Tx) -> bool,
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let max_cu_per_thread = MAX_BLOCK_UNITS / num_threads as u64;
let max_cu_per_thread = self.config.max_scheduled_cus / num_threads as u64;

let mut schedulable_threads = ThreadSet::any(num_threads);
for thread_id in 0..num_threads {
@@ -106,7 +125,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
});
}

let mut batches = Batches::new(num_threads);
let mut batches = Batches::new(num_threads, self.config.target_transactions_per_batch);
// Some transactions may be unschedulable due to multi-thread conflicts.
// These transactions cannot be scheduled until some conflicting work is completed.
// However, the scheduler should not allow other transactions that conflict with
@@ -118,7 +137,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
let mut num_filtered_out: usize = 0;
let mut total_filter_time_us: u64 = 0;

let mut window_budget = self.look_ahead_window_size;
let mut window_budget = self.config.look_ahead_window_size;
let mut chunked_pops = |container: &mut S,
prio_graph: &mut PrioGraph<_, _, _, _>,
window_budget: &mut usize| {
@@ -170,13 +189,13 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
// Check transactions against filter, remove from container if it fails.
chunked_pops(container, &mut self.prio_graph, &mut window_budget);

let mut unblock_this_batch =
Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH);
const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000;
let mut unblock_this_batch = Vec::with_capacity(
self.consume_work_senders.len() * self.config.target_transactions_per_batch,
);
let mut num_scheduled: usize = 0;
let mut num_sent: usize = 0;
let mut num_unschedulable: usize = 0;
while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
while num_scheduled < self.config.max_transactions_per_scheduling_pass {
// If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule.
if self.prio_graph.is_empty() {
break;
@@ -229,7 +248,8 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
saturating_add_assign!(batches.total_cus[thread_id], cost);

// If target batch size is reached, send only this batch.
if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH {
if batches.ids[thread_id].len() >= self.config.target_transactions_per_batch
{
saturating_add_assign!(
num_sent,
self.send_batch(&mut batches, thread_id)?
@@ -248,7 +268,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
}
}

if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
if num_scheduled >= self.config.max_transactions_per_scheduling_pass {
break;
}
}
@@ -408,7 +428,8 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
return Ok(0);
}

let (ids, transactions, max_ages, total_cus) = batches.take_batch(thread_index);
let (ids, transactions, max_ages, total_cus) =
batches.take_batch(thread_index, self.config.target_transactions_per_batch);

let batch_id = self
.in_flight_tracker
@@ -498,34 +519,35 @@ struct Batches<Tx> {
}

impl<Tx> Batches<Tx> {
fn new(num_threads: usize) -> Self {
fn new(num_threads: usize, target_num_transactions_per_batch: usize) -> Self {
Self {
ids: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads],
ids: vec![Vec::with_capacity(target_num_transactions_per_batch); num_threads],

transactions: (0..num_threads)
.map(|_| Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH))
.map(|_| Vec::with_capacity(target_num_transactions_per_batch))
.collect(),
max_ages: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads],
max_ages: vec![Vec::with_capacity(target_num_transactions_per_batch); num_threads],
total_cus: vec![0; num_threads],
}
}

fn take_batch(
&mut self,
thread_id: ThreadId,
target_num_transactions_per_batch: usize,
) -> (Vec<TransactionId>, Vec<Tx>, Vec<MaxAge>, u64) {
(
core::mem::replace(
&mut self.ids[thread_id],
Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH),
Vec::with_capacity(target_num_transactions_per_batch),
),
core::mem::replace(
&mut self.transactions[thread_id],
Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH),
Vec::with_capacity(target_num_transactions_per_batch),
),
core::mem::replace(
&mut self.max_ages[thread_id],
Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH),
Vec::with_capacity(target_num_transactions_per_batch),
),
core::mem::replace(&mut self.total_cus[thread_id], 0),
)
@@ -605,7 +627,6 @@ mod tests {
use {
super::*,
crate::banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
immutable_deserialized_packet::ImmutableDeserializedPacket,
transaction_scheduler::transaction_state_container::TransactionStateContainer,
},
@@ -637,8 +658,11 @@
let (consume_work_senders, consume_work_receivers) =
(0..num_threads).map(|_| unbounded()).unzip();
let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded();
let scheduler =
PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver);
let scheduler = PrioGraphScheduler::new(
consume_work_senders,
finished_consume_work_receiver,
PrioGraphSchedulerConfig::default(),
);
(
scheduler,
consume_work_receivers,
@@ -821,7 +845,7 @@
fn test_schedule_priority_guard() {
let (mut scheduler, work_receivers, finished_work_sender) = create_test_frame(2);
// intentionally shorten the look-ahead window to cause unschedulable conflicts
scheduler.look_ahead_window_size = 2;
scheduler.config.look_ahead_window_size = 2;

let accounts = (0..8).map(|_| Keypair::new()).collect_vec();
let mut container = create_container([
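The `test_schedule_priority_guard` test above shortens the look-ahead window by assigning to `scheduler.config.look_ahead_window_size` after construction, which is possible because the test module sits inside the scheduler module. The same tuning can instead be supplied through the new constructor parameter; a hedged sketch reusing the channel names from `create_test_frame` (the tiny window value mirrors the test's intent of forcing unschedulable conflicts):

```rust
// Sketch only: pass the tuned config at construction instead of mutating it.
let config = PrioGraphSchedulerConfig {
    look_ahead_window_size: 2, // intentionally tiny so conflicting txs stay unscheduled
    ..PrioGraphSchedulerConfig::default()
};
let scheduler = PrioGraphScheduler::new(
    consume_work_senders,
    finished_consume_work_receiver,
    config,
);
```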
@@ -441,7 +441,10 @@ mod tests {
packet_deserializer::PacketDeserializer,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
transaction_scheduler::receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphSchedulerConfig,
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
},
},
banking_trace::BankingPacketBatch,
sigverify::SigverifyTracerPacketStats,
@@ -550,11 +553,16 @@
false,
);

let scheduler = PrioGraphScheduler::new(
consume_work_senders,
finished_consume_work_receiver,
PrioGraphSchedulerConfig::default(),
);
let scheduler_controller = SchedulerController::new(
decision_maker,
receive_and_buffer,
bank_forks,
PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver),
scheduler,
vec![], // no actual workers with metrics to report, this can be empty
None,
);