From dc8b5b2ea202f4f6a2d04aba632c2a1031ce13d9 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Mon, 13 May 2024 11:44:47 -0400 Subject: [PATCH] db: add SharedLowerUserKeyPrefix and WriteSharedWithStrictObsolete options SharedLowerUserKeyPrefix, if specified, is an additional lower bound on constraint on key prefixes that should be written to shared files. It applies only when CreateOnShared permits some shared file creation. It will be used by CockroachDB to exclude keys below TableDataMin from shared files, for both correctness (they can contain MERGEs for which the obsolete bit does not work) and performance reasons (low latency is more important and the data volume is tiny). WriteSharedWithStrictObsolete, when true, causes shared files to be written with WriterOptions.IsStrictObsolete set to true. This adds an extra measure of configuration protection to accidentally sharing files where the MERGE could become visible (we currently share such files, but file virtualization hides these MERGEs). The enforcement of SharedLowerUserKeyPrefix changes how PreferSharedStorage is computed during flushes and compactions. It will only be set if the next key to be written by compact.Runner permits writing to shared storage. compact.Runner optimizes this computation for when a compaction is fully within the shared or unshared bounds. Additionally compact.Runner uses the existing OutputSplitter to decide when a split should happen when transitioning from unshared to shared. While here, we do a tiny optimization in OutputSplitter to remove a key comparison on each iteration key. Fixes #2756 --- compaction.go | 20 ++++++++-- db.go | 11 +++++ internal/compact/run.go | 75 +++++++++++++++++++++++++++++++---- internal/compact/splitting.go | 19 +++++++-- options.go | 12 ++++++ 5 files changed, 123 insertions(+), 14 deletions(-) diff --git a/compaction.go b/compaction.go index 0389f514b82..f26413992c9 100644 --- a/compaction.go +++ b/compaction.go @@ -2518,14 +2518,23 @@ func (d *DB) compactAndWrite( MaxGrandparentOverlapBytes: c.maxOverlapBytes, TargetOutputFileSize: c.maxOutputFileSize, } + considerCreateShared := remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) + if considerCreateShared { + runnerCfg.ConsiderCreateShared = true + runnerCfg.SharedLowerUserKeyPrefix = d.opts.Experimental.SharedLowerUserKeyPrefix + } runner := compact.NewRunner(runnerCfg, iter) - for runner.MoreDataToWrite() { + for { + moreData, keyShouldBeWrittenToShared := runner.MoreDataToWrite() + if !moreData { + break + } if c.cancel.Load() { return runner.Finish().WithError(ErrCancelledCompaction) } // Create a new table. writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat) - objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts) + objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts, keyShouldBeWrittenToShared) if err != nil { return runner.Finish().WithError(err) } @@ -2652,7 +2661,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error // newCompactionOutput creates an object for a new table produced by a // compaction or flush. func (d *DB) newCompactionOutput( - jobID JobID, c *compaction, writerOpts sstable.WriterOptions, + jobID JobID, c *compaction, writerOpts sstable.WriterOptions, preferSharedStorage bool, ) (objstorage.ObjectMetadata, *sstable.Writer, CPUWorkHandle, error) { d.mu.Lock() diskFileNum := d.mu.versions.getNextDiskFileNum() @@ -2689,7 +2698,7 @@ func (d *DB) newCompactionOutput( // Prefer shared storage if present. createOpts := objstorage.CreateOptions{ - PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level), + PreferSharedStorage: preferSharedStorage, WriteCategory: writeCategory, } writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts) @@ -2721,6 +2730,9 @@ func (d *DB) newCompactionOutput( d.opts.Experimental.MaxWriterConcurrency > 0 && (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism) + if d.opts.Experimental.WriteSharedWithStrictObsolete && objMeta.IsShared() { + writerOpts.IsStrictObsolete = true + } tw := sstable.NewWriter(writable, writerOpts, cacheOpts) return objMeta, tw, cpuWorkHandle, nil } diff --git a/db.go b/db.go index 34c4e443196..a08850ec210 100644 --- a/db.go +++ b/db.go @@ -1250,6 +1250,17 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator { // creator ID was set (as creator IDs are necessary to enable shared storage) // resulting in some lower level SSTs being on non-shared storage. Skip-shared // iteration is invalid in those cases. +// +// The above error handling implies that ScanInternal with a non-nil +// VisitSharedFile can only be called without error if both the following +// conditions are true: +// +// - All files in the LSM conform to remote.CreateOnSharedLower strategy (they +// can of course conform to the stronger remote.CreateOnSharedAll). +// +// - If Options.Experimental.SharedLowerUserKeyPrefix is non-nil, the lower +// parameter must have a prefix that is greater than or equal to this lower +// bound. func (d *DB) ScanInternal( ctx context.Context, categoryAndQoS sstable.CategoryAndQoS, diff --git a/internal/compact/run.go b/internal/compact/run.go index f7591e9dad9..a4f613fda13 100644 --- a/internal/compact/run.go +++ b/internal/compact/run.go @@ -79,6 +79,14 @@ type RunnerConfig struct { // during compaction. In practice, the sizes can vary between 50%-200% of this // value. TargetOutputFileSize uint64 + + // ConsiderCreateShared is true if this compaction can write shared files if + // SharedLowerUserKeyPrefix allows. + ConsiderCreateShared bool + + // Set equal to Options.Experimental.SharedLowerUserKeyPrefix, when + // ConsiderCreateShared is true, else nil. + SharedLowerUserKeyPrefix []byte } // Runner is a helper for running the "data" part of a compaction (where we use @@ -97,6 +105,11 @@ type Runner struct { cfg RunnerConfig iter *Iter + split base.Split + // At most one state transition from false => true, when the next key should + // be written to shared storage. + keyShouldBeWrittenToShared bool + tables []OutputTable // Stores any error encountered. err error @@ -113,20 +126,63 @@ type Runner struct { // NewRunner creates a new Runner. func NewRunner(cfg RunnerConfig, iter *Iter) *Runner { r := &Runner{ - cmp: iter.cmp, - cfg: cfg, - iter: iter, + cmp: iter.cmp, + cfg: cfg, + iter: iter, + split: iter.cfg.Comparer.Split, + } + if cfg.SharedLowerUserKeyPrefix == nil { + r.keyShouldBeWrittenToShared = cfg.ConsiderCreateShared + } else if r.cmp(cfg.CompactionBounds.Start[:r.split(cfg.CompactionBounds.Start)], cfg.SharedLowerUserKeyPrefix) >= 0 { + r.keyShouldBeWrittenToShared = true + // No more need to do key comparisons, since compaction can always write to shared. + r.cfg.SharedLowerUserKeyPrefix = nil + } else { + // Is the compaction always writing non-shared keys. + endKeyPrefix := base.UserKeyBoundary{ + Key: cfg.CompactionBounds.End.Key[:r.split(cfg.CompactionBounds.End.Key)], + Kind: cfg.CompactionBounds.End.Kind, + } + // By taking the prefix, we can turn an exclusive user-key bound into an inclusive user-key prefix bound. + if endKeyPrefix.Kind == base.Exclusive && len(endKeyPrefix.Key) < len(cfg.CompactionBounds.End.Key) { + endKeyPrefix.Kind = base.Inclusive + } + c := r.cmp(endKeyPrefix.Key, cfg.SharedLowerUserKeyPrefix) + if c < 0 || c == 0 && endKeyPrefix.Kind == base.Exclusive { + r.keyShouldBeWrittenToShared = false + // No more need to do key comparisons, since compaction can never write to shared. + r.cfg.SharedLowerUserKeyPrefix = nil + } } r.key, r.value = r.iter.First() return r } // MoreDataToWrite returns true if there is more data to be written. -func (r *Runner) MoreDataToWrite() bool { +func (r *Runner) MoreDataToWrite() (moreData bool, keyShouldBeWrittenToShared bool) { if r.err != nil { - return false + return false, false + } + moreData = r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty() + if moreData && !r.keyShouldBeWrittenToShared && r.cfg.SharedLowerUserKeyPrefix != nil { + firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan)) + if r.key != nil && firstKey == nil { + firstKey = r.key.UserKey + } + if firstKey == nil { + panic(base.AssertionFailedf("no data to write")) + } + cmp := r.cmp(r.cfg.SharedLowerUserKeyPrefix, firstKey[:r.split(firstKey)]) + if cmp <= 0 { + r.keyShouldBeWrittenToShared = true + // No more need to do key comparisons to stop writing to current sstable. + r.cfg.SharedLowerUserKeyPrefix = nil + } + // Else cmp > 0, so r.cmp(r.cfg.SharedLowerUserKeyPrefix, firstKey) > 0, + // and we can safely use the former as the split-limit in writeKeysToTable + // below. } - return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty() + return moreData, r.keyShouldBeWrittenToShared } // WriteTable writes a new output table. This table will be part of @@ -168,8 +224,13 @@ func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error) if firstKey == nil { return nil, base.AssertionFailedf("no data to write") } + tableSplitLimit := r.TableSplitLimit(firstKey) + if r.cfg.SharedLowerUserKeyPrefix != nil && + (tableSplitLimit == nil || r.cmp(r.cfg.SharedLowerUserKeyPrefix, tableSplitLimit) < 0) { + tableSplitLimit = r.cfg.SharedLowerUserKeyPrefix + } splitter := NewOutputSplitter( - r.cmp, firstKey, r.TableSplitLimit(firstKey), + r.cmp, firstKey, tableSplitLimit, r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(), ) lastUserKeyFn := func() []byte { diff --git a/internal/compact/splitting.go b/internal/compact/splitting.go index e63564b48d5..94fd4639c4e 100644 --- a/internal/compact/splitting.go +++ b/internal/compact/splitting.go @@ -102,6 +102,8 @@ type OutputSplitter struct { shouldSplitCalled bool + pastStartKey bool + nextBoundary splitterBoundary // reachedBoundary is set when the frontier reaches a boundary and is cleared // in the first ShouldSplitBefore call after that. @@ -212,8 +214,18 @@ func (s *OutputSplitter) ShouldSplitBefore( panic("ShouldSplitBefore called after it returned SplitNow") } if !s.shouldSplitCalled { - // The boundary could have been advanced to nextUserKey before the splitter - // was created. So one single time, we advance the boundary manually. + // The boundary could have been advanced to nextUserKey before the + // splitter was created (the compact.Iter was at nextUserKey when a + // previous OutputSplitter decided to split-before). So one single time, + // we advance the boundary manually. + // + // Note that this first nextUserKey can be ahead of + // OutputSplitter.startKey, since the startKey is decided by the previous + // split key. For example, the preceding file was split at c, resulting in + // splitting of a rangedel [a,f) into [a,c) and [c,f) where [a,c) is + // included in the preceding file. The compact.Iter is at key e (which + // happens to be a point key). The startKey will be c, and nextUserKey + // will be e. We have the opportunity here to split at d. s.shouldSplitCalled = true for s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, nextUserKey) <= 0 { s.boundaryReached(nextUserKey) @@ -250,9 +262,10 @@ func (s *OutputSplitter) ShouldSplitBefore( // When the target file size limit is very small (in tests), we could end up // splitting at the first key, which is not allowed. - if s.cmp(nextUserKey, s.startKey) <= 0 { + if !s.pastStartKey && s.cmp(nextUserKey, s.startKey) <= 0 { return NoSplit } + s.pastStartKey = true // TODO(radu): it would make for a cleaner interface if we didn't rely on a // lastUserKeyFn. We could make a copy of the key here and split at the next diff --git a/options.go b/options.go index 545d561e08e..13f264135a7 100644 --- a/options.go +++ b/options.go @@ -697,6 +697,18 @@ type Options struct { // CreateOnSharedLocator). CreateOnShared remote.CreateOnSharedStrategy CreateOnSharedLocator remote.Locator + // SharedLowerUserKeyPrefix, if specified, is an additional lower bound + // constraint on key prefixes that should be written to shared files. + SharedLowerUserKeyPrefix []byte + // WriteSharedWithStrictObsolete specifies that shared sstables are + // written with WriterOptions.IsStrictObsolete set to true. Strict + // obsolete tables do not permit merge keys. + WriteSharedWithStrictObsolete bool + // TODO(sumeer): add ReadSharedRequiresStrictObsolete to require that + // shared files visited via ScanInternal parameter func(sst + // *SharedSSTMeta) must be strict obsolete. That visit only has access to + // FileMetadata, so we will need to encode the StrictObsolete bit in + // there. // CacheSizeBytesBytes is the size of the on-disk block cache for objects // on shared storage in bytes. If it is 0, no cache is used.