Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Scheduler: throttle queued CUs #34890

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,25 @@ impl PrioGraphScheduler {
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let mut batches = Batches::new(num_threads);

const MAX_OUTSTANDING_CUS_PER_THREAD: u64 = 12_000_000;
let mut schedulable_threads = ThreadSet::any(num_threads);
for thread_id in 0..num_threads {
if self.in_flight_tracker.cus_in_flight_per_thread()[thread_id]
>= MAX_OUTSTANDING_CUS_PER_THREAD
{
schedulable_threads.remove(thread_id);
}
}
if schedulable_threads.is_empty() {
return Ok(SchedulingSummary {
num_scheduled: 0,
num_unschedulable: 0,
num_filtered_out: 0,
filter_time_us: 0,
});
}

// Some transactions may be unschedulable due to multi-thread conflicts.
// These transactions cannot be scheduled until some conflicting work is completed.
// However, the scheduler should not allow other transactions that conflict with
Expand Down Expand Up @@ -140,8 +159,11 @@ impl PrioGraphScheduler {
let mut num_sent: usize = 0;
let mut num_unschedulable: usize = 0;
while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
// If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule.
if prio_graph.is_empty() {
// If nothing is in the main-queue of the `PrioGraph` then there's
// nothing left to schedule.
// Or if there are no threads that can be scheduled to, due to
// queueing limits, then return early
if prio_graph.is_empty() || schedulable_threads.is_empty() {
break;
}

Expand Down Expand Up @@ -177,8 +199,9 @@ impl PrioGraphScheduler {
|thread_set| {
Self::select_thread(
thread_set,
&batches.transactions,
&batches,
self.in_flight_tracker.num_in_flight_per_thread(),
self.in_flight_tracker.cus_in_flight_per_thread(),
)
},
) else {
Expand All @@ -191,9 +214,7 @@ impl PrioGraphScheduler {
saturating_add_assign!(num_scheduled, 1);

let sanitized_transaction_ttl = transaction_state.transition_to_pending();
let cu_limit = transaction_state
.transaction_priority_details()
.compute_unit_limit;
let cost = transaction_state.transaction_cost().sum();

let SanitizedTransactionTTL {
transaction,
Expand All @@ -203,7 +224,18 @@ impl PrioGraphScheduler {
batches.transactions[thread_id].push(transaction);
batches.ids[thread_id].push(id.id);
batches.max_age_slots[thread_id].push(max_age_slot);
saturating_add_assign!(batches.total_cus[thread_id], cu_limit);
saturating_add_assign!(batches.total_cus[thread_id], cost);

// If thread has too many cus in flight, remove from schedulable threads, and
// send the batch immediately.
if batches.total_cus[thread_id] >= MAX_OUTSTANDING_CUS_PER_THREAD {
schedulable_threads.remove(thread_id);
saturating_add_assign!(num_sent, self.send_batch(&mut batches, thread_id)?);

if schedulable_threads.is_empty() {
break;
}
}

// If target batch size is reached, send only this batch.
if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH {
Expand Down Expand Up @@ -385,27 +417,27 @@ impl PrioGraphScheduler {

/// Given the schedulable `thread_set`, select the thread with the least amount
/// of work queued up.
/// Currently, "work" is just defined as the number of transactions.
///
/// If the `chain_thread` is available, this thread will be selected, regardless of
/// load-balancing.
/// Amount of work is determined by the TransactionCost in CUs, ties are broken by
/// the number of transactions.
///
/// Panics if the `thread_set` is empty.
fn select_thread(
thread_set: ThreadSet,
batches_per_thread: &[Vec<SanitizedTransaction>],
in_flight_per_thread: &[usize],
batches: &Batches,
count_in_flight_per_thread: &[usize],
cus_in_flight_per_thread: &[u64],
) -> ThreadId {
thread_set
.contained_threads_iter()
.map(|thread_id| {
(
thread_id,
batches_per_thread[thread_id].len() + in_flight_per_thread[thread_id],
batches.transactions[thread_id].len() + count_in_flight_per_thread[thread_id],
batches.total_cus[thread_id] + cus_in_flight_per_thread[thread_id],
)
})
.min_by(|a, b| a.1.cmp(&b.1))
.map(|(thread_id, _)| thread_id)
.min_by(|a, b| a.2.cmp(&b.2).then_with(|| a.1.cmp(&b.1)))
.map(|(thread_id, _, _)| thread_id)
.unwrap()
}

Expand Down Expand Up @@ -492,10 +524,12 @@ mod tests {
crate::banking_stage::consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
crossbeam_channel::{unbounded, Receiver},
itertools::Itertools,
solana_cost_model::cost_model::CostModel,
solana_runtime::transaction_priority_details::TransactionPriorityDetails,
solana_sdk::{
compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, pubkey::Pubkey,
signature::Keypair, signer::Signer, system_instruction, transaction::Transaction,
compute_budget::ComputeBudgetInstruction, feature_set::FeatureSet, hash::Hash,
message::Message, pubkey::Pubkey, signature::Keypair, signer::Signer,
system_instruction, transaction::Transaction,
},
std::borrow::Borrow,
};
Expand Down Expand Up @@ -568,6 +602,7 @@ mod tests {
let id = TransactionId::new(index as u64);
let transaction =
prioritized_tranfers(from_keypair.borrow(), to_pubkeys, lamports, priority);
let transaction_cost = CostModel::calculate_cost(&transaction, &FeatureSet::default());
let transaction_ttl = SanitizedTransactionTTL {
transaction,
max_age_slot: Slot::MAX,
Expand All @@ -579,6 +614,7 @@ mod tests {
priority,
compute_unit_limit: 1,
},
transaction_cost,
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
},
crossbeam_channel::RecvTimeoutError,
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
solana_cost_model::cost_model::CostModel,
solana_measure::measure_us,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
Expand Down Expand Up @@ -342,6 +343,8 @@ impl SchedulerController {
{
saturating_add_assign!(post_transaction_check_count, 1);
let transaction_id = self.transaction_id_generator.next();

let transaction_cost = CostModel::calculate_cost(&transaction, &bank.feature_set);
let transaction_ttl = SanitizedTransactionTTL {
transaction,
max_age_slot: last_slot_in_epoch,
Expand All @@ -351,6 +354,7 @@ impl SchedulerController {
transaction_id,
transaction_ttl,
priority_details,
transaction_cost,
) {
saturating_add_assign!(self.count_metrics.num_dropped_on_capacity, 1);
}
Expand Down
30 changes: 30 additions & 0 deletions core/src/banking_stage/transaction_scheduler/transaction_state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
solana_cost_model::transaction_cost::TransactionCost,
solana_runtime::transaction_priority_details::TransactionPriorityDetails,
solana_sdk::{slot_history::Slot, transaction::SanitizedTransaction},
};
Expand Down Expand Up @@ -34,11 +35,13 @@ pub(crate) enum TransactionState {
Unprocessed {
transaction_ttl: SanitizedTransactionTTL,
transaction_priority_details: TransactionPriorityDetails,
transaction_cost: TransactionCost,
forwarded: bool,
},
/// The transaction is currently scheduled or being processed.
Pending {
transaction_priority_details: TransactionPriorityDetails,
transaction_cost: TransactionCost,
forwarded: bool,
},
}
Expand All @@ -48,10 +51,12 @@ impl TransactionState {
pub(crate) fn new(
transaction_ttl: SanitizedTransactionTTL,
transaction_priority_details: TransactionPriorityDetails,
transaction_cost: TransactionCost,
) -> Self {
Self::Unprocessed {
transaction_ttl,
transaction_priority_details,
transaction_cost,
forwarded: false,
}
}
Expand All @@ -70,6 +75,18 @@ impl TransactionState {
}
}

/// Returns a reference to the transaction cost of the transaction.
pub(crate) fn transaction_cost(&self) -> &TransactionCost {
match self {
Self::Unprocessed {
transaction_cost, ..
} => transaction_cost,
Self::Pending {
transaction_cost, ..
} => transaction_cost,
}
}

/// Returns the priority of the transaction.
pub(crate) fn priority(&self) -> u64 {
self.transaction_priority_details().priority
Expand Down Expand Up @@ -103,10 +120,12 @@ impl TransactionState {
TransactionState::Unprocessed {
transaction_ttl,
transaction_priority_details,
transaction_cost,
forwarded,
} => {
*self = TransactionState::Pending {
transaction_priority_details,
transaction_cost,
forwarded,
};
transaction_ttl
Expand All @@ -128,11 +147,13 @@ impl TransactionState {
TransactionState::Unprocessed { .. } => panic!("already unprocessed"),
TransactionState::Pending {
transaction_priority_details,
transaction_cost,
forwarded,
} => {
*self = Self::Unprocessed {
transaction_ttl,
transaction_priority_details,
transaction_cost,
forwarded,
}
}
Expand Down Expand Up @@ -162,6 +183,9 @@ impl TransactionState {
priority: 0,
compute_unit_limit: 0,
},
transaction_cost: TransactionCost::SimpleVote {
writable_accounts: vec![],
},
forwarded: false,
},
)
Expand All @@ -172,6 +196,7 @@ impl TransactionState {
mod tests {
use {
super::*,
solana_cost_model::transaction_cost::UsageCostDetails,
solana_sdk::{
compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message,
signature::Keypair, signer::Signer, system_instruction, transaction::Transaction,
Expand All @@ -190,6 +215,10 @@ mod tests {
];
let message = Message::new(&ixs, Some(&from_keypair.pubkey()));
let tx = Transaction::new(&[&from_keypair], message, Hash::default());
let transaction_cost = TransactionCost::Transaction(UsageCostDetails {
signature_cost: 5000,
..UsageCostDetails::default()
});

let transaction_ttl = SanitizedTransactionTTL {
transaction: SanitizedTransaction::from_transaction_for_tests(tx),
Expand All @@ -202,6 +231,7 @@ mod tests {
priority,
compute_unit_limit: 0,
},
transaction_cost,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
},
crate::banking_stage::scheduler_messages::TransactionId,
min_max_heap::MinMaxHeap,
solana_cost_model::transaction_cost::TransactionCost,
solana_runtime::transaction_priority_details::TransactionPriorityDetails,
std::collections::HashMap,
};
Expand Down Expand Up @@ -125,12 +126,17 @@ impl TransactionStateContainer {
transaction_id: TransactionId,
transaction_ttl: SanitizedTransactionTTL,
transaction_priority_details: TransactionPriorityDetails,
transaction_cost: TransactionCost,
) -> bool {
let priority_id =
TransactionPriorityId::new(transaction_priority_details.priority, transaction_id);
self.id_to_transaction_state.insert(
transaction_id,
TransactionState::new(transaction_ttl, transaction_priority_details),
TransactionState::new(
transaction_ttl,
transaction_priority_details,
transaction_cost,
),
);
self.push_id_into_queue(priority_id)
}
Expand Down Expand Up @@ -176,8 +182,10 @@ impl TransactionStateContainer {
mod tests {
use {
super::*,
solana_cost_model::cost_model::CostModel,
solana_sdk::{
compute_budget::ComputeBudgetInstruction,
feature_set::FeatureSet,
hash::Hash,
message::Message,
signature::Keypair,
Expand All @@ -188,7 +196,13 @@ mod tests {
},
};

fn test_transaction(priority: u64) -> (SanitizedTransactionTTL, TransactionPriorityDetails) {
fn test_transaction(
priority: u64,
) -> (
SanitizedTransactionTTL,
TransactionPriorityDetails,
TransactionCost,
) {
let from_keypair = Keypair::new();
let ixs = vec![
system_instruction::transfer(
Expand All @@ -199,10 +213,14 @@ mod tests {
ComputeBudgetInstruction::set_compute_unit_price(priority),
];
let message = Message::new(&ixs, Some(&from_keypair.pubkey()));
let tx = Transaction::new(&[&from_keypair], message, Hash::default());

let tx = SanitizedTransaction::from_transaction_for_tests(Transaction::new(
&[&from_keypair],
message,
Hash::default(),
));
let transaction_cost = CostModel::calculate_cost(&tx, &FeatureSet::default());
let transaction_ttl = SanitizedTransactionTTL {
transaction: SanitizedTransaction::from_transaction_for_tests(tx),
transaction: tx,
max_age_slot: Slot::MAX,
};
(
Expand All @@ -211,17 +229,20 @@ mod tests {
priority,
compute_unit_limit: 0,
},
transaction_cost,
)
}

fn push_to_container(container: &mut TransactionStateContainer, num: usize) {
for id in 0..num as u64 {
let priority = id;
let (transaction_ttl, transaction_priority_details) = test_transaction(priority);
let (transaction_ttl, transaction_priority_details, transaction_cost) =
test_transaction(priority);
container.insert_new_transaction(
TransactionId::new(id),
transaction_ttl,
transaction_priority_details,
transaction_cost,
);
}
}
Expand Down