From 03a0bd1a1a5d3007bd29f701fe43f37cf125653b Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Thu, 29 Feb 2024 16:10:52 +0900
Subject: [PATCH] Big rename: Page => UsageQueue

---
 unified-scheduler-logic/src/lib.rs | 349 ++++++++++++++++-------------
 unified-scheduler-pool/src/lib.rs  |  18 +-
 2 files changed, 201 insertions(+), 166 deletions(-)

diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs
index 0e8f63c599b924..a7a0b872d94e41 100644
--- a/unified-scheduler-logic/src/lib.rs
+++ b/unified-scheduler-logic/src/lib.rs
@@ -63,28 +63,28 @@
 //! Its algorithm is very fast for high throughput, real-time for low latency. The whole
 //! unified-scheduler architecture is designed from the ground up to support the fastest execution of
 //! this scheduling code. To that end, unified scheduler pre-loads address-specific locking state
-//! data structures (called [`Page`]) for all of transaction's accounts, in order to offload the
-//! job to other threads from the scheduler thread. This preloading is done inside
+//! data structures (called [`UsageQueue`]) for all of transaction's accounts, in order to offload
+//! the job to other threads from the scheduler thread. This preloading is done inside
 //! [`create_task()`](SchedulingStateMachine::create_task). In this way, task scheduling
 //! computational complexity is basically reduced to several word-sized loads and stores in the
 //! scheduler thread (i.e. constant; no allocations nor syscalls), while being proportional to the
 //! number of addresses in a given transaction. Note that this statement is held true, regardless
 //! of conflicts. This is because the preloading also pre-allocates some scratch-pad area
-//! ([`blocked_tasks`](PageInner::blocked_tasks)) to stash blocked ones. So, a conflict only incurs
-//! some additional fixed number of mem stores, within error margin of the constant complexity. And
-//! additional memory allocation for the scratchpad could said to be amortized, if such an unusual
-//! event should occur.
+//! ([`blocked_usages_from_tasks`](UsageQueueInner::blocked_usages_from_tasks)) to stash blocked
+//! ones. So, a conflict only incurs some additional fixed number of mem stores, within error
+//! margin of the constant complexity. And additional memory allocation for the scratchpad could
+//! be said to be amortized, if such an unusual event should occur.
 //!
-//! [`Arc`] is used to implement this preloading mechanism, because `Page`s are shared across tasks
-//! accessing the same account, and among threads due to the preloading. Also, interior mutability
-//! is needed. However, `SchedulingStateMachine` doesn't use conventional locks like RwLock.
-//! Leveraging the fact it's the only state-mutating exclusive thread, it instead uses
+//! [`Arc`] is used to implement this preloading mechanism, because `UsageQueue`s are shared across
+//! tasks accessing the same account, and among threads due to the preloading. Also, interior
+//! mutability is needed. However, `SchedulingStateMachine` doesn't use conventional locks like
+//! RwLock. Leveraging the fact it's the only state-mutating exclusive thread, it instead uses
 //! `UnsafeCell`, which is sugar-coated by a tailored wrapper called [`TokenCell`]. `TokenCell`
 //! imposes an overly restrictive aliasing rule via the Rust type system to maintain memory safety.
 //! By localizing any synchronization to the message passing, the scheduling code itself attains
 //! maximally possible single-threaded execution without stalling cpu pipelines at all, only
 //! constrained to mem access latency, while efficiently utilizing L1-L3 cpu cache full of
-//! `Page`s.
+//! `UsageQueue`s.
 //!
 //! ### Buffer bloat insignificance
 //!
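The `TokenCell`/`Token` pair described in the module docs above is the heart of the lock-free design. The following is a minimal sketch of that token-gating idea — simplified, assumed names, not the crate's actual implementation (the real `TokenCell` also deals with `Sync` bounds and stricter encapsulation):

```rust
use std::{cell::UnsafeCell, marker::PhantomData};

// Zero-sized token; at most one per `V` per process is the intended contract.
struct Token<V>(PhantomData<*mut V>);

impl<V> Token<V> {
    // SAFETY: caller promises this thread is the only state-mutating thread
    // and that no second token for `V` is ever created.
    unsafe fn assume_exclusive_mutating_thread() -> Self {
        Self(PhantomData)
    }
}

struct TokenCell<V>(UnsafeCell<V>);

impl<V> TokenCell<V> {
    fn new(value: V) -> Self {
        Self(UnsafeCell::new(value))
    }

    // Tying the returned `&mut V` to a `&mut Token<V>` makes the borrow
    // checker forbid two simultaneous mutable borrows through one token,
    // so no runtime lock is needed.
    fn borrow_mut<'t>(&self, _token: &'t mut Token<V>) -> &'t mut V {
        unsafe { &mut *self.0.get() }
    }
}

fn main() {
    let cell = TokenCell::new(0u32);
    let mut token = unsafe { Token::<u32>::assume_exclusive_mutating_thread() };
    *cell.borrow_mut(&mut token) += 1;
    assert_eq!(*cell.borrow_mut(&mut token), 1);
}
```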
@@ -293,21 +293,22 @@ mod utils {
     }
 }
 
-/// [`Result`] for locking a [page](Page) with particular [usage](RequestedUsage).
-type LockResult = Result<PageUsage, ()>;
+/// [`Result`] for locking a [usage_queue](UsageQueue) with particular
+/// [current_usage](RequestedUsage).
+type LockResult = Result<Usage, ()>;
 const_assert_eq!(mem::size_of::<LockResult>(), 8);
 
 /// Something to be scheduled; usually a wrapper of [`SanitizedTransaction`].
 pub type Task = Arc<TaskInner>;
 const_assert_eq!(mem::size_of::<Task>(), 8);
 
-/// [`Token`] for [`Page`].
-type PageToken = Token<PageInner>;
-const_assert_eq!(mem::size_of::<PageToken>(), 0);
+/// [`Token`] for [`UsageQueue`].
+type UsageQueueToken = Token<UsageQueueInner>;
+const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);
 
-/// [`Token`] for [task](Task)'s [internal mutable data](`TaskInner::blocked_page_count`).
-type BlockedPageCountToken = Token<ShortCounter>;
-const_assert_eq!(mem::size_of::<BlockedPageCountToken>(), 0);
+/// [`Token`] for [task](Task)'s [internal mutable data](`TaskInner::blocked_usage_count`).
+type BlockedUsageCountToken = Token<ShortCounter>;
+const_assert_eq!(mem::size_of::<BlockedUsageCountToken>(), 0);
 
 /// Internal scheduling data about a particular task.
 #[derive(Debug)]
@@ -315,7 +316,7 @@ pub struct TaskInner {
     transaction: SanitizedTransaction,
     index: usize,
     lock_attempts: Vec<LockAttempt>,
-    blocked_page_count: TokenCell<ShortCounter>,
+    blocked_usage_count: TokenCell<ShortCounter>,
 }
 
 impl TaskInner {
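`ShortCounter` is defined in the `mod utils` that appears above only as hunk context. For reading the rest of this diff, the following hypothetical stand-in captures the interface the scheduling code relies on — a checked-arithmetic `u32` newtype; the crate's actual definition may differ in detail:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ShortCounter(u32);

impl ShortCounter {
    fn zero() -> Self {
        Self(0)
    }

    fn one() -> Self {
        Self(1)
    }

    fn is_zero(&self) -> bool {
        self.0 == 0
    }

    fn is_one(&self) -> bool {
        self.0 == 1
    }

    // Checked arithmetic: overflow/underflow is a logic bug, so panic loudly.
    fn increment(self) -> Self {
        Self(self.0.checked_add(1).unwrap())
    }

    fn increment_self(&mut self) -> &mut Self {
        *self = self.increment();
        self
    }

    fn decrement_self(&mut self) -> &mut Self {
        *self = Self(self.0.checked_sub(1).unwrap());
        self
    }
}

fn main() {
    let mut counter = ShortCounter::zero();
    counter.increment_self();
    assert!(counter.is_one());
    assert!(counter.decrement_self().is_zero());
}
```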
@@ -331,70 +332,73 @@ impl TaskInner {
         &self.lock_attempts
     }
 
-    fn blocked_page_count_mut<'t>(
+    fn blocked_usage_count_mut<'t>(
         &self,
-        token: &'t mut BlockedPageCountToken,
+        token: &'t mut BlockedUsageCountToken,
     ) -> &'t mut ShortCounter {
-        self.blocked_page_count.borrow_mut(token)
+        self.blocked_usage_count.borrow_mut(token)
     }
 
-    fn set_blocked_page_count(&self, token: &mut BlockedPageCountToken, count: ShortCounter) {
-        *self.blocked_page_count_mut(token) = count;
+    fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) {
+        *self.blocked_usage_count_mut(token) = count;
     }
 
     #[must_use]
-    fn try_unblock(self: Task, token: &mut BlockedPageCountToken) -> Option<Task> {
-        self.blocked_page_count_mut(token)
+    fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option<Task> {
+        self.blocked_usage_count_mut(token)
             .decrement_self()
             .is_zero()
             .then_some(self)
     }
 }
 
-/// [`Task`]'s per-address attempt to use a [page](Page) with [certain kind of
+/// [`Task`]'s per-address attempt to use a [usage_queue](UsageQueue) with [certain kind of
 /// request](RequestedUsage).
 #[derive(Debug)]
 struct LockAttempt {
-    page: Page,
+    usage_queue: UsageQueue,
     requested_usage: RequestedUsage,
 }
 const_assert_eq!(mem::size_of::<LockAttempt>(), 16);
 
 impl LockAttempt {
-    fn new(page: Page, requested_usage: RequestedUsage) -> Self {
+    fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self {
         Self {
-            page,
+            usage_queue,
             requested_usage,
         }
     }
 
-    fn page_mut<'t>(&self, page_token: &'t mut PageToken) -> &'t mut PageInner {
-        self.page.0.borrow_mut(page_token)
+    fn usage_queue_mut<'t>(
+        &self,
+        usage_queue_token: &'t mut UsageQueueToken,
+    ) -> &'t mut UsageQueueInner {
+        self.usage_queue.0.borrow_mut(usage_queue_token)
     }
 }
 
-/// Status about how the [`Page`] is used currently. Unlike [`RequestedUsage`], it has additional
-/// variant of [`Unused`](`PageUsage::Unused`).
+/// Status about how the [`UsageQueue`] is used currently. Unlike [`RequestedUsage`], it has an
+/// additional variant of [`Unused`](`Usage::Unused`).
 #[derive(Copy, Clone, Debug, Default)]
-enum PageUsage {
+enum Usage {
     #[default]
     Unused,
     Readonly(ShortCounter),
     Writable,
 }
-const_assert_eq!(mem::size_of::<PageUsage>(), 8);
+const_assert_eq!(mem::size_of::<Usage>(), 8);
 
-impl From<RequestedUsage> for PageUsage {
+impl From<RequestedUsage> for Usage {
     fn from(requested_usage: RequestedUsage) -> Self {
         match requested_usage {
-            RequestedUsage::Readonly => PageUsage::Readonly(ShortCounter::one()),
-            RequestedUsage::Writable => PageUsage::Writable,
+            RequestedUsage::Readonly => Usage::Readonly(ShortCounter::one()),
+            RequestedUsage::Writable => Usage::Writable,
         }
     }
 }
 
-/// Status about how a task is requesting to use a particular [`Page`]. Unlike [`PageUsage`],
-/// it has only two unit variants.
+/// Status about how a task is requesting to use a particular [`UsageQueue`]. Unlike [`Usage`], it
+/// has only two unit variants.
 #[derive(Clone, Copy, Debug)]
 enum RequestedUsage {
     Readonly,
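The `Usage`/`RequestedUsage` pair encodes a readers-writer lock without any actual locking. Restated as a freestanding sketch with plain types (the counter simplified to a `u32`), this is the compatibility rule that `try_lock_usage_queue` further down in the diff implements:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Usage {
    Unused,
    Readonly(u32),
    Writable,
}

#[derive(Debug, Clone, Copy)]
enum RequestedUsage {
    Readonly,
    Writable,
}

fn try_lock(current: Usage, requested: RequestedUsage) -> Result<Usage, ()> {
    match (current, requested) {
        // An unused queue grants either kind of usage outright.
        (Usage::Unused, RequestedUsage::Readonly) => Ok(Usage::Readonly(1)),
        (Usage::Unused, RequestedUsage::Writable) => Ok(Usage::Writable),
        // Readers share; the counter remembers how many must unlock later.
        (Usage::Readonly(count), RequestedUsage::Readonly) => Ok(Usage::Readonly(count + 1)),
        // Writers are exclusive against everything, in both directions.
        (Usage::Readonly(_), RequestedUsage::Writable) | (Usage::Writable, _) => Err(()),
    }
}

fn main() {
    assert_eq!(try_lock(Usage::Unused, RequestedUsage::Writable), Ok(Usage::Writable));
    assert_eq!(try_lock(Usage::Readonly(1), RequestedUsage::Readonly), Ok(Usage::Readonly(2)));
    assert_eq!(try_lock(Usage::Writable, RequestedUsage::Readonly), Err(()));
}
```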
@@ -403,19 +407,19 @@ enum RequestedUsage {
 
 /// Internal scheduling data about a particular address.
 ///
-/// Specifically, it holds the current [`PageUsage`] (or no usage with [`PageUsage::Unused`]) and
-/// which [`Task`]s are blocked to be executed after the current task is notified to be finished
-/// via [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`)
+/// Specifically, it holds the current [`Usage`] (or no usage with [`Usage::Unused`]) and which
+/// [`Task`]s are blocked waiting to be executed after the current task is notified to be
+/// finished via [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`).
 #[derive(Debug)]
-struct PageInner {
-    usage: PageUsage,
-    blocked_tasks: VecDeque<(Task, RequestedUsage)>,
+struct UsageQueueInner {
+    current_usage: Usage,
+    blocked_usages_from_tasks: VecDeque<(RequestedUsage, Task)>,
 }
 
-impl Default for PageInner {
+impl Default for UsageQueueInner {
     fn default() -> Self {
         Self {
-            usage: PageUsage::default(),
+            current_usage: Usage::default(),
             // Capacity should be configurable to create with large capacity like 1024 inside the
             // (multi-threaded) closures passed to create_task(). In this way, reallocs can be
             // avoided happening in the scheduler thread. Also, this configurability is desired for
@@ -425,37 +429,38 @@
             //
             // Note that large cap should be accompanied with proper scheduler cleaning after use,
             // which should be handled by higher layers (i.e. scheduler pool).
-            blocked_tasks: VecDeque::with_capacity(128),
+            blocked_usages_from_tasks: VecDeque::with_capacity(128),
         }
     }
 }
 
-impl PageInner {
-    fn push_blocked_task(&mut self, task: Task, requested_usage: RequestedUsage) {
-        self.blocked_tasks.push_back((task, requested_usage));
+impl UsageQueueInner {
+    fn push_blocked_task(&mut self, requested_usage: RequestedUsage, task: Task) {
+        self.blocked_usages_from_tasks
+            .push_back((requested_usage, task));
     }
 
     fn has_no_blocked_task(&self) -> bool {
-        self.blocked_tasks.is_empty()
+        self.blocked_usages_from_tasks.is_empty()
     }
 
     #[must_use]
-    fn pop_unblocked_next_task(&mut self) -> Option<(Task, RequestedUsage)> {
-        self.blocked_tasks.pop_front()
+    fn pop_unblocked_next_task(&mut self) -> Option<(RequestedUsage, Task)> {
+        self.blocked_usages_from_tasks.pop_front()
     }
 
     #[must_use]
-    fn blocked_next_task(&self) -> Option<(&Task, RequestedUsage)> {
-        self.blocked_tasks
+    fn blocked_next_task(&self) -> Option<(RequestedUsage, &Task)> {
+        self.blocked_usages_from_tasks
             .front()
-            .map(|(task, requested_usage)| (task, *requested_usage))
+            .map(|(requested_usage, task)| (*requested_usage, task))
     }
 
     #[must_use]
-    fn pop_blocked_next_readonly_task(&mut self) -> Option<(Task, RequestedUsage)> {
+    fn pop_blocked_next_readonly_task(&mut self) -> Option<(RequestedUsage, Task)> {
         if matches!(
             self.blocked_next_task(),
-            Some((_, RequestedUsage::Readonly))
+            Some((RequestedUsage::Readonly, _))
         ) {
             self.pop_unblocked_next_task()
         } else {
@@ -464,14 +469,14 @@ impl PageInner {
             None
         }
     }
 }
 
-const_assert_eq!(mem::size_of::<TokenCell<PageInner>>(), 40);
+const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 40);
 
 /// Scheduler's internal data for each address ([`Pubkey`](`solana_sdk::pubkey::Pubkey`)). Very
 /// opaque wrapper type; no methods, just [`::clone()`](Clone::clone) and
 /// [`::default()`](Default::default).
 #[derive(Debug, Clone, Default)]
-pub struct Page(Arc<TokenCell<PageInner>>);
-const_assert_eq!(mem::size_of::<Page>(), 8);
+pub struct UsageQueue(Arc<TokenCell<UsageQueueInner>>);
+const_assert_eq!(mem::size_of::<UsageQueue>(), 8);
 
 /// A high-level `struct`, managing the overall scheduling of [tasks](Task), to be used by
 /// `solana-unified-scheduler-pool`.
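`pop_blocked_next_readonly_task` above exists so that, once a readonly usage is granted, every consecutive readonly request at the front of the queue can be granted in the same pass. A toy model of that front-of-queue rule follows (simplified types; the real queue stores `(RequestedUsage, Task)` pairs):

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestedUsage {
    Readonly,
    Writable,
}

// Grant consecutive readonly requests from the queue front; stop at the
// first writable request, which must wait for exclusive access.
fn drain_readonly_run(blocked: &mut VecDeque<RequestedUsage>) -> usize {
    let mut granted = 0;
    while matches!(blocked.front(), Some(RequestedUsage::Readonly)) {
        blocked.pop_front();
        granted += 1;
    }
    granted
}

fn main() {
    let mut blocked = VecDeque::from([
        RequestedUsage::Readonly,
        RequestedUsage::Readonly,
        RequestedUsage::Writable, // blocks the run
        RequestedUsage::Readonly, // stays queued behind the writer
    ]);
    assert_eq!(drain_readonly_run(&mut blocked), 2);
    assert_eq!(blocked.len(), 2);
}
```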
@@ -482,8 +487,8 @@ pub struct SchedulingStateMachine {
     handled_task_count: ShortCounter,
     unblocked_task_count: ShortCounter,
     total_task_count: ShortCounter,
-    count_token: BlockedPageCountToken,
-    page_token: PageToken,
+    count_token: BlockedUsageCountToken,
+    usage_queue_token: UsageQueueToken,
 }
 const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 64);
 
@@ -559,22 +564,28 @@ impl SchedulingStateMachine {
         self.unlock_for_task(task);
     }
 
-    fn try_lock_page(page: &PageInner, requested_usage: RequestedUsage) -> LockResult {
-        match page.usage {
-            PageUsage::Unused => LockResult::Ok(PageUsage::from(requested_usage)),
-            PageUsage::Readonly(count) => match requested_usage {
-                RequestedUsage::Readonly => LockResult::Ok(PageUsage::Readonly(count.increment())),
+    fn try_lock_usage_queue(
+        usage_queue: &UsageQueueInner,
+        requested_usage: RequestedUsage,
+    ) -> LockResult {
+        match usage_queue.current_usage {
+            Usage::Unused => LockResult::Ok(Usage::from(requested_usage)),
+            Usage::Readonly(count) => match requested_usage {
+                RequestedUsage::Readonly => LockResult::Ok(Usage::Readonly(count.increment())),
                 RequestedUsage::Writable => LockResult::Err(()),
             },
-            PageUsage::Writable => LockResult::Err(()),
+            Usage::Writable => LockResult::Err(()),
         }
     }
 
     #[must_use]
-    fn unlock_page(page: &mut PageInner, attempt: &LockAttempt) -> Option<(Task, RequestedUsage)> {
+    fn unlock_usage_queue(
+        usage_queue: &mut UsageQueueInner,
+        attempt: &LockAttempt,
+    ) -> Option<(RequestedUsage, Task)> {
         let mut is_unused_now = false;
-        match &mut page.usage {
-            PageUsage::Readonly(ref mut count) => match attempt.requested_usage {
+        match &mut usage_queue.current_usage {
+            Usage::Readonly(ref mut count) => match attempt.requested_usage {
                 RequestedUsage::Readonly => {
                     if count.is_one() {
                         is_unused_now = true;
@@ -584,18 +595,18 @@
                 }
                 RequestedUsage::Writable => unreachable!(),
             },
-            PageUsage::Writable => match attempt.requested_usage {
+            Usage::Writable => match attempt.requested_usage {
                 RequestedUsage::Writable => {
                     is_unused_now = true;
                 }
                 RequestedUsage::Readonly => unreachable!(),
            },
-            PageUsage::Unused => unreachable!(),
+            Usage::Unused => unreachable!(),
         }
 
         if is_unused_now {
-            page.usage = PageUsage::Unused;
-            page.pop_unblocked_next_task()
+            usage_queue.current_usage = Usage::Unused;
+            usage_queue.pop_unblocked_next_task()
         } else {
             None
         }
     }
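`unlock_usage_queue` above packs two decisions into one call: adjust the current usage, and hand back the next blocked request only when the queue has just become unused. A simplified standalone model of that behavior (plain `u32` counter instead of `ShortCounter`, bare requests instead of tasks):

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Usage {
    Unused,
    Readonly(u32),
    Writable,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestedUsage {
    Readonly,
    Writable,
}

struct Queue {
    current_usage: Usage,
    blocked: VecDeque<RequestedUsage>,
}

// Release one granted usage; surface the next blocked request only if the
// queue just became unused.
fn unlock(queue: &mut Queue, released: RequestedUsage) -> Option<RequestedUsage> {
    queue.current_usage = match (queue.current_usage, released) {
        (Usage::Readonly(count), RequestedUsage::Readonly) if count > 1 => {
            Usage::Readonly(count - 1) // other readers still hold the queue
        }
        (Usage::Readonly(1), RequestedUsage::Readonly) => Usage::Unused,
        (Usage::Writable, RequestedUsage::Writable) => Usage::Unused,
        _ => unreachable!("unlock must mirror a previously successful lock"),
    };
    (queue.current_usage == Usage::Unused)
        .then(|| queue.blocked.pop_front())
        .flatten()
}

fn main() {
    let mut queue = Queue {
        current_usage: Usage::Readonly(2),
        blocked: VecDeque::from([RequestedUsage::Writable]),
    };
    // First reader leaves: still read-locked, nothing is unblocked.
    assert_eq!(unlock(&mut queue, RequestedUsage::Readonly), None);
    // Last reader leaves: the queued writer surfaces.
    assert_eq!(unlock(&mut queue, RequestedUsage::Readonly), Some(RequestedUsage::Writable));
}
```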
@@ -603,54 +614,56 @@
     #[must_use]
     fn try_lock_for_task(&mut self, task: Task) -> Option<Task> {
-        let mut blocked_page_count = ShortCounter::zero();
+        let mut blocked_usage_count = ShortCounter::zero();
         for attempt in task.lock_attempts() {
-            let page = attempt.page_mut(&mut self.page_token);
-            let lock_result = if page.has_no_blocked_task() {
-                Self::try_lock_page(page, attempt.requested_usage)
+            let usage_queue = attempt.usage_queue_mut(&mut self.usage_queue_token);
+            let lock_result = if usage_queue.has_no_blocked_task() {
+                Self::try_lock_usage_queue(usage_queue, attempt.requested_usage)
             } else {
                 LockResult::Err(())
             };
             match lock_result {
-                LockResult::Ok(PageUsage::Unused) => unreachable!(),
+                LockResult::Ok(Usage::Unused) => unreachable!(),
                 LockResult::Ok(new_usage) => {
-                    page.usage = new_usage;
+                    usage_queue.current_usage = new_usage;
                 }
                 LockResult::Err(()) => {
-                    blocked_page_count.increment_self();
-                    page.push_blocked_task(task.clone(), attempt.requested_usage);
+                    blocked_usage_count.increment_self();
+                    usage_queue.push_blocked_task(attempt.requested_usage, task.clone());
                 }
             }
         }
 
-        // no blocked page means success
-        if blocked_page_count.is_zero() {
+        // no blocked usage_queue means success
+        if blocked_usage_count.is_zero() {
             Some(task)
         } else {
-            task.set_blocked_page_count(&mut self.count_token, blocked_page_count);
+            task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
             None
         }
     }
 
     fn unlock_for_task(&mut self, task: &Task) {
         for unlock_attempt in task.lock_attempts() {
-            let page = unlock_attempt.page_mut(&mut self.page_token);
-            let mut unblocked_task_from_page = Self::unlock_page(page, unlock_attempt);
+            let usage_queue = unlock_attempt.usage_queue_mut(&mut self.usage_queue_token);
+            let mut unblocked_task_from_queue =
+                Self::unlock_usage_queue(usage_queue, unlock_attempt);
 
-            while let Some((task_with_unblocked_page, requested_usage)) = unblocked_task_from_page {
-                if let Some(task) = task_with_unblocked_page.try_unblock(&mut self.count_token) {
+            while let Some((requested_usage, task_with_unblocked_queue)) = unblocked_task_from_queue
+            {
+                if let Some(task) = task_with_unblocked_queue.try_unblock(&mut self.count_token) {
                     self.unblocked_task_queue.push_back(task);
                 }
 
-                match Self::try_lock_page(page, requested_usage) {
-                    LockResult::Ok(PageUsage::Unused) => unreachable!(),
+                match Self::try_lock_usage_queue(usage_queue, requested_usage) {
+                    LockResult::Ok(Usage::Unused) => unreachable!(),
                     LockResult::Ok(new_usage) => {
-                        page.usage = new_usage;
+                        usage_queue.current_usage = new_usage;
                         // Try to further schedule blocked task for parallelism in the case of
                         // readonly usages
-                        unblocked_task_from_page = if matches!(new_usage, PageUsage::Readonly(_)) {
-                            page.pop_blocked_next_readonly_task()
+                        unblocked_task_from_queue = if matches!(new_usage, Usage::Readonly(_)) {
+                            usage_queue.pop_blocked_next_readonly_task()
                         } else {
                             None
                         };
@@ -661,23 +674,23 @@ impl SchedulingStateMachine {
         }
     }
 
-    /// Creates a new task with [`SanitizedTransaction`] with all of its corresponding [`Page`]s
-    /// preloaded.
+    /// Creates a new task with [`SanitizedTransaction`] with all of its corresponding
+    /// [`UsageQueue`]s preloaded.
     ///
-    /// Closure (`page_loader`) is used to delegate the (possibly multi-threaded)
-    /// implementation of [`Page`] look-up by [`pubkey`](Pubkey) to callers. It's the caller's
-    /// responsibility to ensure the same instance is returned from the closure, given a particular
-    /// pubkey.
+    /// Closure (`usage_queue_loader`) is used to delegate the (possibly multi-threaded)
+    /// implementation of [`UsageQueue`] look-up by [`pubkey`](Pubkey) to callers. It's the
+    /// caller's responsibility to ensure the same instance is returned from the closure, given a
+    /// particular pubkey.
     ///
-    /// Closure is used here to delegate the responsibility of general ownership of `Page` (and
-    /// caching/pruning if any) to the caller. `SchedulingStateMachine` guarantees that all of
-    /// shared owndership of `Page`s are released and Page state is identical to just after
-    /// created, if `has_no_active_task()` is `true`. Also note that this is desired for separation
-    /// of concern.
+    /// Closure is used here to delegate the responsibility of general ownership of `UsageQueue`
+    /// (and caching/pruning if any) to the caller. `SchedulingStateMachine` guarantees that all
+    /// shared ownership of `UsageQueue`s is released and `UsageQueue` state is identical to just
+    /// after creation, if `has_no_active_task()` is `true`. Also note that this is desired for
+    /// separation of concerns.
     pub fn create_task(
         transaction: SanitizedTransaction,
         index: usize,
-        page_loader: &mut impl FnMut(Pubkey) -> Page,
+        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
     ) -> Task {
         // Calling the _unchecked() version here is safe for faster operation, because
         // `get_account_locks()` (the safe variant) is ensured to be called in
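The contract stated in the doc comment above — the closure must return the same `UsageQueue` instance for a given pubkey — is easiest to satisfy with a memoizing map. A trimmed-down sketch with stand-in types (the test helper `create_address_loader` and the pool's `UsageQueueLoader`, both later in this patch, are the real equivalents):

```rust
use std::{collections::HashMap, sync::Arc};

type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey

#[derive(Debug, Default, Clone)]
struct UsageQueue(Arc<()>); // stand-in; the real type wraps scheduling state

fn main() {
    let mut usage_queues: HashMap<Pubkey, UsageQueue> = HashMap::new();
    // Repeated calls for one pubkey memoize and hand back clones of a single
    // shared queue; the state machine only ever sees the closure.
    let mut usage_queue_loader = |pubkey: Pubkey| -> UsageQueue {
        usage_queues.entry(pubkey).or_default().clone()
    };

    let first = usage_queue_loader([1; 32]);
    let second = usage_queue_loader([1; 32]);
    assert!(Arc::ptr_eq(&first.0, &second.0)); // same instance, as required
}
```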
@@ -707,7 +720,7 @@ impl SchedulingStateMachine {
         let lock_attempts = writable_locks
             .chain(readonly_locks)
             .map(|(address, requested_usage)| {
-                LockAttempt::new(page_loader(**address), requested_usage)
+                LockAttempt::new(usage_queue_loader(**address), requested_usage)
             })
             .collect();
 
@@ -715,7 +728,7 @@ impl SchedulingStateMachine {
             transaction,
             index,
             lock_attempts,
-            blocked_page_count: TokenCell::new(ShortCounter::zero()),
+            blocked_usage_count: TokenCell::new(ShortCounter::zero()),
         })
     }
 
@@ -723,12 +736,12 @@ impl SchedulingStateMachine {
     ///
     /// This isn't called _reset_ to indicate this isn't safe to call at any given moment.
     /// This panics if the state machine hasn't properly been finished (i.e. there should be no
-    /// active task) to uphold invariants of [`Page`]s.
+    /// active task) to uphold invariants of [`UsageQueue`]s.
     ///
     /// This method is intended to reuse the SchedulingStateMachine instance (to avoid its `unsafe`
     /// [constructor](SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling)
-    /// as much as possible) and its (possbily cached) associated [`Page`]s for processing other
-    /// slots.
+    /// as much as possible) and its (possibly cached) associated [`UsageQueue`]s for processing
+    /// other slots.
     pub fn reinitialize(&mut self) {
         assert!(self.has_no_active_task());
         assert_eq!(self.unblocked_task_queue.len(), 0);
@@ -749,14 +762,14 @@ impl SchedulingStateMachine {
         Self {
             last_task_index: None,
             // It's very unlikely this is desired to be configurable, like
-            // `PageInner::blocked_tasks`'s cap.
+            // `UsageQueueInner::blocked_usages_from_tasks`'s cap.
             unblocked_task_queue: VecDeque::with_capacity(1024),
             active_task_count: ShortCounter::zero(),
             handled_task_count: ShortCounter::zero(),
             unblocked_task_count: ShortCounter::zero(),
             total_task_count: ShortCounter::zero(),
-            count_token: unsafe { BlockedPageCountToken::assume_exclusive_mutating_thread() },
-            page_token: unsafe { PageToken::assume_exclusive_mutating_thread() },
+            count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
+            usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
         }
     }
 }
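The `unsafe` token constructors above rely on a single-scheduler-thread contract that the type system cannot see. One plausible way a caller upholds it — this is an assumption-laden sketch, not the pool crate's actual plumbing — is to confine the state machine to one dedicated thread and talk to it only via channels:

```rust
use std::{sync::mpsc, thread};

enum Message {
    Tick,
    Stop,
}

fn main() {
    let (sender, receiver) = mpsc::channel::<Message>();

    // Only this thread would ever construct tokens or touch the state
    // machine, which is what makes `assume_exclusive_mutating_thread()`-style
    // unsafe constructors sound: exclusivity by confinement, not by locks.
    let scheduler_thread = thread::spawn(move || {
        let mut handled_task_count = 0u32;
        while let Ok(message) = receiver.recv() {
            match message {
                // Stand-in for schedule_task()/deschedule_task() calls.
                Message::Tick => handled_task_count += 1,
                Message::Stop => break,
            }
        }
        handled_task_count
    });

    sender.send(Message::Tick).unwrap();
    sender.send(Message::Stop).unwrap();
    assert_eq!(scheduler_thread.join().unwrap(), 1);
}
```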
@@ -807,29 +820,33 @@ mod tests {
     }
 
     fn create_address_loader(
-        pages: Option<Rc<RefCell<HashMap<Pubkey, Page>>>>,
-    ) -> impl FnMut(Pubkey) -> Page {
-        let pages = pages.unwrap_or_default();
-        move |address| pages.borrow_mut().entry(address).or_default().clone()
+        usage_queues: Option<Rc<RefCell<HashMap<Pubkey, UsageQueue>>>>,
+    ) -> impl FnMut(Pubkey) -> UsageQueue {
+        let usage_queues = usage_queues.unwrap_or_default();
+        move |address| {
+            usage_queues
+                .borrow_mut()
+                .entry(address)
+                .or_default()
+                .clone()
+        }
     }
 
     #[test]
     fn test_debug() {
         // these are almost meaningless just to see eye-pleasing coverage report....
         assert_eq!(
-            format!(
-                "{:?}",
-                LockResult::Ok(PageUsage::Readonly(ShortCounter::one()))
-            ),
+            format!("{:?}", LockResult::Ok(Usage::Readonly(ShortCounter::one()))),
             "Ok(Readonly(ShortCounter(1)))"
         );
         let sanitized = simplest_transaction();
-        let task = SchedulingStateMachine::create_task(sanitized, 0, &mut |_| Page::default());
+        let task =
+            SchedulingStateMachine::create_task(sanitized, 0, &mut |_| UsageQueue::default());
         assert!(format!("{:?}", task).contains("TaskInner"));
 
         assert_eq!(
-            format!("{:?}", PageInner::default()),
-            "PageInner { usage: Unused, blocked_tasks: [] }"
+            format!("{:?}", UsageQueueInner::default()),
+            "UsageQueueInner { current_usage: Unused, blocked_usages_from_tasks: [] }"
         )
     }
 
@@ -859,8 +876,9 @@ mod tests {
     #[test]
     fn test_create_task() {
         let sanitized = simplest_transaction();
-        let task =
-            SchedulingStateMachine::create_task(sanitized.clone(), 3, &mut |_| Page::default());
+        let task = SchedulingStateMachine::create_task(sanitized.clone(), 3, &mut |_| {
+            UsageQueue::default()
+        });
         assert_eq!(task.task_index(), 3);
         assert_eq!(task.transaction(), &sanitized);
     }
@@ -1242,8 +1260,8 @@
         let conflicting_address = Pubkey::new_unique();
         let sanitized1 = transaction_with_writable_address(conflicting_address);
         let sanitized2 = transaction_with_writable_address(conflicting_address);
-        let pages = Rc::new(RefCell::new(HashMap::new()));
-        let address_loader = &mut create_address_loader(Some(pages.clone()));
+        let usage_queues = Rc::new(RefCell::new(HashMap::new()));
+        let address_loader = &mut create_address_loader(Some(usage_queues.clone()));
         let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
         let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
 
@@ -1257,19 +1275,25 @@
             Some(101)
         );
         assert_matches!(state_machine.schedule_task(task2.clone()), None);
-        let pages = pages.borrow_mut();
-        let page = pages.get(&conflicting_address).unwrap();
+        let usage_queues = usage_queues.borrow_mut();
+        let usage_queue = usage_queues.get(&conflicting_address).unwrap();
         assert_matches!(
-            page.0.borrow_mut(&mut state_machine.page_token).usage,
-            PageUsage::Writable
+            usage_queue
+                .0
+                .borrow_mut(&mut state_machine.usage_queue_token)
+                .current_usage,
+            Usage::Writable
         );
         // task2's fee payer should have been locked already even if task2 is blocked still via the
         // above schedule_task(task2) call
         let fee_payer = task2.transaction().message().fee_payer();
-        let page = pages.get(fee_payer).unwrap();
+        let usage_queue = usage_queues.get(fee_payer).unwrap();
         assert_matches!(
-            page.0.borrow_mut(&mut state_machine.page_token).usage,
-            PageUsage::Writable
+            usage_queue
+                .0
+                .borrow_mut(&mut state_machine.usage_queue_token)
+                .current_usage,
+            Usage::Writable
         );
         state_machine.deschedule_task(&task1);
         assert_matches!(
@@ -1288,10 +1312,12 @@
         let mut state_machine = unsafe {
             SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
         };
-        let page = Page::default();
-        let _ = SchedulingStateMachine::unlock_page(
-            page.0.borrow_mut(&mut state_machine.page_token),
-            &LockAttempt::new(page, RequestedUsage::Writable),
+        let usage_queue = UsageQueue::default();
+        let _ = SchedulingStateMachine::unlock_usage_queue(
+            usage_queue
+                .0
+                .borrow_mut(&mut state_machine.usage_queue_token),
+            &LockAttempt::new(usage_queue, RequestedUsage::Writable),
         );
     }
 
@@ -1301,11 +1327,16 @@
         let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
         };
-        let page = Page::default();
-        page.0.borrow_mut(&mut state_machine.page_token).usage = PageUsage::Writable;
-        let _ = SchedulingStateMachine::unlock_page(
-            page.0.borrow_mut(&mut state_machine.page_token),
-            &LockAttempt::new(page, RequestedUsage::Readonly),
+        let usage_queue = UsageQueue::default();
+        usage_queue
+            .0
+            .borrow_mut(&mut state_machine.usage_queue_token)
+            .current_usage = Usage::Writable;
+        let _ = SchedulingStateMachine::unlock_usage_queue(
+            usage_queue
+                .0
+                .borrow_mut(&mut state_machine.usage_queue_token),
+            &LockAttempt::new(usage_queue, RequestedUsage::Readonly),
         );
     }
 
@@ -1315,12 +1346,16 @@
         let mut state_machine = unsafe {
             SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
         };
-        let page = Page::default();
-        page.0.borrow_mut(&mut state_machine.page_token).usage =
-            PageUsage::Readonly(ShortCounter::one());
-        let _ = SchedulingStateMachine::unlock_page(
-            page.0.borrow_mut(&mut state_machine.page_token),
-            &LockAttempt::new(page, RequestedUsage::Writable),
+        let usage_queue = UsageQueue::default();
+        usage_queue
+            .0
+            .borrow_mut(&mut state_machine.usage_queue_token)
+            .current_usage = Usage::Readonly(ShortCounter::one());
+        let _ = SchedulingStateMachine::unlock_usage_queue(
+            usage_queue
+                .0
+                .borrow_mut(&mut state_machine.usage_queue_token),
+            &LockAttempt::new(usage_queue, RequestedUsage::Writable),
         );
     }
 
diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index be1918807892b2..ed4b354fbd8658 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -36,7 +36,7 @@ use {
         pubkey::Pubkey,
         transaction::{Result, SanitizedTransaction},
     },
-    solana_unified_scheduler_logic::{Page, SchedulingStateMachine, Task},
+    solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue},
     solana_vote::vote_sender_types::ReplayVoteSender,
     std::{
         fmt::Debug,
@@ -394,13 +394,13 @@ mod chained_channel {
 }
 
 #[derive(Default, Debug)]
-pub struct AddressBook {
-    book: DashMap<Pubkey, Page>,
+pub struct UsageQueueLoader {
+    usage_queues: DashMap<Pubkey, UsageQueue>,
 }
 
-impl AddressBook {
-    pub fn load(&self, address: Pubkey) -> Page {
-        self.book.entry(address).or_default().clone()
+impl UsageQueueLoader {
+    pub fn load(&self, address: Pubkey) -> UsageQueue {
+        self.usage_queues.entry(address).or_default().clone()
     }
 }
 
@@ -425,7 +425,7 @@ pub struct PooledScheduler<TH: TaskHandler> {
 #[derive(Debug)]
 pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
     thread_manager: ThreadManager<S, TH>,
-    address_book: AddressBook,
+    usage_queue_loader: UsageQueueLoader,
 }
 
 // This type manages the OS threads for scheduling and executing transactions. The term
@@ -451,7 +451,7 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
         Self::from_inner(
             PooledSchedulerInner::<Self, TH> {
                 thread_manager: ThreadManager::new(pool),
-                address_book: AddressBook::default(),
+                usage_queue_loader: UsageQueueLoader::default(),
             },
             initial_context,
         )
@@ -802,7 +802,7 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
     fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) {
         let task =
             SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| {
-                self.inner.address_book.load(pubkey)
+                self.inner.usage_queue_loader.load(pubkey)
             });
         self.inner.thread_manager.send_task(task);
     }
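For reference, the renamed pool-side type boils down to a concurrent map lazily minting one shared queue per address. A self-contained sketch of that pattern, with `dashmap` as an assumed dependency and a unit stand-in for the real `UsageQueue`:

```rust
use dashmap::DashMap;
use std::sync::Arc;

type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey

#[derive(Debug, Default, Clone)]
struct UsageQueue(Arc<()>); // stand-in for the logic crate's type

#[derive(Default, Debug)]
struct UsageQueueLoader {
    usage_queues: DashMap<Pubkey, UsageQueue>,
}

impl UsageQueueLoader {
    fn load(&self, address: Pubkey) -> UsageQueue {
        // Lazily create the queue on first sight of an address; afterwards
        // every caller gets a clone of the same Arc-backed queue.
        self.usage_queues.entry(address).or_default().clone()
    }
}

fn main() {
    let loader = UsageQueueLoader::default();
    let first = loader.load([7; 32]);
    let second = loader.load([7; 32]);
    // Repeated loads hand back the same shared queue, never a fresh one.
    assert!(Arc::ptr_eq(&first.0, &second.0));
}
```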