forked from cockroachdb/pebble
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pacer.go
219 lines (190 loc) · 7.33 KB
/
pacer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"context"
"errors"
"time"
)
// nilPacer is a shared, ready-to-use pacer that performs no throttling.
var nilPacer = &noopPacer{}
// limiter is the token-bucket rate-limiting interface consumed by the pacers.
// NOTE(review): the signatures match golang.org/x/time/rate.Limiter; the
// method comments below assume those semantics — confirm against the concrete
// implementation supplied in compactionPacerEnv/flushPacerEnv.
type limiter interface {
	// WaitN blocks until n tokens are available or ctx is done, returning
	// any error from the wait.
	WaitN(ctx context.Context, n int) (err error)
	// AllowN reports whether n tokens may be consumed as of time now,
	// consuming them if so. It never blocks.
	AllowN(now time.Time, n int) bool
	// Burst returns the maximum number of tokens that may be consumed in a
	// single WaitN/AllowN call.
	Burst() int
}
// pacer is the interface for flush and compaction rate limiters. The rate
// limiter may be applied on each iteration step of a flush or compaction. This
// is to limit background IO usage so that it does not contend with foreground
// traffic.
type pacer interface {
	// maybeThrottle is called with the cumulative number of bytes iterated so
	// far by the current flush/compaction; implementations may block to pace
	// the operation.
	maybeThrottle(bytesIterated uint64) error
}
// internalPacer contains fields and methods common to both compactionPacer and
// flushPacer.
type internalPacer struct {
	limiter limiter
	// iterCount counts down from 1000; cached pacing state is refreshed in
	// maybeThrottle when it reaches zero.
	iterCount uint64
	// prevBytesIterated is the bytesIterated value seen by the previous
	// maybeThrottle call; the delta is the amount submitted to the limiter.
	prevBytesIterated uint64
	// refreshBytesThreshold forces a refresh of the cached pacing state once
	// bytesIterated exceeds it, even if iterCount has not reached zero.
	refreshBytesThreshold uint64
	// slowdownThreshold is the level at or below which limit() throttles via
	// blocking waits; above it, tokens are drained without blocking.
	slowdownThreshold uint64
}
// limit consumes `amount` tokens from the rate limiter. When currentLevel is
// at or below the configured slowdown threshold the waits are blocking,
// throttling the caller; otherwise tokens are drained via non-blocking AllowN
// calls so the limiter's state stays current without slowing anything down.
func (p *internalPacer) limit(amount, currentLevel uint64) error {
	burst := p.limiter.Burst()
	throttle := currentLevel <= p.slowdownThreshold
	for {
		// The limiter cannot grant more than burst tokens at once, so carve
		// the amount into burst-sized chunks plus a final remainder.
		chunk := int(amount)
		if amount > uint64(burst) {
			chunk = burst
		}
		if throttle {
			if err := p.limiter.WaitN(context.Background(), chunk); err != nil {
				return err
			}
		} else {
			p.limiter.AllowN(time.Now(), chunk)
		}
		if amount <= uint64(burst) {
			return nil
		}
		amount -= uint64(burst)
	}
}
// compactionPacerInfo contains information necessary for compaction pacing.
type compactionPacerInfo struct {
	// slowdownThreshold is the low watermark for compaction debt. If compaction debt is
	// below this threshold, we slow down compactions. If compaction debt is above this
	// threshold, we let compactions continue as fast as possible. We want to keep
	// compaction speed as slow as possible to match the speed of flushes. This threshold
	// is set so that a single flush cannot contribute enough compaction debt to overshoot
	// the threshold.
	slowdownThreshold uint64
	// totalCompactionDebt is the estimated compaction debt in bytes, before
	// subtracting the bytes iterated by the current compaction.
	totalCompactionDebt uint64
}
// compactionPacerEnv defines the environment in which the compaction rate
// limiter is applied.
type compactionPacerEnv struct {
	limiter      limiter
	memTableSize uint64
	// getInfo returns fresh pacing state. It is expensive (it grabs DB.mu),
	// so callers cache its results between refreshes.
	getInfo func() compactionPacerInfo
}
// compactionPacer rate limits compactions depending on compaction debt. The rate
// limiter is applied at a rate that keeps compaction debt at a steady level. If
// compaction debt increases at a rate that is faster than the system can handle,
// no rate limit is applied.
type compactionPacer struct {
	internalPacer
	env compactionPacerEnv
	// totalCompactionDebt is the cached value from the most recent
	// env.getInfo() refresh.
	totalCompactionDebt uint64
}
// newCompactionPacer constructs a compactionPacer that draws tokens from
// env.limiter. The slowdown threshold is populated lazily on the first call to
// maybeThrottle.
func newCompactionPacer(env compactionPacerEnv) *compactionPacer {
	p := &compactionPacer{env: env}
	p.limiter = env.limiter
	return p
}
// maybeThrottle slows down compactions to match the memtable flush rate. The
// DB provides a compaction debt estimate and a slowdown threshold. We subtract
// the bytes iterated in the current compaction from the debt estimate; if the
// result is below the threshold, the rate limiter is applied, otherwise it is
// not.
func (p *compactionPacer) maybeThrottle(bytesIterated uint64) error {
	if bytesIterated == 0 {
		return errors.New("pebble: maybeThrottle supplied with invalid bytesIterated")
	}

	// Refresh the cached compaction debt and slowdown threshold at most once
	// every 1000 iterations, or once the refresh byte threshold is crossed,
	// because env.getInfo grabs DB.mu which is expensive.
	if p.iterCount == 0 || bytesIterated > p.refreshBytesThreshold {
		info := p.env.getInfo()
		p.slowdownThreshold = info.slowdownThreshold
		p.totalCompactionDebt = info.totalCompactionDebt
		p.refreshBytesThreshold = bytesIterated + p.env.memTableSize*5/100
		p.iterCount = 1000
	}
	p.iterCount--

	// Remaining debt is the cached estimate minus what this compaction has
	// already processed, clamped at zero to avoid uint64 underflow.
	var remainingDebt uint64
	if p.totalCompactionDebt > bytesIterated {
		remainingDebt = p.totalCompactionDebt - bytesIterated
	}

	step := bytesIterated - p.prevBytesIterated
	p.prevBytesIterated = bytesIterated

	// Compactions are slowed when remaining debt falls below the slowdown
	// threshold (set dynamically based on the number of non-empty levels).
	// That only happens when compactions keep up with flushes; when flushes
	// outpace compactions, compaction runs at full (unthrottled) speed.
	return p.limit(step, remainingDebt)
}
// flushPacerInfo contains information necessary for flush pacing.
// (The original comment said "compaction pacing" — a copy/paste slip from
// compactionPacerInfo; this struct feeds flushPacer.maybeThrottle.)
type flushPacerInfo struct {
	// totalBytes is the total number of bytes across all memtables.
	totalBytes uint64
}
// flushPacerEnv defines the environment in which the flush rate limiter is
// applied. (The original comment said "compaction rate limiter" — a
// copy/paste slip from compactionPacerEnv.)
type flushPacerEnv struct {
	limiter      limiter
	memTableSize uint64
	// getInfo returns fresh pacing state. It is expensive (it grabs DB.mu),
	// so callers cache its results between refreshes.
	getInfo func() flushPacerInfo
}
// flushPacer rate limits memtable flushing to match the speed of incoming user
// writes. If user writes come in faster than the memtable can be flushed, no
// rate limit is applied.
type flushPacer struct {
	internalPacer
	env flushPacerEnv
	// totalBytes is the cached total memtable byte count from the most recent
	// env.getInfo() refresh.
	totalBytes uint64
}
// newFlushPacer constructs a flushPacer that draws tokens from env.limiter,
// with the slowdown watermark fixed at 105% of the memtable size.
func newFlushPacer(env flushPacerEnv) *flushPacer {
	p := &flushPacer{env: env}
	p.limiter = env.limiter
	p.slowdownThreshold = env.memTableSize * 105 / 100
	return p
}
// maybeThrottle slows down memtable flushing to match the user write rate. The
// DB provides the total number of bytes in all the memtables. We subtract the
// number of bytes flushed so far in the current flush to get a "dirty byte"
// count. If the dirty byte count is below the watermark (105% memtable size),
// the rate limiter is applied. If the dirty byte count is above the watermark,
// the rate limiter is not applied.
func (p *flushPacer) maybeThrottle(bytesIterated uint64) error {
	if bytesIterated == 0 {
		return errors.New("pebble: maybeThrottle supplied with invalid bytesIterated")
	}
	// Recalculate total memtable bytes only once every 1000 iterations or
	// when the refresh threshold is hit since getting the total memtable
	// byte count requires grabbing DB.mu which is expensive.
	if p.iterCount == 0 || bytesIterated > p.refreshBytesThreshold {
		pacerInfo := p.env.getInfo()
		p.totalBytes = pacerInfo.totalBytes
		p.refreshBytesThreshold = bytesIterated + p.env.memTableSize*5/100
		p.iterCount = 1000
	}
	p.iterCount--

	// dirtyBytes is the total number of bytes in the memtables minus the
	// number of bytes flushed. It represents unflushed bytes in all the
	// memtables, even the ones which aren't being flushed such as the mutable
	// memtable.
	//
	// Guard against uint64 underflow: p.totalBytes is only refreshed
	// periodically, so a stale value may be smaller than bytesIterated.
	// Wrapping around would produce an enormous dirtyBytes, which would sit
	// above the watermark and wrongly disable throttling. Clamping at zero
	// mirrors the equivalent guard in compactionPacer.maybeThrottle.
	var dirtyBytes uint64
	if p.totalBytes > bytesIterated {
		dirtyBytes = p.totalBytes - bytesIterated
	}
	flushAmount := bytesIterated - p.prevBytesIterated
	p.prevBytesIterated = bytesIterated
	// We slow down memtable flushing when the dirty bytes indicator falls
	// below the low watermark, which is 105% memtable size. This will only
	// occur if memtable flushing can keep up with the pace of incoming
	// writes. If writes come in faster than how fast the memtable can flush,
	// flushing proceeds at maximum (unthrottled) speed.
	return p.limit(flushAmount, dirtyBytes)
}
// noopPacer is a pacer that never throttles.
type noopPacer struct{}

// maybeThrottle implements the pacer interface as a no-op; it always returns
// nil immediately.
func (p *noopPacer) maybeThrottle(_ uint64) error {
	return nil
}