Skip to content

Commit

Permalink
db: remove compaction/flush pacing
Browse files Browse the repository at this point in the history
Compaction and flush pacing have been disabled for several years. Remove the
code to avoid maintaining dead code. If or when we attempt to re-enable pacing,
we can revert this commit.

Informs cockroachdb#687.
  • Loading branch information
jbowens committed Jul 15, 2022
1 parent b821342 commit 9f3691a
Show file tree
Hide file tree
Showing 11 changed files with 12 additions and 620 deletions.
55 changes: 3 additions & 52 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,35 +1378,6 @@ func (d *DB) removeInProgressCompaction(c *compaction) {
d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
}

// getCompactionPacerInfo takes a snapshot of the state the compaction pacer
// needs: a slowdown threshold derived from the estimated maximum write
// amplification and the configured memtable size, plus the total dirty bytes
// across all queued memtables. d.mu is acquired for the duration of the
// snapshot.
func (d *DB) getCompactionPacerInfo() compactionPacerInfo {
	d.mu.Lock()
	defer d.mu.Unlock()

	wamp := d.mu.versions.picker.getEstimatedMaxWAmp()
	info := compactionPacerInfo{
		slowdownThreshold: uint64(wamp * float64(d.opts.MemTableSize)),
		// TODO(jackson): bytesFlushed is no longer maintained. To re-enable
		// pacing, we'll need to restructure the code to produce a current
		// `bytesFlushed` total.
		//totalCompactionDebt: d.mu.versions.picker.estimatedCompactionDebt(bytesFlushed),
	}
	// Sum the in-use bytes over every memtable still in the flush queue.
	for _, mem := range d.mu.mem.queue {
		info.totalDirtyBytes += mem.inuseBytes()
	}
	return info
}

// getFlushPacerInfo reports, under d.mu, the total in-use bytes across all
// memtables currently in the flush queue. The flush pacer uses this snapshot
// to decide how aggressively to pace the flush.
func (d *DB) getFlushPacerInfo() flushPacerInfo {
	d.mu.Lock()
	defer d.mu.Unlock()

	var info flushPacerInfo
	for _, mem := range d.mu.mem.queue {
		info.inuseBytes += mem.inuseBytes()
	}
	return info
}

func (d *DB) calculateDiskAvailableBytes() uint64 {
if space, err := d.opts.FS.GetDiskUsage(d.dirname); err == nil {
atomic.StoreUint64(&d.atomic.diskAvailBytes, space.AvailBytes)
Expand Down Expand Up @@ -1598,17 +1569,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
})
startTime := d.timeNow()

flushPacer := (pacer)(nilPacer)
if d.opts.private.enablePacing {
// TODO(peter): Flush pacing is disabled until we figure out why it impacts
// throughput.
flushPacer = newFlushPacer(flushPacerEnv{
limiter: d.flushLimiter,
memTableSize: uint64(d.opts.MemTableSize),
getInfo: d.getFlushPacerInfo,
})
}
ve, pendingOutputs, err := d.runCompaction(jobID, c, flushPacer)
ve, pendingOutputs, err := d.runCompaction(jobID, c)

