diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs
index f5e6bf86ce9e15..82b7bbfe524dda 100644
--- a/unified-scheduler-logic/src/lib.rs
+++ b/unified-scheduler-logic/src/lib.rs
@@ -225,12 +225,12 @@ mod utils {
         }
     }
 
-    // Safety: Access to TokenCell is assumed to be only from a single thread by proper use of
-    // Token once after TokenCell is sent to the thread from other threads; So, both implementing
+    // Safety: Access to `TokenCell` is assumed to be only from a single thread by proper use of
+    // Token once after `TokenCell` is sent to the thread from other threads; So, both implementing
     // Send and Sync can be thought as safe.
     //
-    // In other words, TokenCell is technicall still !Send and !Sync. But there should be no legal
-    // use happening which requires !Send or !Sync to avoid undefined behavior.
+    // In other words, TokenCell is technically still `!Send` and `!Sync`. But there should be no
+    // legalized usage which depends on real `Send` and `Sync` to avoid undefined behaviors.
     unsafe impl Send for TokenCell {}
     unsafe impl Sync for TokenCell {}
 
@@ -413,9 +413,11 @@ enum RequestedUsage {
 #[derive(Debug)]
 struct UsageQueueInner {
     current_usage: Usage,
-    blocked_usages_from_tasks: VecDeque<(RequestedUsage, Task)>,
+    blocked_usages_from_tasks: VecDeque<UsageFromTask>,
 }
 
+type UsageFromTask = (RequestedUsage, Task);
+
 impl Default for UsageQueueInner {
     fn default() -> Self {
         Self {
@@ -435,38 +437,30 @@ impl Default for UsageQueueInner {
     }
 }
 
 impl UsageQueueInner {
-    fn push_blocked_task(&mut self, requested_usage: RequestedUsage, task: Task) {
-        self.blocked_usages_from_tasks
-            .push_back((requested_usage, task));
-    }
-
-    fn has_no_blocked_task(&self) -> bool {
-        self.blocked_usages_from_tasks.is_empty()
+    fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
+        self.blocked_usages_from_tasks.push_back(usage_from_task);
     }
 
     #[must_use]
-    fn pop_unblocked_next_task(&mut self) -> Option<(RequestedUsage, Task)> {
+    fn pop_unblocked_usage_from_task(&mut self) -> Option<UsageFromTask> {
         self.blocked_usages_from_tasks.pop_front()
     }
 
     #[must_use]
-    fn blocked_next_task(&self) -> Option<(RequestedUsage, &Task)> {
-        self.blocked_usages_from_tasks
-            .front()
-            .map(|(requested_usage, task)| (*requested_usage, task))
-    }
-
-    #[must_use]
-    fn pop_blocked_next_readonly_task(&mut self) -> Option<(RequestedUsage, Task)> {
+    fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option<UsageFromTask> {
         if matches!(
-            self.blocked_next_task(),
+            self.blocked_usages_from_tasks.front(),
             Some((RequestedUsage::Readonly, _))
         ) {
-            self.pop_unblocked_next_task()
+            self.pop_unblocked_usage_from_task()
         } else {
             None
         }
     }
+
+    fn has_no_blocked_usage(&self) -> bool {
+        self.blocked_usages_from_tasks.is_empty()
+    }
 }
 
 const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 40);
@@ -497,6 +491,10 @@ impl SchedulingStateMachine {
         self.active_task_count.is_zero()
     }
 
+    pub fn has_unblocked_task(&self) -> bool {
+        !self.unblocked_task_queue.is_empty()
+    }
+
     pub fn unblocked_task_queue_count(&self) -> usize {
         self.unblocked_task_queue.len()
     }
@@ -538,15 +536,10 @@ impl SchedulingStateMachine {
         self.try_lock_for_task(task)
     }
 
-    pub fn has_unblocked_task(&self) -> bool {
-        !self.unblocked_task_queue.is_empty()
-    }
-
     #[must_use]
     pub fn schedule_unblocked_task(&mut self) -> Option<Task> {
-        self.unblocked_task_queue.pop_front().map(|task| {
+        self.unblocked_task_queue.pop_front().inspect(|_| {
             self.unblocked_task_count.increment_self();
-            task
         })
     }
 
@@ -606,7 +599,7 @@ impl SchedulingStateMachine {
 
         if is_unused_now {
             usage_queue.current_usage = Usage::Unused;
-            usage_queue.pop_unblocked_next_task()
+            usage_queue.pop_unblocked_usage_from_task()
         } else {
             None
         }
@@ -618,7 +611,7 @@ impl SchedulingStateMachine {
         for attempt in task.lock_attempts() {
             let usage_queue = attempt.usage_queue_mut(&mut self.usage_queue_token);
-            let lock_result = if usage_queue.has_no_blocked_task() {
+            let lock_result = if usage_queue.has_no_blocked_usage() {
                 Self::try_lock_usage_queue(usage_queue, attempt.requested_usage)
             } else {
                 LockResult::Err(())
             };
@@ -630,12 +623,13 @@ impl SchedulingStateMachine {
                 }
                 LockResult::Err(()) => {
                     blocked_usage_count.increment_self();
-                    usage_queue.push_blocked_task(attempt.requested_usage, task.clone());
+                    let usage_from_task = (attempt.requested_usage, task.clone());
+                    usage_queue.push_blocked_usage_from_task(usage_from_task);
                 }
             }
         }
 
-        // no blocked usage_queue means success
+        // no blocked usage count means success
         if blocked_usage_count.is_zero() {
             Some(task)
         } else {
@@ -645,10 +639,9 @@ impl SchedulingStateMachine {
     }
 
     fn unlock_for_task(&mut self, task: &Task) {
-        for unlock_attempt in task.lock_attempts() {
-            let usage_queue = unlock_attempt.usage_queue_mut(&mut self.usage_queue_token);
-            let mut unblocked_task_from_queue =
-                Self::unlock_usage_queue(usage_queue, unlock_attempt);
+        for attempt in task.lock_attempts() {
+            let usage_queue = attempt.usage_queue_mut(&mut self.usage_queue_token);
+            let mut unblocked_task_from_queue = Self::unlock_usage_queue(usage_queue, attempt);
             while let Some((requested_usage, task_with_unblocked_queue)) =
                 unblocked_task_from_queue
             {
@@ -663,7 +656,7 @@ impl SchedulingStateMachine {
                 // Try to further schedule blocked task for parallelism in the case of
                 // readonly usages
                 unblocked_task_from_queue = if matches!(new_usage, Usage::Readonly(_)) {
-                    usage_queue.pop_blocked_next_readonly_task()
+                    usage_queue.pop_unblocked_readonly_usage_from_task()
                 } else {
                     None
                 };
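
For context, a minimal, self-contained sketch of how the renamed UsageQueueInner helpers in this diff behave. It is not the crate's real code: `Task` is reduced to a plain `&'static str` stand-in, `current_usage` and the surrounding SchedulingStateMachine are omitted, and only the blocked-usage queue manipulation is reproduced.

// Simplified stand-ins; the real types live in unified-scheduler-logic.
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug, PartialEq)]
enum RequestedUsage {
    Readonly,
    Writable,
}

// Stand-in for the real reference-counted task wrapper.
type Task = &'static str;
type UsageFromTask = (RequestedUsage, Task);

#[derive(Default)]
struct UsageQueueInner {
    blocked_usages_from_tasks: VecDeque<UsageFromTask>,
}

impl UsageQueueInner {
    fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
        self.blocked_usages_from_tasks.push_back(usage_from_task);
    }

    fn pop_unblocked_usage_from_task(&mut self) -> Option<UsageFromTask> {
        self.blocked_usages_from_tasks.pop_front()
    }

    // Only pops the front entry if it requests readonly access, so a run of
    // consecutive readonly waiters can be drained while a writable waiter
    // stops the scan.
    fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option<UsageFromTask> {
        if matches!(
            self.blocked_usages_from_tasks.front(),
            Some((RequestedUsage::Readonly, _))
        ) {
            self.pop_unblocked_usage_from_task()
        } else {
            None
        }
    }

    fn has_no_blocked_usage(&self) -> bool {
        self.blocked_usages_from_tasks.is_empty()
    }
}

fn main() {
    let mut queue = UsageQueueInner::default();
    queue.push_blocked_usage_from_task((RequestedUsage::Readonly, "tx1"));
    queue.push_blocked_usage_from_task((RequestedUsage::Readonly, "tx2"));
    queue.push_blocked_usage_from_task((RequestedUsage::Writable, "tx3"));

    // The front waiter is popped unconditionally once the queue becomes unused...
    assert_eq!(
        queue.pop_unblocked_usage_from_task(),
        Some((RequestedUsage::Readonly, "tx1"))
    );
    // ...then further readonly waiters can be taken for parallelism...
    assert_eq!(
        queue.pop_unblocked_readonly_usage_from_task(),
        Some((RequestedUsage::Readonly, "tx2"))
    );
    // ...but a writable waiter stops the readonly-only scan.
    assert_eq!(queue.pop_unblocked_readonly_usage_from_task(), None);
    assert!(!queue.has_no_blocked_usage());
}

The readonly-only pop mirrors the loop in unlock_for_task above, which keeps scheduling blocked waiters after an unlock only while the new usage stays Usage::Readonly.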