diff --git a/core/src/transaction_scheduler.rs b/core/src/transaction_scheduler.rs index 9d78fd6e107336..c99c4acec6ef53 100644 --- a/core/src/transaction_scheduler.rs +++ b/core/src/transaction_scheduler.rs @@ -3,6 +3,8 @@ //! - TPU vote transactions //! - Gossip vote transactions +use min_max_heap::MinMaxHeap; +use std::rc::Rc; use { crate::{ banking_stage::BatchedTransactionDetails, @@ -366,7 +368,7 @@ impl TransactionScheduler { /// Attempts to schedule a transaction to be executed. fn try_schedule( - deserialized_packet: &DeserializedPacket, + deserialized_packet: &Rc, bank: &Arc, highest_wl_blocked_account_fees: &mut HashMap, highest_rl_blocked_account_fees: &mut HashMap, @@ -375,14 +377,14 @@ impl TransactionScheduler { config: &TransactionSchedulerConfig, ) -> Result { let sanitized_tx = Self::transaction_from_deserialized_packet( - deserialized_packet.immutable_section(), + deserialized_packet, &bank.feature_set, bank.vote_only_bank(), bank.as_ref(), ) .ok_or_else(|| SchedulerError::InvalidSanitizedTransaction)?; - let priority = deserialized_packet.immutable_section().priority(); + let priority = deserialized_packet.priority(); let account_locks = sanitized_tx .get_account_locks(&bank.feature_set) @@ -665,60 +667,68 @@ impl TransactionScheduler { bank: &Arc, qos_service: &QosService, config: &TransactionSchedulerConfig, - ) -> (Vec, Vec) { + ) -> Vec { let mut sanitized_transactions = Vec::with_capacity(num_txs); - let mut rescheduled_packets = Vec::with_capacity(1_000); // hashmap representing the highest fee of currently write-locked and read-locked blocked accounts // almost a pseudo AccountLocks but fees instead of hashset/read lock count let mut highest_wl_blocked_account_fees = HashMap::with_capacity(10_000); let mut highest_rl_blocked_account_fees = HashMap::with_capacity(10_000); - let mut lookback_count = 0; + let mut retryable_packets = MinMaxHeap::with_capacity(unprocessed_packets.capacity()); + std::mem::swap( + &mut unprocessed_packets.packet_priority_queue, + &mut retryable_packets, + ); - while let Some(deserialized_packet) = unprocessed_packets.pop_max() { - match Self::try_schedule( - &deserialized_packet, - bank, - &mut highest_wl_blocked_account_fees, - &mut highest_rl_blocked_account_fees, - scheduled_accounts, - qos_service, - config, - ) { - Ok(sanitized_tx) => { - sanitized_transactions.push(sanitized_tx); - if sanitized_transactions.len() >= num_txs { - break; - } + let mut retryable_packets: MinMaxHeap<_> = retryable_packets + .drain_desc() + .into_iter() + .enumerate() + .filter_map(|(idx, deserialized_packet)| { + // drop the rest + if sanitized_transactions.len() >= num_txs { + return Some(deserialized_packet); } - Err(e) => { - trace!("e: {:?}", e); - match e { - SchedulerError::InvalidSanitizedTransaction - | SchedulerError::InvalidTransactionFormat(_) - | SchedulerError::TransactionCheckFailed(_) => { - // non-recoverable error, drop the packet - continue; - } - SchedulerError::AccountInUse | SchedulerError::AccountBlocked(_) => { - // need to reschedule - // error!("e: {}", e); - rescheduled_packets.push(deserialized_packet); - } - SchedulerError::QosExceeded => { - // TODO: probably want to move this packet to a separate queue + match Self::try_schedule( + &deserialized_packet, + bank, + &mut highest_wl_blocked_account_fees, + &mut highest_rl_blocked_account_fees, + scheduled_accounts, + qos_service, + config, + ) { + Ok(sanitized_tx) => { + sanitized_transactions.push(sanitized_tx); + return None; + } + Err(e) => { + trace!("e: {:?}", e); + match e { + SchedulerError::InvalidSanitizedTransaction + | SchedulerError::InvalidTransactionFormat(_) + | SchedulerError::TransactionCheckFailed(_) => { + // non-recoverable error, drop the packet + return None; + } + SchedulerError::AccountInUse + | SchedulerError::AccountBlocked(_) + | SchedulerError::QosExceeded => { + return Some(deserialized_packet); + } } } } - } - lookback_count += 1; - if lookback_count >= num_txs * 20 { - break; - } - } + }) + .collect(); + + std::mem::swap( + &mut retryable_packets, + &mut unprocessed_packets.packet_priority_queue, + ); - (sanitized_transactions, rescheduled_packets) + sanitized_transactions } /// Handles scheduler requests and sends back a response over the channel @@ -733,7 +743,7 @@ impl TransactionScheduler { match scheduler_request.msg { SchedulerMessage::RequestBatch { num_txs, bank } => { trace!("SchedulerMessage::RequestBatch num_txs: {}", num_txs); - let (sanitized_transactions, rescheduled_packets) = Self::get_scheduled_batch( + let sanitized_transactions = Self::get_scheduled_batch( unprocessed_packets, scheduled_accounts, num_txs, @@ -742,9 +752,8 @@ impl TransactionScheduler { config, ); trace!( - "sanitized_transactions num: {}, rescheduled_packets num: {}, unprocessed_packets num: {}", + "sanitized_transactions num: {}, unprocessed_packets num: {}", sanitized_transactions.len(), - rescheduled_packets.len(), unprocessed_packets.len() ); @@ -753,11 +762,6 @@ impl TransactionScheduler { sanitized_transactions, })) .unwrap(); - - // push rescheduled back on after sending so execution can start - for tx in rescheduled_packets { - unprocessed_packets.push(tx.clone()); - } } SchedulerMessage::Ping { id } => { let _ = response_sender diff --git a/transaction-scheduler-bench/src/main.rs b/transaction-scheduler-bench/src/main.rs index 11e2cab358860a..825d7ff0799eef 100644 --- a/transaction-scheduler-bench/src/main.rs +++ b/transaction-scheduler-bench/src/main.rs @@ -41,7 +41,7 @@ struct Args { packet_send_rate: usize, /// Number of packets per batch - #[clap(long, env, default_value_t = 64)] + #[clap(long, env, default_value_t = 128)] packets_per_batch: usize, /// Number of batches per message @@ -49,11 +49,11 @@ struct Args { batches_per_msg: usize, /// Number of consuming threads (number of threads requesting batches from scheduler) - #[clap(long, env, default_value_t = 20)] + #[clap(long, env, default_value_t = 4)] num_execution_threads: usize, /// How long each transaction takes to execution in microseconds - #[clap(long, env, default_value_t = 20)] + #[clap(long, env, default_value_t = 15)] execution_per_tx_us: u64, /// Duration of benchmark @@ -61,15 +61,15 @@ struct Args { duration: f32, /// Number of accounts to choose from when signing transactions - #[clap(long, env, default_value_t = 1000)] + #[clap(long, env, default_value_t = 100000)] num_accounts: usize, /// Number of read locks per tx - #[clap(long, env, default_value_t = 5)] + #[clap(long, env, default_value_t = 4)] num_read_locks_per_tx: usize, /// Number of write locks per tx - #[clap(long, env, default_value_t = 5)] + #[clap(long, env, default_value_t = 2)] num_read_write_locks_per_tx: usize, /// If true, enables state auction on accounts @@ -261,7 +261,7 @@ fn start_execution_threads( break; } - let batch = scheduler_handle.request_batch(64, &bank).unwrap(); + let batch = scheduler_handle.request_batch(128, &bank).unwrap(); let num_txs = batch.sanitized_transactions.len(); let sleep_duration = if num_txs > 0 { @@ -348,7 +348,7 @@ fn main() { let mut last_log_time = Instant::now(); for msg in stats_receiver { - // info!("msg: {}", msg); + info!("msg: {}", msg); count += msg; if last_log_time.elapsed() > Duration::from_secs(1) {