info := FlushInfo{
JobID: jobID,
Expand Down Expand Up @@ -2112,17 +2073,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.opts.EventListener.CompactionBegin(info)
startTime := d.timeNow()

compactionPacer := (pacer)(nilPacer)
if d.opts.private.enablePacing {
// TODO(peter): Compaction pacing is disabled until we figure out why it
// impacts throughput.
compactionPacer = newCompactionPacer(compactionPacerEnv{
limiter: d.compactionLimiter,
memTableSize: uint64(d.opts.MemTableSize),
getInfo: d.getCompactionPacerInfo,
})
}
ve, pendingOutputs, err := d.runCompaction(jobID, c, compactionPacer)
ve, pendingOutputs, err := d.runCompaction(jobID, c)

info.Duration = d.timeNow().Sub(startTime)
if err == nil {
Expand Down Expand Up @@ -2173,7 +2124,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID int, c *compaction, pacer pacer,
jobID int, c *compaction,
) (ve *versionEdit, pendingOutputs []*fileMetadata, retErr error) {
// As a sanity check, confirm that the smallest / largest keys for new and
// deleted files in the new versionEdit pass a validation function before
Expand Down
1 change: 0 additions & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,6 @@ func TestCompaction(t *testing.T) {
DebugCheck: DebugCheckLevels,
L0CompactionThreshold: 8,
}
opts.private.enablePacing = true
opts.testingRandomized()
d, err := Open("", opts)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
// to the user-defined boundaries.
c.maxOutputFileSize = math.MaxUint64

newVE, _, err := d.runCompaction(0, c, nilPacer)
newVE, _, err := d.runCompaction(0, c)
if err != nil {
return err
}
Expand Down
4 changes: 1 addition & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,7 @@ type DB struct {
closed *atomic.Value
closedCh chan struct{}

compactionLimiter limiter
flushLimiter limiter
deletionLimiter limiter
deletionLimiter limiter

// Async deletion jobs spawned by cleaners increment this WaitGroup, and
// call Done when completed. Once `d.mu.cleaning` is false, the db.Close()
Expand Down
8 changes: 1 addition & 7 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,6 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
apply: d.commitApply,
write: d.commitWrite,
})
d.compactionLimiter = rate.NewLimiter(
rate.Limit(d.opts.private.minCompactionRate),
d.opts.private.minCompactionRate)
d.flushLimiter = rate.NewLimiter(
rate.Limit(d.opts.private.minFlushRate),
d.opts.private.minFlushRate)
d.deletionLimiter = rate.NewLimiter(
rate.Limit(d.opts.Experimental.MinDeletionRate),
d.opts.Experimental.MinDeletionRate)
Expand Down Expand Up @@ -707,7 +701,7 @@ func (d *DB) replayWAL(
if !d.opts.ReadOnly {
c := newFlush(d.opts, d.mu.versions.currentVersion(),
1 /* base level */, toFlush)
newVE, _, err := d.runCompaction(jobID, c, nilPacer)
newVE, _, err := d.runCompaction(jobID, c)
if err != nil {
return 0, err
}
Expand Down
29 changes: 4 additions & 25 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,24 +715,9 @@ type Options struct {
// RocksDB 6.2.1.
strictWALTail bool

// TODO(peter): A private option to enable flush/compaction pacing. Only used
// by tests. Compaction/flush pacing is disabled until we fix the impact on
// throughput.
enablePacing bool

// A private option to disable stats collection.
disableTableStats bool

// minCompactionRate sets the minimum rate at which compactions occur. The
// default is 4 MB/s. Currently disabled as this option has no effect while
// private.enablePacing is false.
minCompactionRate int

// minFlushRate sets the minimum rate at which the MemTables are flushed. The
// default is 1 MB/s. Currently disabled as this option has no effect while
// private.enablePacing is false.
minFlushRate int

// fsCloser holds a closer that should be invoked after a DB using these
// Options is closed. This is used to automatically stop the
// long-running goroutine associated with the disk-health-checking FS.
Expand Down Expand Up @@ -847,12 +832,6 @@ func (o *Options) EnsureDefaults() *Options {
o.Merger = DefaultMerger
}
o.private.strictWALTail = true
if o.private.minCompactionRate == 0 {
o.private.minCompactionRate = 4 << 20 // 4 MB/s
}
if o.private.minFlushRate == 0 {
o.private.minFlushRate = 1 << 20 // 1 MB/s
}
if o.MaxConcurrentCompactions <= 0 {
o.MaxConcurrentCompactions = 1
}
Expand Down Expand Up @@ -973,9 +952,7 @@ func (o *Options) String() string {
fmt.Fprintf(&buf, " max_open_files=%d\n", o.MaxOpenFiles)
fmt.Fprintf(&buf, " mem_table_size=%d\n", o.MemTableSize)
fmt.Fprintf(&buf, " mem_table_stop_writes_threshold=%d\n", o.MemTableStopWritesThreshold)
fmt.Fprintf(&buf, " min_compaction_rate=%d\n", o.private.minCompactionRate)
fmt.Fprintf(&buf, " min_deletion_rate=%d\n", o.Experimental.MinDeletionRate)
fmt.Fprintf(&buf, " min_flush_rate=%d\n", o.private.minFlushRate)
fmt.Fprintf(&buf, " merger=%s\n", o.Merger.Name)
fmt.Fprintf(&buf, " read_compaction_rate=%d\n", o.Experimental.ReadCompactionRate)
fmt.Fprintf(&buf, " read_sampling_multiplier=%d\n", o.Experimental.ReadSamplingMultiplier)
Expand Down Expand Up @@ -1175,11 +1152,13 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
case "mem_table_stop_writes_threshold":
o.MemTableStopWritesThreshold, err = strconv.Atoi(value)
case "min_compaction_rate":
o.private.minCompactionRate, err = strconv.Atoi(value)
// Do nothing; option existed in older versions of pebble, and
// may be meaningful again eventually.
case "min_deletion_rate":
o.Experimental.MinDeletionRate, err = strconv.Atoi(value)
case "min_flush_rate":
o.private.minFlushRate, err = strconv.Atoi(value)
// Do nothing; option existed in older versions of pebble, and
// may be meaningful again eventually.
case "strict_wal_tail":
o.private.strictWALTail, err = strconv.ParseBool(value)
case "merger":
Expand Down
2 changes: 0 additions & 2 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ func TestOptionsString(t *testing.T) {
max_open_files=1000
mem_table_size=4194304
mem_table_stop_writes_threshold=2
min_compaction_rate=4194304
min_deletion_rate=0
min_flush_rate=1048576
merger=pebble.concatenate
read_compaction_rate=16000
read_sampling_multiplier=16
Expand Down
Loading

0 comments on commit 9f3691a

Please sign in to comment.