Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sstable: make Category an enum #4119

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,11 +733,8 @@ func (c *compaction) newInputIters(
}
}()
iterOpts := IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-compaction",
QoSLevel: sstable.NonLatencySensitiveQoSLevel,
},
logger: c.logger,
Category: categoryCompaction,
logger: c.logger,
}

// Populate iters, rangeDelIters and rangeKeyIters with the appropriate
Expand Down Expand Up @@ -1246,11 +1243,8 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
comparer: d.opts.Comparer,
newIters: d.newIters,
opts: IterOptions{
logger: d.opts.Logger,
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
logger: d.opts.Logger,
Category: categoryIngest,
},
v: c.version,
}
Expand Down
13 changes: 5 additions & 8 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,7 @@ func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer,
snapshot: seqNum,
iterOpts: IterOptions{
// TODO(sumeer): replace with a parameter provided by the caller.
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-get",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
Category: categoryGet,
logger: d.opts.Logger,
snapshotForHideObsoletePoints: seqNum,
},
Expand Down Expand Up @@ -1262,7 +1259,7 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
// iteration is invalid in those cases.
func (d *DB) ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
category sstable.Category,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum SeqNum) error,
Expand All @@ -1271,7 +1268,7 @@ func (d *DB) ScanInternal(
visitExternalFile func(sst *ExternalFile) error,
) error {
scanInternalOpts := &scanInternalOptions{
CategoryAndQoS: categoryAndQoS,
category: category,
visitPointKey: visitPointKey,
visitRangeDel: visitRangeDel,
visitRangeKey: visitRangeKey,
Expand Down Expand Up @@ -1369,7 +1366,7 @@ func finishInitializingInternalIter(
}
i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)

if err := i.constructPointIter(i.opts.CategoryAndQoS, memtables, buf); err != nil {
if err := i.constructPointIter(i.opts.category, memtables, buf); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1411,7 +1408,7 @@ func (i *Iterator) constructPointIter(
if collector := i.tc.dbOpts.sstStatsCollector; collector != nil {
internalOpts.iterStatsAccumulator = collector.Accumulator(
uint64(uintptr(unsafe.Pointer(i))),
i.opts.CategoryAndQoS,
i.opts.Category,
)
}
if i.opts.RangeKeyMasking.Filter != nil {
Expand Down
6 changes: 1 addition & 5 deletions get_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/sstable"
)

func TestGetIter(t *testing.T) {
Expand Down Expand Up @@ -455,10 +454,7 @@ func TestGetIter(t *testing.T) {
get.version = v
get.snapshot = ikey.SeqNum() + 1
get.iterOpts = IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-get",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
Category: categoryGet,
logger: testLogger{t},
snapshotForHideObsoletePoints: get.snapshot,
}
Expand Down
14 changes: 4 additions & 10 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1752,11 +1752,8 @@ func (d *DB) excise(
}
var err error
iters, err = d.newIters(ctx, m, &IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
layer: manifest.Level(level),
Category: categoryIngest,
layer: manifest.Level(level),
}, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
itersLoaded = true
return err
Expand Down Expand Up @@ -2154,11 +2151,8 @@ func (d *DB) ingestApply(
comparer: d.opts.Comparer,
newIters: d.newIters,
opts: IterOptions{
logger: d.opts.Logger,
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
logger: d.opts.Logger,
Category: categoryIngest,
},
v: current,
}
Expand Down
13 changes: 5 additions & 8 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ func testIngestSharedImpl(
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
err = from.ScanInternal(context.TODO(), sstable.CategoryUnknown, startKey, endKey,
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1634,7 +1634,7 @@ func TestConcurrentExcise(t *testing.T) {
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
err = from.ScanInternal(context.TODO(), sstable.CategoryUnknown, startKey, endKey,
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
Expand Down Expand Up @@ -2071,7 +2071,7 @@ func TestIngestExternal(t *testing.T) {
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var externalFiles []ExternalFile
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
err = from.ScanInternal(context.TODO(), sstable.CategoryUnknown, startKey, endKey,
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
Expand Down Expand Up @@ -2356,11 +2356,8 @@ func TestIngestTargetLevel(t *testing.T) {
comparer: d.opts.Comparer,
newIters: d.newIters,
opts: IterOptions{
logger: d.opts.Logger,
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
logger: d.opts.Logger,
Category: categoryIngest,
},
v: d.mu.versions.currentVersion(),
}
Expand Down
2 changes: 1 addition & 1 deletion level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (l *levelIter) init(
l.tableOpts.PointKeyFilters = l.filtersBuf[:0:1]
}
l.tableOpts.UseL6Filters = opts.UseL6Filters
l.tableOpts.CategoryAndQoS = opts.CategoryAndQoS
l.tableOpts.Category = opts.Category
l.tableOpts.layer = l.layer
l.tableOpts.snapshotForHideObsoletePoints = opts.snapshotForHideObsoletePoints
l.comparer = comparer
Expand Down
4 changes: 2 additions & 2 deletions metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,7 @@ func (r *replicateOp) runSharedReplicate(
) {
var sharedSSTs []pebble.SharedSSTMeta
var err error
err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
err = source.ScanInternal(context.TODO(), sstable.CategoryUnknown, r.start, r.end,
func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
val, _, err := value.Value(nil)
if err != nil {
Expand Down Expand Up @@ -1977,7 +1977,7 @@ func (r *replicateOp) runExternalReplicate(
) {
var externalSSTs []pebble.ExternalFile
var err error
err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
err = source.ScanInternal(context.TODO(), sstable.CategoryUnknown, r.start, r.end,
func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
val, _, err := value.Value(nil)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (m *LevelMetrics) WriteAmp() float64 {
return float64(m.BytesFlushed+m.BytesCompacted) / float64(m.BytesIn)
}

var categoryCompaction = sstable.RegisterCategory("pebble-compaction", sstable.NonLatencySensitiveQoSLevel)
var categoryIngest = sstable.RegisterCategory("pebble-ingest", sstable.LatencySensitiveQoSLevel)
var categoryGet = sstable.RegisterCategory("pebble-get", sstable.LatencySensitiveQoSLevel)

// Metrics holds metrics for various subsystems of the DB such as the Cache,
// Compactions, WAL, and per-Level metrics.
//
Expand Down
22 changes: 12 additions & 10 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ func exampleMetrics() Metrics {
return m
}

func init() {
// Register some categories for the purposes of the test.
sstable.RegisterCategory("a", sstable.NonLatencySensitiveQoSLevel)
sstable.RegisterCategory("b", sstable.LatencySensitiveQoSLevel)
sstable.RegisterCategory("c", sstable.NonLatencySensitiveQoSLevel)
}

func TestMetrics(t *testing.T) {
if runtime.GOARCH == "386" {
t.Skip("skipped on 32-bit due to slightly varied output")
Expand Down Expand Up @@ -306,18 +313,13 @@ func TestMetrics(t *testing.T) {
return err.Error()
}
}
var categoryAndQoS sstable.CategoryAndQoS
category := sstable.CategoryUnknown
if td.HasArg("category") {
var s string
td.ScanArgs(t, "category", &s)
categoryAndQoS.Category = sstable.Category(s)
}
if td.HasArg("qos") {
var qos string
td.ScanArgs(t, "qos", &qos)
categoryAndQoS.QoSLevel = sstable.StringToQoSForTesting(qos)
category = sstable.StringToCategoryForTesting(s)
}
iter, _ := d.NewIter(&IterOptions{CategoryAndQoS: categoryAndQoS})
iter, _ := d.NewIter(&IterOptions{Category: category})
// Some iterators (eg. levelIter) do not instantiate the underlying
// iterator until the first positioning call. Position the iterator
// so that levelIters will have loaded an sstable.
Expand All @@ -342,7 +344,7 @@ func TestMetrics(t *testing.T) {
m.TableCache = cache.Metrics{}
m.BlockCache = cache.Metrics{}
// Empirically, the unknown stats are also non-deterministic.
if len(m.CategoryStats) > 0 && m.CategoryStats[0].Category == "_unknown" {
if len(m.CategoryStats) > 0 && m.CategoryStats[0].Category == sstable.CategoryUnknown {
m.CategoryStats[0].CategoryStats = sstable.CategoryStats{}
}
}
Expand All @@ -352,7 +354,7 @@ func TestMetrics(t *testing.T) {
fmt.Fprintf(&buf, "Iter category stats:\n")
for _, stats := range m.CategoryStats {
fmt.Fprintf(&buf, "%20s, %11s: %+v\n", stats.Category,
redact.StringWithoutMarkers(stats.QoSLevel), stats.CategoryStats)
redact.StringWithoutMarkers(stats.Category.QoSLevel()), stats.CategoryStats)
}
}
return buf.String()
Expand Down
7 changes: 4 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ type IterOptions struct {
// existing is not low or if we just expect a one-time Seek (where loading the
// data block directly is better).
UseL6Filters bool
// CategoryAndQoS is used for categorized iterator stats. This should not be
// Category is used for categorized iterator stats. This should not be
// changed by calling SetOptions.
sstable.CategoryAndQoS
Category sstable.Category

DebugRangeKeyStack bool

Expand Down Expand Up @@ -261,9 +261,10 @@ func (o *IterOptions) SpanIterOptions() keyspan.SpanIterOptions {
// scanInternalOptions is similar to IterOptions, meant for use with
// scanInternalIterator.
type scanInternalOptions struct {
sstable.CategoryAndQoS
IterOptions

category sstable.Category

visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error
visitRangeDel func(start, end []byte, seqNum SeqNum) error
visitRangeKey func(start, end []byte, keys []rangekey.Key) error
Expand Down
4 changes: 2 additions & 2 deletions scan_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ func (opts *scanInternalOptions) skipLevelForOpts() int {

// constructPointIter constructs a merging iterator and sets i.iter to it.
func (i *scanInternalIterator) constructPointIter(
categoryAndQoS sstable.CategoryAndQoS, memtables flushableList, buf *iterAlloc,
category sstable.Category, memtables flushableList, buf *iterAlloc,
) error {
// Merging levels and levels from iterAlloc.
mlevels := buf.mlevels[:0]
Expand Down Expand Up @@ -897,7 +897,7 @@ func (i *scanInternalIterator) constructPointIter(
levels = levels[:numLevelIters]
rangeDelLevels = rangeDelLevels[:numLevelIters]
i.opts.IterOptions.snapshotForHideObsoletePoints = i.seqNum
i.opts.IterOptions.CategoryAndQoS = categoryAndQoS
i.opts.IterOptions.Category = category
addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
li := &levels[levelsIndex]
rli := &rangeDelLevels[levelsIndex]
Expand Down
4 changes: 2 additions & 2 deletions scan_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestScanInternal(t *testing.T) {
type scanInternalReader interface {
ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
category sstable.Category,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestScanInternal(t *testing.T) {
}
}
}
err := reader.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, lower, upper,
err := reader.ScanInternal(context.TODO(), sstable.CategoryUnknown, lower, upper,
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
v := value.InPlaceValue()
fmt.Fprintf(&b, "%s (%s)\n", key, v)
Expand Down
8 changes: 4 additions & 4 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) (*Ite
// point keys deleted by range dels and keys masked by range keys.
func (s *Snapshot) ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
category sstable.Category,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
Expand All @@ -87,7 +87,7 @@ func (s *Snapshot) ScanInternal(
panic(ErrClosed)
}
scanInternalOpts := &scanInternalOptions{
CategoryAndQoS: categoryAndQoS,
category: category,
visitPointKey: visitPointKey,
visitRangeDel: visitRangeDel,
visitRangeKey: visitRangeKey,
Expand Down Expand Up @@ -473,7 +473,7 @@ func (es *EventuallyFileOnlySnapshot) NewIterWithContext(
// point keys deleted by range dels and keys masked by range keys.
func (es *EventuallyFileOnlySnapshot) ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
category sstable.Category,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
Expand All @@ -486,7 +486,7 @@ func (es *EventuallyFileOnlySnapshot) ScanInternal(
}
var sOpts snapshotIterOpts
opts := &scanInternalOptions{
CategoryAndQoS: categoryAndQoS,
category: category,
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
Expand Down
Loading
Loading