From fb465b378827fe1f04ce261863587413249a7d31 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 18 Apr 2024 13:15:25 +0900 Subject: [PATCH] Prioritize blocked task messaging over idle tasks (#627) * Prioritize blocked task messaging over idle tasks * Add test_scheduler_schedule_execution_blocked * Add commentary * Reword commentary a bit * Document not-chosen approach in detail * Use {crossbeam,chained}_channel::unbounded() * Add test for race condition around SchedulingContext * Fix race condition around SchedulingContext * Comment about finished channels * Clean up the scheduling context race test * Codify hidden assumption with extra 1 recv/session (EDIT there's actually no extra recv) --- unified-scheduler-pool/src/lib.rs | 349 +++++++++++++++++++++++++++--- 1 file changed, 320 insertions(+), 29 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index eed740025bcad4..36c1721836820e 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -15,7 +15,7 @@ use { assert_matches::assert_matches, - crossbeam_channel::{never, select, unbounded, Receiver, RecvError, SendError, Sender}, + crossbeam_channel::{self, never, select, Receiver, RecvError, SendError, Sender}, dashmap::DashMap, derivative::Derivative, log::*, @@ -294,30 +294,42 @@ type NewTaskPayload = SubchanneledPayload; // Overall, this greatly simplifies the code, reduces CAS/syscall overhead per messaging to the // minimum at the cost of a single channel recreation per switching. Needless to say, such an // allocation can be amortized to be negligible. +// +// Lastly, there's an auxiliary channel to realize a 2-level priority queue. See comment before +// runnable_task_sender. mod chained_channel { use super::*; // hide variants by putting this inside newtype enum ChainedChannelPrivate { Payload(P), - ContextAndChannel(C, Receiver>), + ContextAndChannels(C, Receiver>, Receiver

), } pub(super) struct ChainedChannel(ChainedChannelPrivate); impl ChainedChannel { - fn chain_to_new_channel(context: C, receiver: Receiver) -> Self { - Self(ChainedChannelPrivate::ContextAndChannel(context, receiver)) + fn chain_to_new_channel( + context: C, + receiver: Receiver, + aux_receiver: Receiver

, + ) -> Self { + Self(ChainedChannelPrivate::ContextAndChannels( + context, + receiver, + aux_receiver, + )) } } pub(super) struct ChainedChannelSender { sender: Sender>, + aux_sender: Sender

, } impl ChainedChannelSender { - fn new(sender: Sender>) -> Self { - Self { sender } + fn new(sender: Sender>, aux_sender: Sender

) -> Self { + Self { sender, aux_sender } } pub(super) fn send_payload( @@ -328,19 +340,26 @@ mod chained_channel { .send(ChainedChannel(ChainedChannelPrivate::Payload(payload))) } + pub(super) fn send_aux_payload(&self, payload: P) -> std::result::Result<(), SendError

> { + self.aux_sender.send(payload) + } + pub(super) fn send_chained_channel( &mut self, context: C, count: usize, ) -> std::result::Result<(), SendError>> { let (chained_sender, chained_receiver) = crossbeam_channel::unbounded(); + let (chained_aux_sender, chained_aux_receiver) = crossbeam_channel::unbounded(); for _ in 0..count { self.sender.send(ChainedChannel::chain_to_new_channel( context.clone(), chained_receiver.clone(), + chained_aux_receiver.clone(), ))? } self.sender = chained_sender; + self.aux_sender = chained_aux_sender; Ok(()) } } @@ -351,13 +370,19 @@ mod chained_channel { #[derivative(Clone(bound = "C: Clone"))] pub(super) struct ChainedChannelReceiver { receiver: Receiver>, + aux_receiver: Receiver

, context: C, } impl ChainedChannelReceiver { - fn new(receiver: Receiver>, initial_context: C) -> Self { + fn new( + receiver: Receiver>, + aux_receiver: Receiver

, + initial_context: C, + ) -> Self { Self { receiver, + aux_receiver, context: initial_context, } } @@ -370,12 +395,17 @@ mod chained_channel { &self.receiver } + pub(super) fn aux_for_select(&self) -> &Receiver

{ + &self.aux_receiver + } + pub(super) fn after_select(&mut self, message: ChainedChannel) -> Option

{ match message.0 { ChainedChannelPrivate::Payload(payload) => Some(payload), - ChainedChannelPrivate::ContextAndChannel(context, channel) => { + ChainedChannelPrivate::ContextAndChannels(context, channel, idle_channel) => { self.context = context; self.receiver = channel; + self.aux_receiver = idle_channel; None } } @@ -386,9 +416,10 @@ mod chained_channel { initial_context: C, ) -> (ChainedChannelSender, ChainedChannelReceiver) { let (sender, receiver) = crossbeam_channel::unbounded(); + let (aux_sender, aux_receiver) = crossbeam_channel::unbounded(); ( - ChainedChannelSender::new(sender), - ChainedChannelReceiver::new(receiver, initial_context), + ChainedChannelSender::new(sender, aux_sender), + ChainedChannelReceiver::new(receiver, aux_receiver, initial_context), ) } } @@ -415,7 +446,7 @@ impl UsageQueueLoader { // https://github.com/crossbeam-rs/crossbeam/pull/1047) fn disconnected() -> Receiver { // drop the sender residing at .0, returning an always-disconnected receiver. - unbounded().1 + crossbeam_channel::unbounded().1 } fn initialized_result_with_timings() -> ResultWithTimings { @@ -466,8 +497,8 @@ impl PooledScheduler { impl, TH: TaskHandler> ThreadManager { fn new(pool: Arc>) -> Self { - let (new_task_sender, new_task_receiver) = unbounded(); - let (session_result_sender, session_result_receiver) = unbounded(); + let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded(); + let (session_result_sender, session_result_receiver) = crossbeam_channel::unbounded(); let handler_count = pool.handler_count; Self { @@ -530,9 +561,96 @@ impl, TH: TaskHandler> ThreadManager { } fn start_threads(&mut self, context: &SchedulingContext) { + // Firstly, setup bi-directional messaging between the scheduler and handlers to pass + // around tasks, by creating 2 channels (one for to-be-handled tasks from the scheduler to + // the handlers and the other for finished tasks from the handlers to the scheduler). + // Furthermore, this pair of channels is duplicated to work as a primitive 2-level priority + // queue, totalling 4 channels. Note that the two scheduler-to-handler channels are managed + // behind chained_channel to avoid race conditions relating to contexts. + // + // This quasi-priority-queue arrangement is desired as an optimization to prioritize + // blocked tasks. + // + // As a quick background, SchedulingStateMachine doesn't throttle runnable tasks at all. + // Thus, it's likely for to-be-handled tasks to be stalled for extended duration due to + // excessive buffering (commonly known as buffer bloat). Normally, this buffering isn't + // problematic and actually intentional to fully saturate all the handler threads. + // + // However, there's one caveat: task dependencies. It can be hinted with tasks being + // blocked, that there could be more similarly-blocked tasks in the future. Empirically, + // clearing these linearized long runs of blocking tasks out of the buffer is delaying bank + // freezing while only using 1 handler thread or two near the end of slot, deteriorating + // the overall concurrency. + // + // To alleviate the situation, blocked tasks are exchanged via independent communication + // pathway as a heuristic for expedite processing. Without prioritization of these tasks, + // progression of clearing these runs would be severely hampered due to interleaved + // not-blocked tasks (called _idle_ here; typically, voting transactions) in the single + // buffer. + // + // Concurrent priority queue isn't used to avoid penalized throughput due to higher + // overhead than crossbeam channel, even considering the doubled processing of the + // crossbeam channel. Fortunately, just 2-level prioritization is enough. Also, sticking to + // crossbeam was convenient and there was no popular and promising crate for concurrent + // priority queue as of writing. + // + // It's generally harmless for the blocked task buffer to be flooded, stalling the idle + // tasks completely. Firstly, it's unlikely without malice, considering all blocked tasks + // must have independently been blocked for each isolated linearized runs. That's because + // all to-be-handled tasks of the blocked and idle buffers must not be conflicting with + // each other by definition. Furthermore, handler threads would still be saturated to + // maximum even under such a block-verification situation, meaning no remotely-controlled + // performance degradation. + // + // Overall, while this is merely a heuristic, it's effective and adaptive while not + // vulnerable, merely reusing existing information without any additional runtime cost. + // + // One known caveat, though, is that this heuristic is employed under a sub-optimal + // setting, considering scheduling is done in real-time. Namely, prioritization enforcement + // isn't immediate, in a sense that the first task of a long run is buried in the middle of + // a large idle task buffer. Prioritization of such a run will be realized only after the + // first task is handled with the priority of an idle task. To overcome this, some kind of + // re-prioritization or look-ahead scheduling mechanism would be needed. However, both + // isn't implemented. The former is due to complex implementation and the later is due to + // delayed (NOT real-time) processing, which is against the unified scheduler design goal. + // + // Finally, note that this optimization should be combined with biased select (i.e. + // `select_biased!`), which isn't for now... However, consistent performance improvement is + // observed just with this priority queuing alone. + // + // Alternatively, more faithful prioritization can be realized by checking blocking + // statuses of all addresses immediately before sending to the handlers. This would prevent + // false negatives of the heuristics approach (i.e. the last task of a run doesn't need to + // be handled with the higher priority). Note that this is the only improvement, compared + // to the heuristics. That's because this underlying information asymmetry between the 2 + // approaches doesn't exist for all other cases, assuming no look-ahead: idle tasks are + // always unblocked by definition, and other blocked tasks should always be calculated as + // blocked by the very existence of the last blocked task. + // + // The faithful approach incurs a considerable overhead: O(N), where N is the number of + // locked addresses in a task, adding to the current bare-minimum complexity of O(2*N) for + // both scheduling and descheduling. This means 1.5x increase. Furthermore, this doesn't + // nicely work in practice with a real-time streamed scheduler. That's because these + // linearized runs could be intermittent in the view with little or no look-back, albeit + // actually forming a far more longer runs in longer time span. These access patterns are + // very common, considering existence of well-known hot accounts. + // + // Thus, intentionally allowing these false-positives by the heuristic approach is actually + // helping to extend the logical prioritization session for the invisible longer runs, as + // long as the last task of the current run is being handled by the handlers, hoping yet + // another blocking new task is arriving to finalize the tentatively extended + // prioritization further. Consequently, this also contributes to alleviate the known + // heuristic's caveat for the first task of linearized runs, which is described above. let (mut runnable_task_sender, runnable_task_receiver) = chained_channel::unbounded::(context.clone()); - let (finished_task_sender, finished_task_receiver) = unbounded::>(); + // Create two handler-to-scheduler channels to prioritize the finishing of blocked tasks, + // because it is more likely that a blocked task will have more blocked tasks behind it, + // which should be scheduled while minimizing the delay to clear buffered linearized runs + // as fast as possible. + let (finished_blocked_task_sender, finished_blocked_task_receiver) = + crossbeam_channel::unbounded::>(); + let (finished_idle_task_sender, finished_idle_task_receiver) = + crossbeam_channel::unbounded::>(); let mut result_with_timings = self.session_result_with_timings.take(); @@ -603,6 +721,19 @@ impl, TH: TaskHandler> ThreadManager { }; loop { + if let Ok(NewTaskPayload::OpenSubchannel(context)) = new_task_receiver.recv() { + // signal about new SchedulingContext to handler threads + runnable_task_sender + .send_chained_channel(context, handler_count) + .unwrap(); + assert_matches!( + result_with_timings.replace(initialized_result_with_timings()), + None + ); + } else { + unreachable!(); + } + let mut is_finished = false; while !is_finished { // ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl, @@ -626,7 +757,7 @@ impl, TH: TaskHandler> ThreadManager { // into busy looping to seek lowest latency eventually. However, not now, // to measure _actual_ cpu usage easily with the select approach. select! { - recv(finished_task_receiver) -> executed_task => { + recv(finished_blocked_task_receiver) -> executed_task => { let executed_task = executed_task.unwrap(); state_machine.deschedule_task(&executed_task.task); @@ -647,24 +778,24 @@ impl, TH: TaskHandler> ThreadManager { match message.unwrap() { NewTaskPayload::Payload(task) => { if let Some(task) = state_machine.schedule_task(task) { - runnable_task_sender.send_payload(task).unwrap(); + runnable_task_sender.send_aux_payload(task).unwrap(); } } - NewTaskPayload::OpenSubchannel(context) => { - // signal about new SchedulingContext to handler threads - runnable_task_sender - .send_chained_channel(context, handler_count) - .unwrap(); - assert_matches!( - result_with_timings.replace(initialized_result_with_timings()), - None - ); - } NewTaskPayload::CloseSubchannel => { session_ending = true; } + NewTaskPayload::OpenSubchannel(_context) => { + unreachable!(); + } } }, + recv(finished_idle_task_receiver) -> executed_task => { + let executed_task = executed_task.unwrap(); + + state_machine.deschedule_task(&executed_task.task); + let result_with_timings = result_with_timings.as_mut().unwrap(); + Self::accumulate_result_with_timings(result_with_timings, executed_task); + }, }; is_finished = session_ending && state_machine.has_no_active_task(); @@ -688,13 +819,21 @@ impl, TH: TaskHandler> ThreadManager { let handler_main_loop = || { let pool = self.pool.clone(); let mut runnable_task_receiver = runnable_task_receiver.clone(); - let finished_task_sender = finished_task_sender.clone(); + let finished_blocked_task_sender = finished_blocked_task_sender.clone(); + let finished_idle_task_sender = finished_idle_task_sender.clone(); move || loop { let (task, sender) = select! { recv(runnable_task_receiver.for_select()) -> message => { if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) { - (task, &finished_task_sender) + (task, &finished_blocked_task_sender) + } else { + continue; + } + }, + recv(runnable_task_receiver.aux_for_select()) -> task => { + if let Ok(task) = task { + (task, &finished_idle_task_sender) } else { continue; } @@ -851,7 +990,7 @@ mod tests { prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::{ - clock::MAX_PROCESSING_AGE, + clock::{Slot, MAX_PROCESSING_AGE}, pubkey::Pubkey, signer::keypair::Keypair, system_transaction, @@ -1111,6 +1250,158 @@ mod tests { ); } + #[test] + fn test_scheduler_schedule_execution_blocked() { + solana_logger::setup(); + + const STALLED_TRANSACTION_INDEX: usize = 0; + const BLOCKED_TRANSACTION_INDEX: usize = 1; + static LOCK_TO_STALL: Mutex<()> = Mutex::new(()); + + #[derive(Debug)] + struct StallingHandler; + impl TaskHandler for StallingHandler { + fn handle( + result: &mut Result<()>, + timings: &mut ExecuteTimings, + bank: &Arc, + transaction: &SanitizedTransaction, + index: usize, + handler_context: &HandlerContext, + ) { + match index { + STALLED_TRANSACTION_INDEX => *LOCK_TO_STALL.lock().unwrap(), + BLOCKED_TRANSACTION_INDEX => {} + _ => unreachable!(), + }; + DefaultTaskHandler::handle( + result, + timings, + bank, + transaction, + index, + handler_context, + ); + } + } + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10_000); + + // tx0 and tx1 is definitely conflicting to write-lock the mint address + let tx0 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + let tx1 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + + let bank = Bank::new_for_tests(&genesis_config); + let bank = setup_dummy_fork_graph(bank); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = SchedulerPool::, _>::new_dyn( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); + let context = SchedulingContext::new(bank.clone()); + + assert_eq!(bank.transaction_count(), 0); + let scheduler = pool.take_scheduler(context); + + // Stall handling tx0 and tx1 + let lock_to_stall = LOCK_TO_STALL.lock().unwrap(); + scheduler.schedule_execution(&(tx0, STALLED_TRANSACTION_INDEX)); + scheduler.schedule_execution(&(tx1, BLOCKED_TRANSACTION_INDEX)); + + // Wait a bit for the scheduler thread to decide to block tx1 + std::thread::sleep(std::time::Duration::from_secs(1)); + + // Resume handling by unlocking LOCK_TO_STALL + drop(lock_to_stall); + let bank = BankWithScheduler::new(bank, Some(scheduler)); + assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + assert_eq!(bank.transaction_count(), 2); + } + + #[test] + fn test_scheduler_mismatched_scheduling_context_race() { + solana_logger::setup(); + + #[derive(Debug)] + struct TaskAndContextChecker; + impl TaskHandler for TaskAndContextChecker { + fn handle( + _result: &mut Result<()>, + _timings: &mut ExecuteTimings, + bank: &Arc, + _transaction: &SanitizedTransaction, + index: usize, + _handler_context: &HandlerContext, + ) { + // The task index must always be matched to the slot. + assert_eq!(index as Slot, bank.slot()); + } + } + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10_000); + + // Create two banks for two contexts + let bank0 = Bank::new_for_tests(&genesis_config); + let bank0 = setup_dummy_fork_graph(bank0); + let bank1 = Arc::new(Bank::new_from_parent( + bank0.clone(), + &Pubkey::default(), + bank0.slot().checked_add(1).unwrap(), + )); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = SchedulerPool::, _>::new( + Some(4), // spawn 4 threads + None, + None, + None, + ignored_prioritization_fee_cache, + ); + + // Create a dummy tx and two contexts + let dummy_tx = + &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + let context0 = &SchedulingContext::new(bank0.clone()); + let context1 = &SchedulingContext::new(bank1.clone()); + + // Exercise the scheduler by busy-looping to expose the race condition + for (context, index) in [(context0, 0), (context1, 1)] + .into_iter() + .cycle() + .take(10000) + { + let scheduler = pool.take_scheduler(context.clone()); + scheduler.schedule_execution(&(dummy_tx, index)); + scheduler.wait_for_termination(false).1.return_to_pool(); + } + } + #[derive(Debug)] struct AsyncScheduler( Mutex,