kvserver: add additional testing to multiqueue #86630

Merged
1 commit merged on Aug 26, 2022
37 changes: 19 additions & 18 deletions pkg/kv/kvserver/multiqueue/multi_queue.go
@@ -120,8 +120,8 @@ type Permit struct {
 }
 
 // tryRunNextLocked will run the next task in order round-robin through the
-// queues and in priority order within a queue. It will return true if it ran a
-// task. The MultiQueue.mu lock must be held before calling this func.
+// queues and in priority order within a queue.
+// MultiQueue.mu lock must be held before calling this function.
 func (m *MultiQueue) tryRunNextLocked() {
 	// If no permits are left, then we can't run anything.
 	if m.remainingRuns <= 0 {
@@ -130,7 +130,7 @@ func (m *MultiQueue) tryRunNextLocked() {
 
 	for i := 0; i < len(m.outstanding); i++ {
 		// Start with the next queue in order and iterate through all empty queues.
-		// If all queues are empty then return false signaling that nothing was run.
+		// If all queues are empty then return, as there is nothing to run.
 		index := (m.lastQueueIndex + i + 1) % len(m.outstanding)
 		if m.outstanding[index].Len() > 0 {
 			task := heap.Pop(&m.outstanding[index]).(*Task)
@@ -142,7 +142,7 @@ }
 	}
 }
 
-// Add returns a Task that must be closed (calling Task.Close) to
+// Add returns a Task that must be closed (calling m.Release(..)) to
 // release the Permit. The number of types is expected to
 // be relatively small and not be changing over time.
 func (m *MultiQueue) Add(queueType int, priority float64) *Task {
@@ -166,10 +166,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task {
 	}
 	heap.Push(&m.outstanding[pos], &newTask)
 
-	// Once we are done adding a task, signal the main loop in case it finished
-	// all its work and was waiting for more work. We are holding the mu lock when
-	// signaling, so we guarantee that it will not be able to respond to the
-	// signal until after we release the lock.
+	// Once we are done adding a task, attempt to signal the next waiting task.
 	m.tryRunNextLocked()
 
 	return &newTask
@@ -184,21 +181,25 @@ func (m *MultiQueue) Cancel(task *Task) {
 	// Task will track its position within the queue.
 	queueIdx := m.mapping[task.queueType]
 	ok := m.outstanding[queueIdx].tryRemove(task)
+	// Close the permit channel so that waiters stop blocking.
+	if ok {
+		close(task.permitC)
+		return
+	}
 	// If we get here, we are racing with the task being started. The concern is
 	// that the caller may also call MultiQueue.Release since the task was
 	// started. Either we get the permit or the caller, so we guarantee only one
 	// release will be called.
-	if !ok {
-		select {
-		case p, ok := <-task.permitC:
-			// Only release if the channel is open, and we can get the permit.
-			if ok {
-				m.releaseLocked(p)
-			}
-		default:
-			// If we are not able to get the permit, this means the permit has already
-			// been given to the caller, and they must call Release on it.
-		}
-	}
+	select {
+	case p, ok := <-task.permitC:
+		// Only release if the channel is open, and we can get the permit.
+		if ok {
+			close(task.permitC)
+			m.releaseLocked(p)
+		}
+	default:
+		// If we are not able to get the permit, this means the permit has already
+		// been given to the caller, and they must call Release on it.
+	}
 }
 
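The comments touched above describe the caller-side contract: Add hands back a *Task, the caller waits on the permit channel, and exactly one of Release or Cancel settles the permit. The following is a minimal caller-side sketch of that contract, not code from this PR; runWithQueue, the work callback, and the queue type and priority values are invented for illustration.

package multiqueueexample

import (
	"context"
	"errors"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
)

// runWithQueue is an illustrative wrapper (not part of this PR). It adds a
// task, waits for its permit, and ensures that exactly one of Release or
// Cancel settles the task, mirroring the Cancel race handling above.
func runWithQueue(ctx context.Context, q *multiqueue.MultiQueue, work func(context.Context) error) error {
	task := q.Add(0 /* queueType */, 1.0 /* priority */)
	select {
	case permit, ok := <-task.GetWaitChan():
		if !ok {
			// The channel was closed: the task was canceled before it
			// started, so there is no permit to release.
			return errors.New("task canceled before it started")
		}
		// We own the permit; release it once the work is done.
		defer q.Release(permit)
		return work(ctx)
	case <-ctx.Done():
		// We stopped waiting. Cancel either removes the still-queued task or,
		// if it raced with the task starting, claims and releases the permit
		// itself, so the caller must not call Release here.
		q.Cancel(task)
		return ctx.Err()
	}
}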
94 changes: 94 additions & 0 deletions pkg/kv/kvserver/multiqueue/multi_queue_test.go
@@ -121,6 +121,100 @@ func TestMultiQueueRemove(t *testing.T) {
	verifyOrder(t, queue, a3, b3, c3, a2, c2)
}

func TestMultiQueueCancelOne(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	queue := NewMultiQueue(1)
	task := queue.Add(1, 1)
	queue.Cancel(task)
}

func TestMultiQueueCancelInProgress(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	queue := NewMultiQueue(1)

	const a = 1
	const b = 2
	const c = 3

	a3 := queue.Add(a, 5.0)
	a2 := queue.Add(a, 4.0)
	b1 := queue.Add(b, 1.1)
	b2 := queue.Add(b, 2.1)
	c3 := queue.Add(c, 2.2)
	b3 := queue.Add(b, 6.1)

	queue.Cancel(b2)
	queue.Cancel(b1)

	started := 0
	completed := 0
	startTask := func(task *Task) (*Permit, bool) {
		select {
		case permit, ok := <-task.GetWaitChan():
			if ok {
				started++
				return permit, true
			}
		case <-time.After(time.Second):
			t.Fatalf(`should not wait for task on queue %d with priority %f to start`,
				task.queueType, task.priority,
			)
		}
		return nil, false
	}

	completeTask := func(task *Task, permit *Permit) {
		releaseStarted := make(chan struct{})
		releaseFinished := make(chan struct{})
		go func() {
			close(releaseStarted)
			queue.Release(permit)
			close(releaseFinished)
		}()
		<-releaseStarted
		select {
		case <-releaseFinished:
			completed++
		case <-time.After(time.Second):
			t.Fatalf(`should not wait for task on queue %d with priority %f to complete`,
				task.queueType, task.priority,
			)
		}
	}

	// Execute a3.
	a3Permit, ok := startTask(a3)
	require.True(t, ok)
	completeTask(a3, a3Permit)

	// Cancel b3 before starting. Should not be able to get permit.
	queue.Cancel(b3)
	_, ok = startTask(b3)
	require.False(t, ok)

	// Now, should be able to execute c3 immediately.
	c3Permit, ok := startTask(c3)
	require.True(t, ok)

	// A and C started
	require.Equal(t, 2, started)
	// A completed
	require.Equal(t, 1, completed)

	// Complete c3 and cancel after completion.
	completeTask(c3, c3Permit)
	queue.Cancel(c3)

	// Start a2, which is the final item and also should not block to start.
	startTask(a2)

	require.Equal(t, 3, started)
	require.Equal(t, 2, completed)
}

// TestMultiQueueStress calls Add from multiple threads. It chooses different
// names and different priorities for the requests. The goal is simply to make
// sure that all the requests are serviced and nothing hangs or fails.
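The body of TestMultiQueueStress is collapsed in this view. As a rough illustration only, the pattern the comment describes looks like the sketch below; stressSketch, the worker counts, and the modulo choices are invented here and are not the actual test code.

package multiqueue

import "sync"

// stressSketch is an invented illustration of the pattern described above,
// not the collapsed TestMultiQueueStress body: many goroutines Add tasks with
// varying queue types and priorities, occasionally cancel one, and otherwise
// wait for the permit and release it.
func stressSketch(numWorkers, addsPerWorker int) {
	queue := NewMultiQueue(10) // assumed: the argument is the number of concurrent permits
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for i := 0; i < addsPerWorker; i++ {
				task := queue.Add(worker%3, float64(i%7))
				if i%5 == 0 {
					// Cancel some tasks before they run; Cancel handles the
					// race if the permit was already granted.
					queue.Cancel(task)
					continue
				}
				permit := <-task.GetWaitChan()
				queue.Release(permit)
			}
		}(w)
	}
	wg.Wait()
}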