Skip to content

Commit

Permalink
kvserver: add additional testing to multiqueue
Browse files Browse the repository at this point in the history
Add testing for cancellation of multi-queue requests,
and fix a bug where a task's permit channel wasn't closed
on task cancellation.

Release justification: Low-risk change; primarily test-only, plus a small bug fix closing a channel on cancellation.
Release note: None
  • Loading branch information
AlexTalks committed Aug 25, 2022
1 parent 003c036 commit 73aa6f2
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 18 deletions.
37 changes: 19 additions & 18 deletions pkg/kv/kvserver/multiqueue/multi_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ type Permit struct {
}

// tryRunNextLocked will run the next task in order round-robin through the
// queues and in priority order within a queue. It will return true if it ran a
// task. The MultiQueue.mu lock must be held before calling this func.
// queues and in priority order within a queue.
// MultiQueue.mu lock must be held before calling this function.
func (m *MultiQueue) tryRunNextLocked() {
// If no permits are left, then we can't run anything.
if m.remainingRuns <= 0 {
Expand All @@ -130,7 +130,7 @@ func (m *MultiQueue) tryRunNextLocked() {

for i := 0; i < len(m.outstanding); i++ {
// Start with the next queue in order and iterate through all empty queues.
// If all queues are empty then return false signaling that nothing was run.
// If all queues are empty then return, as there is nothing to run.
index := (m.lastQueueIndex + i + 1) % len(m.outstanding)
if m.outstanding[index].Len() > 0 {
task := heap.Pop(&m.outstanding[index]).(*Task)
Expand All @@ -142,7 +142,7 @@ func (m *MultiQueue) tryRunNextLocked() {
}
}

// Add returns a Task that must be closed (calling Task.Close) to
// Add returns a Task that must be closed (calling m.Release(..)) to
// release the Permit. The number of types is expected to
// be relatively small and not be changing over time.
func (m *MultiQueue) Add(queueType int, priority float64) *Task {
Expand All @@ -166,10 +166,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task {
}
heap.Push(&m.outstanding[pos], &newTask)

// Once we are done adding a task, signal the main loop in case it finished
// all its work and was waiting for more work. We are holding the mu lock when
// signaling, so we guarantee that it will not be able to respond to the
// signal until after we release the lock.
// Once we are done adding a task, attempt to signal the next waiting task.
m.tryRunNextLocked()

return &newTask
Expand All @@ -184,21 +181,25 @@ func (m *MultiQueue) Cancel(task *Task) {
// Task will track its position within the queue.
queueIdx := m.mapping[task.queueType]
ok := m.outstanding[queueIdx].tryRemove(task)
// Close the permit channel so that waiters stop blocking.
if ok {
close(task.permitC)
return
}
// If we get here, we are racing with the task being started. The concern is
// that the caller may also call MultiQueue.Release since the task was
// started. Either we get the permit or the caller, so we guarantee only one
// release will be called.
if !ok {
select {
case p, ok := <-task.permitC:
// Only release if the channel is open, and we can get the permit.
if ok {
m.releaseLocked(p)
}
default:
// If we are not able to get the permit, this means the permit has already
// been given to the caller, and they must call Release on it.
select {
case p, ok := <-task.permitC:
// Only release if the channel is open, and we can get the permit.
if ok {
close(task.permitC)
m.releaseLocked(p)
}
default:
// If we are not able to get the permit, this means the permit has already
// been given to the caller, and they must call Release on it.
}
}

Expand Down
94 changes: 94 additions & 0 deletions pkg/kv/kvserver/multiqueue/multi_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,100 @@ func TestMultiQueueRemove(t *testing.T) {
verifyOrder(t, queue, a3, b3, c3, a2, c2)
}

// TestMultiQueueCancelOne verifies that canceling a task that was added
// but never started neither hangs nor leaks.
func TestMultiQueueCancelOne(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// A queue with a single permit and one pending task: cancellation
	// before the task starts must clean up without blocking.
	q := NewMultiQueue(1)
	pending := q.Add(1, 1)
	q.Cancel(pending)
}

// TestMultiQueueCancelInProgress exercises cancellation at several points in
// a task's lifecycle: before starting, racing with the queue head, and after
// completion. The queue has a single permit, so tasks run one at a time in
// round-robin/priority order.
func TestMultiQueueCancelInProgress(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	queue := NewMultiQueue(1)

	// Three queue types; values are arbitrary distinct keys.
	const a = 1
	const b = 2
	const c = 3

	a3 := queue.Add(a, 5.0)
	a2 := queue.Add(a, 4.0)
	b1 := queue.Add(b, 1.1)
	b2 := queue.Add(b, 2.1)
	c3 := queue.Add(c, 2.2)
	b3 := queue.Add(b, 6.1)

	// Cancel two tasks that have not started yet; they should simply be
	// removed from their queue.
	queue.Cancel(b2)
	queue.Cancel(b1)

	started := 0
	completed := 0
	// startTask waits (bounded by a one-second timeout) for the task's
	// permit. Returns (permit, true) if the task started, (nil, false) if
	// the wait channel was closed (i.e. the task was canceled).
	startTask := func(task *Task) (*Permit, bool) {
		select {
		case permit, ok := <-task.GetWaitChan():
			if ok {
				started++
				return permit, true
			}
		case <-time.After(time.Second):
			t.Fatalf(`should not wait for task on queue %d with priority %f to start`,
				task.queueType, task.priority,
			)
		}
		return nil, false
	}

	// completeTask releases the task's permit on a separate goroutine and
	// fails the test if the release does not finish within a second.
	completeTask := func(task *Task, permit *Permit) {
		releaseStarted := make(chan struct{})
		releaseFinished := make(chan struct{})
		go func() {
			close(releaseStarted)
			queue.Release(permit)
			close(releaseFinished)
		}()
		<-releaseStarted
		select {
		case <-releaseFinished:
			completed++
		case <-time.After(time.Second):
			t.Fatalf(`should not wait for task on queue %d with priority %f to complete`,
				task.queueType, task.priority,
			)
		}
	}

	// Execute a3.
	a3Permit, ok := startTask(a3)
	require.True(t, ok)
	completeTask(a3, a3Permit)

	// Cancel b3 before starting. Should not be able to get permit.
	queue.Cancel(b3)
	_, ok = startTask(b3)
	require.False(t, ok)

	// Now, should be able to execute c3 immediately.
	c3Permit, ok := startTask(c3)
	require.True(t, ok)

	// A and C started
	require.Equal(t, 2, started)
	// A completed
	require.Equal(t, 1, completed)

	// Complete c3 and cancel after completion. Cancel after completion
	// should be a no-op for the permit accounting.
	completeTask(c3, c3Permit)
	queue.Cancel(c3)

	// Start a2, which is the final item and also should not block to start.
	// Previously the returned values were dropped; assert the start
	// succeeded explicitly rather than relying only on the counter below.
	_, ok = startTask(a2)
	require.True(t, ok)

	require.Equal(t, 3, started)
	require.Equal(t, 2, completed)
}

// TestMultiQueueStress calls Add from multiple threads. It chooses different
// names and different priorities for the requests. The goal is simply to make
// sure that all the requests are serviced and nothing hangs or fails.
Expand Down

0 comments on commit 73aa6f2

Please sign in to comment.