-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
queue.go
110 lines (98 loc) · 3.49 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package queue
import (
"context"
"time"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
// RangeQueue presents an interface to interact with a single consumer
// queue, which processes replicas for replication updates if they meet the
// criteria.
//
// TODO(kvoli): When replicas are enqueued into multiple queues and are
// processed in the same Tick(), with each queue pushing state updates to the
// state changer, only one update will succeed, as there may be at most one
// pending change per range. The split queue currently goes first and therefore
// has priority over the replication queue. We should implement a wait or
// retry-next-tick mechanism to address this, to match the real code.
type RangeQueue interface {
// MaybeAdd proposes a replica for inclusion into the queue; if it meets
// the criteria, it is enqueued.
// NOTE(review): the bool result presumably reports whether the replica was
// enqueued - confirm against the implementations.
MaybeAdd(ctx context.Context, repl state.Replica, state state.State) bool
// Tick processes updates in the queue. Only one replica is processed at a
// time and the duration taken to process a replica depends on the action
// taken. Replicas in the queue are processed in order of priority, then in
// FIFO order on ties.
Tick(ctx context.Context, tick time.Time, state state.State)
}
// replicaItem represents an item in the replica queue. It identifies a
// replica by range ID and replica ID and carries the bookkeeping fields used
// to order it within a priorityQueue.
type replicaItem struct {
rangeID roachpb.RangeID
replicaID roachpb.ReplicaID
// seq enforces FIFO order for equal priorities. It is assigned from the
// queue's sequence counter when the item is pushed.
seq int
// priority is used when a replicaItem is enqueued in a priority queue;
// items with a higher priority are ordered first.
priority float64
// index is the position of the item in the heap, maintained by the
// heap.Interface methods. It is set to -1 once the item has been popped.
index int
}
// priorityQueue holds replicaItems and provides the Len, Less, Swap, Push and
// Pop methods of heap.Interface. Items are ordered by descending priority,
// with ties broken by ascending sequence number (FIFO).
type priorityQueue struct {
// seqGen is a counter incremented on every Push; its new value is stamped
// on the pushed item as its seq.
seqGen int
// items is the slice of queued items that the heap methods operate on.
items []*replicaItem
}
// Len reports the number of items currently held in the queue. It is part of
// the heap.Interface implementation.
func (pq priorityQueue) Len() int {
	return len(pq.items)
}
// Less reports whether the item at index i must be ordered before the item at
// index j. It is part of the heap.Interface implementation.
func (pq priorityQueue) Less(i, j int) bool {
	lhs, rhs := pq.items[i], pq.items[j]
	if lhs.priority != rhs.priority {
		// Pop should hand back the highest priority first, not the lowest,
		// hence the greater-than comparison.
		return lhs.priority > rhs.priority
	}
	// Equal priorities: the item with the lower sequence number goes first,
	// which yields FIFO order.
	return lhs.seq < rhs.seq
}
// Swap exchanges the items at indexes i and j and updates each item's index
// field to its new position. It is part of the heap.Interface implementation.
func (pq priorityQueue) Swap(i, j int) {
	items := pq.items
	items[i], items[j] = items[j], items[i]
	// Keep every item's recorded position in sync with where it now lives.
	items[i].index = i
	items[j].index = j
}
// Push appends x, which must be a *replicaItem, to the end of the queue. The
// item's index is set to its slot in the slice and its seq to the next value
// of the queue's sequence counter. It is part of the heap.Interface
// implementation.
func (pq *priorityQueue) Push(x interface{}) {
	item := x.(*replicaItem)
	item.index = len(pq.items)
	// Stamp the item with a fresh sequence number so equal priorities can be
	// ordered FIFO.
	pq.seqGen++
	item.seq = pq.seqGen
	pq.items = append(pq.items, item)
}
// Pop removes the last item of the underlying slice and returns it. The
// returned item's index is set to -1. It is part of the heap.Interface
// implementation.
func (pq *priorityQueue) Pop() interface{} {
	last := len(pq.items) - 1
	popped := pq.items[last]
	popped.index = -1    // for safety: the item is no longer in the queue
	pq.items[last] = nil // for gc: drop the slice's reference to the item
	pq.items = pq.items[:last]
	return popped
}
// baseQueue is an implementation of the ReplicateQueue interface.
// NOTE(review): no interface named ReplicateQueue is declared in the visible
// part of this file; this likely refers to RangeQueue - confirm.
type baseQueue struct {
// AmbientContext is embedded from the log package.
log.AmbientContext
// priorityQueue is embedded, so baseQueue carries the queued replicaItems
// and exposes the heap methods directly.
priorityQueue
// storeID is a store identifier from the simulator state package.
// NOTE(review): presumably the store this queue operates on - confirm.
storeID state.StoreID
// stateChanger is the state.Changer held by this queue.
// NOTE(review): presumably where the queue pushes state updates - confirm.
stateChanger state.Changer
// next and lastTick are timestamps.
// NOTE(review): presumably next is the earliest time another replica may
// be processed and lastTick is the time of the most recent Tick - confirm
// against the methods that set them.
next, lastTick time.Time
}