-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
A multi-queue allows multiple clients to each add to the queue with their own priorities. The queue will round-robin between the queues and prioritize within a queue. Release note: None
- Loading branch information
1 parent
5d7e232
commit fd4be08
Showing
4 changed files
with
505 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "multiqueue", | ||
srcs = ["multi_queue.go"], | ||
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue", | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"//pkg/util/syncutil", | ||
"@com_github_cockroachdb_redact//:redact", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "multiqueue_test", | ||
srcs = ["multi_queue_test.go"], | ||
embed = [":multiqueue"], | ||
deps = [ | ||
"//pkg/testutils", | ||
"//pkg/util/leaktest", | ||
"//pkg/util/log", | ||
"@com_github_stretchr_testify//require", | ||
], | ||
) | ||
|
||
get_x_data(name = "get_x_data") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,230 @@ | ||
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package multiqueue | ||
|
||
import ( | ||
"container/heap" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/util/syncutil" | ||
"github.com/cockroachdb/redact" | ||
) | ||
|
||
// Task represents a request for a Permit for a piece of work that needs to be | ||
// done. It is created by a call to MultiQueue.Add. After creation, | ||
// Task.GetWaitChan is called to get a permit, and after all work related to | ||
// this task is done, MultiQueue.Release must be called so future tasks can run. | ||
// Alternatively, if the user decides they no longer want to run their work, | ||
// MultiQueue.Cancel can be called to release the permit without waiting for the | ||
// permit. | ||
type Task struct { | ||
priority float64 | ||
queueName int | ||
heapIdx int | ||
permitC chan Permit | ||
} | ||
|
||
// GetWaitChan returns a permit channel which is used to wait for the permit to | ||
// become available. | ||
func (t *Task) GetWaitChan() <-chan Permit { | ||
return t.permitC | ||
} | ||
|
||
func (t *Task) String() string { | ||
return redact.Sprintf("{Queue name : %d, Priority :%f}", t.queueName, t.priority).StripMarkers() | ||
} | ||
|
||
// notifyHeap is a standard go heap over tasks. | ||
type notifyHeap []*Task | ||
|
||
func (h notifyHeap) Len() int { | ||
return len(h) | ||
} | ||
|
||
func (h notifyHeap) Less(i, j int) bool { | ||
return h[j].priority < h[i].priority | ||
} | ||
|
||
func (h notifyHeap) Swap(i, j int) { | ||
h[i], h[j] = h[j], h[i] | ||
h[i].heapIdx = i | ||
h[j].heapIdx = j | ||
} | ||
|
||
func (h *notifyHeap) Push(x interface{}) { | ||
t := x.(*Task) | ||
// Set the index to the end, it will be moved later | ||
t.heapIdx = h.Len() | ||
*h = append(*h, t) | ||
} | ||
|
||
func (h *notifyHeap) Pop() interface{} { | ||
old := *h | ||
n := len(old) | ||
x := old[n-1] | ||
old[n-1] = nil | ||
*h = old[0 : n-1] | ||
// No longer in the heap so clear the index | ||
x.heapIdx = -1 | ||
|
||
return x | ||
} | ||
|
||
// tryRemove attempts to remove the task from this queue by iterating through | ||
// the queue. Will returns true if the task was successfully removed. | ||
func (h *notifyHeap) tryRemove(task *Task) bool { | ||
if task.heapIdx < 0 { | ||
return false | ||
} | ||
heap.Remove(h, task.heapIdx) | ||
return true | ||
} | ||
|
||
// MultiQueue is a type that round-robins through a set of named queues, each | ||
// independently prioritized. A MultiQueue is constructed with a concurrencySem | ||
// which is the number of concurrent jobs this queue will allow to run. Tasks | ||
// are added to the queue using MultiQueue.Add. That will return a channel that | ||
// should be received from. It will be notified when the waiting job is ready to | ||
// be run. Once the job is completed, MultiQueue.TaskDone must be called to | ||
// return the Permit to the queue so that the next Task can be started. | ||
type MultiQueue struct { | ||
mu syncutil.Mutex | ||
remainingRuns int | ||
mapping map[int]int | ||
lastQueueIndex int | ||
outstanding []notifyHeap | ||
} | ||
|
||
// NewMultiQueue creates a new queue. The queue is not started, and start needs | ||
// to be called on it first. | ||
func NewMultiQueue(maxConcurrency int) *MultiQueue { | ||
queue := MultiQueue{ | ||
remainingRuns: maxConcurrency, | ||
mapping: make(map[int]int), | ||
} | ||
queue.lastQueueIndex = -1 | ||
|
||
return &queue | ||
} | ||
|
||
// Permit is a token which is returned from a Task.GetWaitChan call. | ||
type Permit struct { | ||
valid bool | ||
} | ||
|
||
// tryRunNext will run the next task in order round-robin through the queues and in | ||
// priority order within a queue. It will return true if it ran a task. The | ||
// MultiQueue.mu lock must be held before calling this func. | ||
func (m *MultiQueue) tryRunNext() { | ||
// If no permits are left, then we can't run anything. | ||
if m.remainingRuns <= 0 { | ||
return | ||
} | ||
|
||
for i := 0; i < len(m.outstanding); i++ { | ||
// Start with the next queue in order and iterate through all empty queues. | ||
// If all queues are empty then return false signaling that nothing was run. | ||
index := (m.lastQueueIndex + i + 1) % len(m.outstanding) | ||
if m.outstanding[index].Len() > 0 { | ||
task := heap.Pop(&m.outstanding[index]).(*Task) | ||
task.permitC <- Permit{valid: true} | ||
m.remainingRuns-- | ||
m.lastQueueIndex = index | ||
return | ||
} | ||
} | ||
} | ||
|
||
// Add returns a Task that must be closed (calling Task.Close) to | ||
// release the Permit. The number of names is expected to | ||
// be relatively small and not be changing over time. | ||
func (m *MultiQueue) Add(name int, priority float64) *Task { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
// The mutex starts locked, unlock it when we are ready to run. | ||
pos, ok := m.mapping[name] | ||
if !ok { | ||
// Append a new entry to both mapping and outstanding each time there is | ||
// a new queue name. | ||
pos = len(m.outstanding) | ||
m.mapping[name] = pos | ||
m.outstanding = append(m.outstanding, notifyHeap{}) | ||
} | ||
newTask := Task{ | ||
priority: priority, | ||
permitC: make(chan Permit, 1), | ||
heapIdx: -1, | ||
queueName: name, | ||
} | ||
heap.Push(&m.outstanding[pos], &newTask) | ||
|
||
// Once we are done adding a task, signal the main loop in case it finished | ||
// all its work and was waiting for more work. We are holding the mu lock when | ||
// signaling, so we guarantee that it will not be able to respond to the | ||
// signal until after we release the lock. | ||
m.tryRunNext() | ||
|
||
return &newTask | ||
} | ||
|
||
// Cancel will cancel a Task that may not have started yet. This is useful if it | ||
// is determined that it is no longer required to run this Task. | ||
func (m *MultiQueue) Cancel(task *Task) { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
// Find the right queue and try to remove it. Queues monotonically grow, and a | ||
// Task will track its position within the queue. | ||
queueIdx := m.mapping[task.queueName] | ||
ok := m.outstanding[queueIdx].tryRemove(task) | ||
// If we get here, we are racing with the task being started. The concern is | ||
// that the caller may also call MultiQueue.Release since the task was | ||
// started. Either we get the permit or the caller, so we guarantee only one | ||
// release will be called. | ||
if !ok { | ||
select { | ||
case p, ok := <-task.permitC: | ||
// Only release if the channel is open, and we can get the permit. | ||
if ok { | ||
p.valid = false | ||
m.remainingRuns++ | ||
} | ||
default: | ||
// If we are not able to get the permit, this means the permit has already | ||
// been given to the caller, and they must call Release on it. | ||
} | ||
} | ||
m.tryRunNext() | ||
} | ||
|
||
// Release needs to be called once the Task that was running has completed and | ||
// is no longer using system resources. This allows the MultiQueue to call the | ||
// next Task. | ||
func (m *MultiQueue) Release(permit *Permit) { | ||
if !permit.valid { | ||
panic("double release of permit") | ||
} | ||
permit.valid = false | ||
|
||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
// We released one, so we can run one more now. | ||
m.remainingRuns++ | ||
m.tryRunNext() | ||
} | ||
|
||
// Len returns the number of additional tasks that can be added without queueing. | ||
// This will return 0 if there is anything queued. | ||
func (m *MultiQueue) Len() int { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
return m.remainingRuns | ||
} |
Oops, something went wrong.