diff --git a/pkg/kv/kvserver/multiqueue/multi_queue.go b/pkg/kv/kvserver/multiqueue/multi_queue.go index 813c07bd3a6a..588c4556de46 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue.go @@ -120,8 +120,8 @@ type Permit struct { } // tryRunNextLocked will run the next task in order round-robin through the -// queues and in priority order within a queue. It will return true if it ran a -// task. The MultiQueue.mu lock must be held before calling this func. +// queues and in priority order within a queue. +// MultiQueue.mu lock must be held before calling this function. func (m *MultiQueue) tryRunNextLocked() { // If no permits are left, then we can't run anything. if m.remainingRuns <= 0 { @@ -130,7 +130,7 @@ func (m *MultiQueue) tryRunNextLocked() { for i := 0; i < len(m.outstanding); i++ { // Start with the next queue in order and iterate through all empty queues. - // If all queues are empty then return false signaling that nothing was run. + // If all queues are empty then return, as there is nothing to run. index := (m.lastQueueIndex + i + 1) % len(m.outstanding) if m.outstanding[index].Len() > 0 { task := heap.Pop(&m.outstanding[index]).(*Task) @@ -142,7 +142,7 @@ func (m *MultiQueue) tryRunNextLocked() { } } -// Add returns a Task that must be closed (calling Task.Close) to +// Add returns a Task that must be closed (calling m.Release(..)) to // release the Permit. The number of types is expected to // be relatively small and not be changing over time. func (m *MultiQueue) Add(queueType int, priority float64) *Task { @@ -166,10 +166,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task { } heap.Push(&m.outstanding[pos], &newTask) - // Once we are done adding a task, signal the main loop in case it finished - // all its work and was waiting for more work. We are holding the mu lock when - // signaling, so we guarantee that it will not be able to respond to the - // signal until after we release the lock. + // Once we are done adding a task, attempt to signal the next waiting task. m.tryRunNextLocked() return &newTask @@ -184,21 +181,25 @@ func (m *MultiQueue) Cancel(task *Task) { // Task will track its position within the queue. queueIdx := m.mapping[task.queueType] ok := m.outstanding[queueIdx].tryRemove(task) + // Close the permit channel so that waiters stop blocking. + if ok { + close(task.permitC) + return + } // If we get here, we are racing with the task being started. The concern is // that the caller may also call MultiQueue.Release since the task was // started. Either we get the permit or the caller, so we guarantee only one // release will be called. - if !ok { - select { - case p, ok := <-task.permitC: - // Only release if the channel is open, and we can get the permit. - if ok { - m.releaseLocked(p) - } - default: - // If we are not able to get the permit, this means the permit has already - // been given to the caller, and they must call Release on it. + select { + case p, ok := <-task.permitC: + // Only release if the channel is open, and we can get the permit. + if ok { + close(task.permitC) + m.releaseLocked(p) } + default: + // If we are not able to get the permit, this means the permit has already + // been given to the caller, and they must call Release on it. } } diff --git a/pkg/kv/kvserver/multiqueue/multi_queue_test.go b/pkg/kv/kvserver/multiqueue/multi_queue_test.go index ce7e676b3348..8e693a1e210f 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue_test.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue_test.go @@ -121,6 +121,100 @@ func TestMultiQueueRemove(t *testing.T) { verifyOrder(t, queue, a3, b3, c3, a2, c2) } +func TestMultiQueueCancelOne(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + queue := NewMultiQueue(1) + task := queue.Add(1, 1) + queue.Cancel(task) +} + +func TestMultiQueueCancelInProgress(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + queue := NewMultiQueue(1) + + const a = 1 + const b = 2 + const c = 3 + + a3 := queue.Add(a, 5.0) + a2 := queue.Add(a, 4.0) + b1 := queue.Add(b, 1.1) + b2 := queue.Add(b, 2.1) + c3 := queue.Add(c, 2.2) + b3 := queue.Add(b, 6.1) + + queue.Cancel(b2) + queue.Cancel(b1) + + started := 0 + completed := 0 + startTask := func(task *Task) (*Permit, bool) { + select { + case permit, ok := <-task.GetWaitChan(): + if ok { + started++ + return permit, true + } + case <-time.After(time.Second): + t.Fatalf(`should not wait for task on queue %d with priority %f to start`, + task.queueType, task.priority, + ) + } + return nil, false + } + + completeTask := func(task *Task, permit *Permit) { + releaseStarted := make(chan struct{}) + releaseFinished := make(chan struct{}) + go func() { + close(releaseStarted) + queue.Release(permit) + close(releaseFinished) + }() + <-releaseStarted + select { + case <-releaseFinished: + completed++ + case <-time.After(time.Second): + t.Fatalf(`should not wait for task on queue %d with priority %f to complete`, + task.queueType, task.priority, + ) + } + } + + // Execute a3. + a3Permit, ok := startTask(a3) + require.True(t, ok) + completeTask(a3, a3Permit) + + // Cancel b3 before starting. Should not be able to get permit. + queue.Cancel(b3) + _, ok = startTask(b3) + require.False(t, ok) + + // Now, should be able to execute c3 immediately. + c3Permit, ok := startTask(c3) + require.True(t, ok) + + // A and C started + require.Equal(t, 2, started) + // A completed + require.Equal(t, 1, completed) + + // Complete c3 and cancel after completion. + completeTask(c3, c3Permit) + queue.Cancel(c3) + + // Start a2, which is the final item and also should not block to start. + startTask(a2) + + require.Equal(t, 3, started) + require.Equal(t, 2, completed) +} + // TestMultiQueueStress calls Add from multiple threads. It chooses different // names and different priorities for the requests. The goal is simply to make // sure that all the requests are serviced and nothing hangs or fails.