Skip to content

Commit

Permalink
db: use function for Experimental.DisableIngestAsFlushable
Browse files Browse the repository at this point in the history
Convert the Experimental.DisableIngestAsFlushable setting to a function,
allowing the client to disable ingesting sstables as flushable at runtime. This
will be used to tie the setting to a cluster setting in CockroachDB.

This will be backported to 23.1.

Informs cockroachdb/cockroach#97194.
  • Loading branch information
jbowens committed Mar 16, 2023
1 parent 81d8d3e commit d44c908
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 6 deletions.
2 changes: 1 addition & 1 deletion ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ func (d *DB) ingest(
if ingestMemtableOverlaps(d.cmp, m, meta) {
if (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) ||
d.mu.formatVers.vers < FormatFlushableIngest ||
d.opts.Experimental.DisableIngestAsFlushable {
d.opts.Experimental.DisableIngestAsFlushable() {
mem = m
if mem.flushable == d.mu.mem.mutable {
err = d.makeRoomForWrite(nil)
Expand Down
4 changes: 3 additions & 1 deletion internal/metamorphic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ func randomOptions(rng *rand.Rand) *testOptions {
opts.Experimental.MaxWriterConcurrency = 2
opts.Experimental.ForceWriterParallelism = true
}
opts.Experimental.DisableIngestAsFlushable = rng.Intn(2) == 0
if rng.Intn(2) == 0 {
opts.Experimental.DisableIngestAsFlushable = func() bool { return true }
}
var lopts pebble.LevelOptions
lopts.BlockRestartInterval = 1 + rng.Intn(64) // 1 - 64
lopts.BlockSize = 1 << uint(rng.Intn(24)) // 1 - 16MB
Expand Down
5 changes: 5 additions & 0 deletions internal/metamorphic/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestOptionsRoundtrip(t *testing.T) {
"EventListener:",
"MaxConcurrentCompactions:",
"Experimental.EnableValueBlocks:",
"Experimental.DisableIngestAsFlushable:",
// Floating points
"Experimental.PointTombstoneWeight:",
}
Expand All @@ -91,6 +92,10 @@ func TestOptionsRoundtrip(t *testing.T) {
if o.opts.Experimental.EnableValueBlocks != nil {
require.Equal(t, o.opts.Experimental.EnableValueBlocks(), parsed.opts.Experimental.EnableValueBlocks())
}
require.Equal(t, o.opts.Experimental.DisableIngestAsFlushable == nil, parsed.opts.Experimental.DisableIngestAsFlushable == nil)
if o.opts.Experimental.DisableIngestAsFlushable != nil {
require.Equal(t, o.opts.Experimental.DisableIngestAsFlushable(), parsed.opts.Experimental.DisableIngestAsFlushable())
}
require.Equal(t, o.opts.MaxConcurrentCompactions(), parsed.opts.MaxConcurrentCompactions())

diff := pretty.Diff(o.opts, parsed.opts)
Expand Down
15 changes: 11 additions & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ type Options struct {
// DisableIngestAsFlushable disables lazy ingestion of sstables through
// a WAL write and memtable rotation. Only effectual if the the format
// major version is at least `FormatFlushableIngest`.
DisableIngestAsFlushable bool
DisableIngestAsFlushable func() bool

// SharedStorage is a second FS-like storage medium that can be shared
// between multiple Pebble instances. It is used to store sstables only, and
Expand Down Expand Up @@ -901,6 +901,9 @@ func (o *Options) EnsureDefaults() *Options {
if o.Comparer == nil {
o.Comparer = DefaultComparer
}
if o.Experimental.DisableIngestAsFlushable == nil {
o.Experimental.DisableIngestAsFlushable = func() bool { return false }
}
if o.Experimental.L0CompactionConcurrency <= 0 {
o.Experimental.L0CompactionConcurrency = 10
}
Expand Down Expand Up @@ -1124,8 +1127,8 @@ func (o *Options) String() string {
fmt.Fprintf(&buf, " compaction_debt_concurrency=%d\n", o.Experimental.CompactionDebtConcurrency)
fmt.Fprintf(&buf, " comparer=%s\n", o.Comparer.Name)
fmt.Fprintf(&buf, " disable_wal=%t\n", o.DisableWAL)
if o.Experimental.DisableIngestAsFlushable {
fmt.Fprintf(&buf, " disable_ingest_as_flushable=%t\n", o.Experimental.DisableIngestAsFlushable)
if o.Experimental.DisableIngestAsFlushable != nil && o.Experimental.DisableIngestAsFlushable() {
fmt.Fprintf(&buf, " disable_ingest_as_flushable=%t\n", true)
}
fmt.Fprintf(&buf, " flush_delay_delete_range=%s\n", o.FlushDelayDeleteRange)
fmt.Fprintf(&buf, " flush_delay_range_key=%s\n", o.FlushDelayRangeKey)
Expand Down Expand Up @@ -1329,7 +1332,11 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
case "disable_elision_only_compactions":
o.private.disableElisionOnlyCompactions, err = strconv.ParseBool(value)
case "disable_ingest_as_flushable":
o.Experimental.DisableIngestAsFlushable, err = strconv.ParseBool(value)
var v bool
v, err = strconv.ParseBool(value)
if err == nil {
o.Experimental.DisableIngestAsFlushable = func() bool { return v }
}
case "disable_lazy_combined_iteration":
o.private.disableLazyCombinedIteration, err = strconv.ParseBool(value)
case "disable_wal":
Expand Down

0 comments on commit d44c908

Please sign in to comment.