-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
multi_queue.go
230 lines (201 loc) · 6.83 KB
/
multi_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package multiqueue
import (
"container/heap"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/redact"
)
// Task represents a request for a Permit for a piece of work that needs to be
// done. It is created by a call to MultiQueue.Add. After creation, the caller
// receives from the channel returned by Task.GetWaitChan to obtain a Permit,
// and after all work related to this task is done, MultiQueue.Release must be
// called with that Permit so future tasks can run. Alternatively, if the user
// decides they no longer want to run their work, MultiQueue.Cancel can be
// called to withdraw the task without waiting for the permit.
type Task struct {
// priority orders tasks within a single queue; notifyHeap.Less places the
// task with the larger priority closer to the top of the heap.
priority float64
// queueName is the name passed to MultiQueue.Add. It is looked up in
// MultiQueue.mapping to find the queue this task belongs to.
queueName int
// heapIdx is the task's current position inside its notifyHeap, kept up to
// date by Push and Swap. It is -1 while the task is not in a heap.
heapIdx int
// permitC carries the Permit to the waiter. MultiQueue.Add creates it with a
// buffer of one.
permitC chan Permit
}
// GetWaitChan exposes the receive-only side of the task's permit channel. The
// caller receives from it to wait until a Permit is delivered for this task.
func (t *Task) GetWaitChan() <-chan Permit {
	var waitC <-chan Permit = t.permitC
	return waitC
}
// String returns a human-readable description of the task made of its queue
// name and priority, with redaction markers stripped.
func (t *Task) String() string {
	described := redact.Sprintf("{Queue name : %d, Priority :%f}", t.queueName, t.priority)
	return described.StripMarkers()
}
// notifyHeap is a heap of the tasks waiting in a single queue. Together with
// its Len, Less, Swap, Push and Pop methods it is used through the
// container/heap package, ordered so that the task with the largest priority
// is at the top.
type notifyHeap []*Task
// Len reports how many tasks are currently held in the heap.
func (h notifyHeap) Len() int {
	size := len(h)
	return size
}
// Less reports whether the task at index i sorts ahead of the task at index
// j. The task with the larger priority sorts first, so the heap pops the
// highest-priority task.
func (h notifyHeap) Less(i, j int) bool {
	return h[i].priority > h[j].priority
}
// Swap exchanges the tasks at indexes i and j and records each task's new
// position in its heapIdx field.
func (h notifyHeap) Swap(i, j int) {
	atI, atJ := h[i], h[j]
	h[i], h[j] = atJ, atI
	// Each task remembers the slot it now occupies.
	atJ.heapIdx = i
	atI.heapIdx = j
}
// Push appends x, which must be a *Task, to the end of the underlying slice.
// It is the append step used by heap.Push, which then moves the task to its
// proper place.
func (h *notifyHeap) Push(x interface{}) {
	task := x.(*Task)
	// The task starts out in the last slot; Swap updates the index if it moves.
	task.heapIdx = len(*h)
	*h = append(*h, task)
}
// Pop removes and returns the last task of the underlying slice. It is the
// removal step used by heap.Pop and heap.Remove, which first move the task
// being removed to the end.
func (h *notifyHeap) Pop() interface{} {
	last := len(*h) - 1
	task := (*h)[last]
	// Drop the slice's reference so the backing array does not keep the task alive.
	(*h)[last] = nil
	*h = (*h)[:last]
	// The task has left the heap, so it no longer has a position.
	task.heapIdx = -1
	return task
}
// tryRemove removes task from this heap if it is still queued here. The task
// is located through the position recorded in task.heapIdx rather than by
// scanning the heap. It returns true if the task was removed, and false if the
// task is not currently an element of this heap (for example because it was
// already popped or removed, or because it belongs to a different heap).
func (h *notifyHeap) tryRemove(task *Task) bool {
	idx := task.heapIdx
	// Validate the recorded position before trusting it: an out-of-range index
	// would make heap.Remove panic, and an index that points at another task
	// would evict that unrelated task instead.
	if idx < 0 || idx >= len(*h) || (*h)[idx] != task {
		return false
	}
	// heap.Remove finishes by calling Pop, which resets task.heapIdx to -1.
	heap.Remove(h, idx)
	return true
}
// MultiQueue is a type that round-robins through a set of named queues, each
// independently prioritized. A MultiQueue is constructed with a maximum
// concurrency, which is the number of jobs this queue will allow to run at the
// same time. Tasks are added to the queue using MultiQueue.Add. That returns a
// Task whose GetWaitChan channel should be received from; a Permit is
// delivered on it when the waiting job is ready to be run. Once the job is
// completed, MultiQueue.Release must be called to return the Permit to the
// queue so that the next Task can be started.
type MultiQueue struct {
// mu is held by Add, Cancel, Release and Len while they access the fields
// below.
mu syncutil.Mutex
// remainingRuns is the number of permits that can still be handed out.
remainingRuns int
// mapping translates a queue name into the index of its heap in outstanding.
mapping map[int]int
// lastQueueIndex is the index in outstanding of the queue most recently
// served by tryRunNext, or -1 if no queue has been served yet.
lastQueueIndex int
// outstanding holds one heap of waiting tasks per queue name.
outstanding []notifyHeap
}
// NewMultiQueue creates a MultiQueue that hands out at most maxConcurrency
// permits at a time. It starts with no queues; Add creates a queue the first
// time a name is seen.
func NewMultiQueue(maxConcurrency int) *MultiQueue {
	return &MultiQueue{
		remainingRuns: maxConcurrency,
		mapping:       make(map[int]int),
		// -1 makes the round-robin scan in tryRunNext begin at queue 0.
		lastQueueIndex: -1,
	}
}
// Permit is the token a waiter receives from the channel returned by
// Task.GetWaitChan once its task is allowed to run. It must be handed back
// with MultiQueue.Release when the work is finished.
type Permit struct {
// valid is set to true when tryRunNext issues the permit. MultiQueue.Release
// clears it, and panics if it is already false, which is how a double release
// is detected.
valid bool
}
// tryRunNext hands a permit to at most one waiting task. Queues are visited
// round-robin, beginning with the queue after the one served most recently,
// and within a queue the task at the top of its heap is chosen. It does
// nothing when no permits remain or when every queue is empty. The
// MultiQueue.mu lock must be held before calling this func.
func (m *MultiQueue) tryRunNext() {
	// Without a free permit nothing can be started.
	if m.remainingRuns <= 0 {
		return
	}
	numQueues := len(m.outstanding)
	for offset := 1; offset <= numQueues; offset++ {
		// Walk forward from the last served queue, skipping empty ones.
		idx := (m.lastQueueIndex + offset) % numQueues
		queue := &m.outstanding[idx]
		if queue.Len() == 0 {
			continue
		}
		next := heap.Pop(queue).(*Task)
		// Deliver the permit and account for it.
		next.permitC <- Permit{valid: true}
		m.remainingRuns--
		m.lastQueueIndex = idx
		return
	}
}
// Add queues a request for a Permit under the given queue name and priority
// and returns the Task that tracks it. The caller receives the Permit from
// Task.GetWaitChan and later hands it back with MultiQueue.Release, or
// withdraws the request with MultiQueue.Cancel. The number of names is
// expected to be relatively small and not be changing over time.
func (m *MultiQueue) Add(name int, priority float64) *Task {
	m.mu.Lock()
	defer m.mu.Unlock()

	// The first time a name is seen it gets its own heap at the end of
	// outstanding, and mapping records where that heap lives.
	queueIdx, known := m.mapping[name]
	if !known {
		queueIdx = len(m.outstanding)
		m.mapping[name] = queueIdx
		m.outstanding = append(m.outstanding, notifyHeap{})
	}

	task := &Task{
		queueName: name,
		priority:  priority,
		heapIdx:   -1,
		permitC:   make(chan Permit, 1),
	}
	heap.Push(&m.outstanding[queueIdx], task)

	// A permit may already be free, in which case a waiting task (possibly this
	// one) is started right away.
	m.tryRunNext()
	return task
}
// Cancel withdraws a Task that is no longer required. If the task is still
// waiting in its queue it is removed. If a permit has already been issued for
// the task but has not yet been received from Task.GetWaitChan, Cancel takes
// that permit back. If the permit has already been received, Cancel leaves it
// alone and the holder must still call MultiQueue.Release. A task whose queue
// name was never registered through Add is ignored.
func (m *MultiQueue) Cancel(task *Task) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Queues monotonically grow, so an index stored in mapping stays valid. A
	// name that is not in mapping has no queue here; without this check the
	// lookup would silently fall back to queue 0.
	queueIdx, known := m.mapping[task.queueName]
	if !known {
		return
	}

	if !m.outstanding[queueIdx].tryRemove(task) {
		// The task is no longer queued, so we are racing with it being started.
		// The concern is that the caller may also call MultiQueue.Release since
		// the task was started. Either we get the permit or the caller does, so
		// only one of us gives it back.
		select {
		case _, open := <-task.permitC:
			// Only reclaim if the channel is open and actually yielded a permit.
			if open {
				m.remainingRuns++
			}
		default:
			// The permit has already been received by the caller, who must call
			// Release on it.
		}
	}
	m.tryRunNext()
}
// Release returns a Permit to the MultiQueue once the Task that was running
// has completed and is no longer using system resources, which lets the next
// waiting Task be started. It panics if the permit is not valid, which is the
// case for a permit that has already been released.
func (m *MultiQueue) Release(permit *Permit) {
	stillHeld := permit.valid
	if !stillHeld {
		panic("double release of permit")
	}
	// Invalidate the permit so a second Release of it is caught above.
	permit.valid = false

	m.mu.Lock()
	defer m.mu.Unlock()

	// One permit came back, so one more task may run.
	m.remainingRuns++
	m.tryRunNext()
}
// Len returns the number of permits that are currently free, which is the
// number of additional tasks that can be added without queueing.
func (m *MultiQueue) Len() int {
	m.mu.Lock()
	free := m.remainingRuns
	m.mu.Unlock()
	return free
}