Skip to content

Commit

Permalink
Remove old rate-{controller,limiter} code
Browse files Browse the repository at this point in the history
This code was not actively being used and the mechanisms were
fragile. See #7 for the proposed direction for pacing user writes vs
flushes.
  • Loading branch information
petermattis committed Apr 15, 2019
1 parent 5c24615 commit 48e26ca
Show file tree
Hide file tree
Showing 7 changed files with 6 additions and 213 deletions.
9 changes: 0 additions & 9 deletions commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"sync"
"sync/atomic"
"unsafe"

"github.com/petermattis/pebble/internal/rate"
)

type commitQueueNode struct {
Expand Down Expand Up @@ -121,8 +119,6 @@ type commitEnv struct {
// The visible sequence number at which reads should be performed. Ratcheted
// upwards atomically as batches are applied to the memtable.
visibleSeqNum *uint64
// Controller for measuring and limiting the commit rate.
controller *controller

// Apply the batch to the specified memtable. Called concurrently.
apply func(b *Batch, mem *memTable) error
Expand Down Expand Up @@ -223,9 +219,6 @@ func newCommitPipeline(env commitEnv) *commitPipeline {
p := &commitPipeline{
env: env,
}
if p.env.controller == nil {
p.env.controller = newController(rate.NewLimiter(rate.Inf, 0))
}
p.cond.L = p.env.mu
p.pending.init()
p.syncer.cond.L = &p.syncer.Mutex
Expand Down Expand Up @@ -363,8 +356,6 @@ func (p *commitPipeline) prepare(b *Batch, writeWAL, syncWAL bool) (*memTable, e
}
b.commit.Add(count)

// p.env.controller.WaitN(len(b.data))

p.env.mu.Lock()

// Enqueue the batch in the pending queue. Note that while the pending queue
Expand Down
1 change: 0 additions & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,6 @@ func (d *DB) writeLevel0Table(
if err != nil {
return fileMetadata{}, err
}
file = newRateLimitedFile(file, d.flushController)
tw = sstable.NewWriter(file, d.opts, d.opts.Level(0))

var count int
Expand Down
108 changes: 0 additions & 108 deletions controller.go

This file was deleted.

45 changes: 0 additions & 45 deletions controller_test.go

This file was deleted.

10 changes: 0 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,6 @@ type DB struct {
largeBatchThreshold int
optionsFileNum uint64

// Rate limiter for how much bandwidth to allow for commits, compactions, and
// flushes.
//
// TODO(peter): Add a controller module that balances the limits so that
// commits cannot happen faster than flushes and the backlog of compaction
// work does not grow too large.
commitController *controller
compactController *controller
flushController *controller

// TODO(peter): describe exactly what this mutex protects. So far: every
// field in the struct.
mu struct {
Expand Down
20 changes: 6 additions & 14 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/petermattis/pebble/db"
"github.com/petermattis/pebble/internal/arenaskl"
"github.com/petermattis/pebble/internal/rate"
"github.com/petermattis/pebble/internal/record"
"github.com/petermattis/pebble/storage"
)
Expand Down Expand Up @@ -55,20 +54,14 @@ func createDB(dirname string, opts *db.Options) (retErr error) {

// Open opens a LevelDB whose files live in the given directory.
func Open(dirname string, opts *db.Options) (*DB, error) {
const defaultRateLimit = rate.Limit(50 << 20) // 50 MB/sec
const defaultBurst = 1 << 20 // 1 MB

opts = opts.EnsureDefaults()
d := &DB{
dirname: dirname,
opts: opts,
cmp: opts.Comparer.Compare,
equal: opts.Comparer.Equal,
merge: opts.Merger.Merge,
abbreviatedKey: opts.Comparer.AbbreviatedKey,
commitController: newController(rate.NewLimiter(defaultRateLimit, defaultBurst)),
compactController: newController(rate.NewLimiter(defaultRateLimit, defaultBurst)),
flushController: newController(rate.NewLimiter(rate.Inf, defaultBurst)),
dirname: dirname,
opts: opts,
cmp: opts.Comparer.Compare,
equal: opts.Comparer.Equal,
merge: opts.Merger.Merge,
abbreviatedKey: opts.Comparer.AbbreviatedKey,
}
if d.equal == nil {
d.equal = bytes.Equal
Expand All @@ -83,7 +76,6 @@ func Open(dirname string, opts *db.Options) (*DB, error) {
mu: &d.mu.Mutex,
logSeqNum: &d.mu.versions.logSeqNum,
visibleSeqNum: &d.mu.versions.visibleSeqNum,
controller: d.commitController,
apply: d.commitApply,
sync: d.commitSync,
write: d.commitWrite,
Expand Down
26 changes: 0 additions & 26 deletions rate_limited_file.go

This file was deleted.

0 comments on commit 48e26ca

Please sign in to comment.