diff --git a/compaction.go b/compaction.go index 27dcd253637..1e45e79a403 100644 --- a/compaction.go +++ b/compaction.go @@ -658,6 +658,33 @@ type compaction struct { pickerMetrics compactionPickerMetrics } +// objectCreator provides the subset of the objstorage.Provider interface +// necessary for compactions and flushes. It's typically satisfied by +// d.objProvider but may be satisfied by bufferedSSTables during flushes. +type objectCreator interface { + // Create creates a new object and opens it for writing. + // + // The object is not guaranteed to be durable (accessible in case of crashes) + // until Sync is called. + Create( + ctx context.Context, + fileType base.FileType, + FileNum base.DiskFileNum, + opts objstorage.CreateOptions, + ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) + // Path returns an internal, implementation-dependent path for the object. It is + // meant to be used for informational purposes (like logging). + Path(meta objstorage.ObjectMetadata) string + // Remove removes an object. + // + // The object is not guaranteed to be durably removed until Sync is called. + Remove(fileType base.FileType, FileNum base.DiskFileNum) error + // Sync flushes the metadata from creation or removal of objects since the + // last Sync. This includes objects that have been Created but for which + // Writable.Finish() has not yet been called. + Sync() error +} + func (c *compaction) makeInfo(jobID int) CompactionInfo { info := CompactionInfo{ JobID: jobID, @@ -1916,6 +1943,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { }) startTime := d.timeNow() + // Compactions always write directly to the database's object provider. + // Flushes may write to an in-memory object provider first. + var objCreator objectCreator = d.objProvider var ve *manifest.VersionEdit var pendingOutputs []physicalMeta var stats compactStats @@ -1926,9 +1956,13 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { // runCompaction. 
For all other flush cases, we construct the VersionEdit // inside runCompaction. if c.kind != compactionKindIngestedFlushable { - ve, pendingOutputs, stats, err = d.runCompaction(jobID, c) + ve, pendingOutputs, stats, err = d.runCompaction(jobID, c, objCreator) } + // TODO(aadityas,jackson): If the buffered output sstables are too small, + // avoid linking them into the version and just update the flushable queue + // appropriately. + // Acquire logLock. This will be released either on an error, by way of // logUnlock, or through a call to logAndApply if there is no error. d.mu.versions.logLock() @@ -2626,7 +2660,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { d.opts.EventListener.CompactionBegin(info) startTime := d.timeNow() - ve, pendingOutputs, stats, err := d.runCompaction(jobID, c) + ve, pendingOutputs, stats, err := d.runCompaction(jobID, c, d.objProvider) info.Duration = d.timeNow().Sub(startTime) if err == nil { @@ -2792,7 +2826,7 @@ func (d *DB) runCopyCompaction( // d.mu must be held when calling this, but the mutex may be dropped and // re-acquired during the course of this method. 
func (d *DB) runCompaction( - jobID int, c *compaction, + jobID int, c *compaction, objCreator objectCreator, ) (ve *versionEdit, pendingOutputs []physicalMeta, stats compactStats, retErr error) { // As a sanity check, confirm that the smallest / largest keys for new and // deleted files in the new versionEdit pass a validation function before @@ -2960,7 +2994,7 @@ func (d *DB) runCompaction( } if retErr != nil { for _, fileNum := range createdFiles { - _ = d.objProvider.Remove(fileTypeTable, fileNum) + _ = objCreator.Remove(fileTypeTable, fileNum) } } for _, closer := range c.closers { @@ -3055,7 +3089,8 @@ func (d *DB) runCompaction( PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level), } diskFileNum := base.PhysicalTableDiskFileNum(fileNum) - writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts) + + writable, objMeta, err := objCreator.Create(ctx, fileTypeTable, diskFileNum, createOpts) if err != nil { return err } @@ -3067,7 +3102,7 @@ func (d *DB) runCompaction( d.opts.EventListener.TableCreated(TableCreateInfo{ JobID: jobID, Reason: reason, - Path: d.objProvider.Path(objMeta), + Path: objCreator.Path(objMeta), FileNum: diskFileNum, }) if c.kind != compactionKindFlush { @@ -3510,7 +3545,7 @@ func (d *DB) runCompaction( // compactStats. stats.countMissizedDels = iter.stats.countMissizedDels - if err := d.objProvider.Sync(); err != nil { + if err := objCreator.Sync(); err != nil { return nil, pendingOutputs, stats, err } diff --git a/data_test.go b/data_test.go index 1ca28dd0e25..b29a3bfe4d6 100644 --- a/data_test.go +++ b/data_test.go @@ -869,7 +869,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { // to the user-defined boundaries. 
c.maxOutputFileSize = math.MaxUint64 - newVE, _, _, err := d.runCompaction(0, c) + newVE, _, _, err := d.runCompaction(0, c, d.objProvider) if err != nil { return err } diff --git a/flushable.go b/flushable.go index eb5b6f8207d..c8b0c43a671 100644 --- a/flushable.go +++ b/flushable.go @@ -14,6 +14,8 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/sstable" ) // flushable defines the interface for immutable memtables. @@ -77,6 +79,10 @@ type flushableEntry struct { delayedFlushForcedAt time.Time // logNum corresponds to the WAL that contains the records present in the // receiver. + // + // TODO(aadityas,jackson): We'll need to do something about this (and + // logSize) for entries corresponding to bufferedSSTables since there may be + // multiple associated log nums. logNum base.DiskFileNum // logSize is the size in bytes of the associated WAL. Protected by DB.mu. logSize uint64 @@ -312,6 +318,106 @@ func (s *ingestedFlushable) computePossibleOverlaps( } } +// bufferedSSTables holds a set of in-memory sstables produced by a flush. +// Buffering flushed state reduces write amplification by making it more likely +// that we're able to drop KVs before they reach disk. +type bufferedSSTables struct { + metas []*fileMetadata + readers []*sstable.Reader +} + +var ( + // Assert that *bufferedSSTables implements the flushable interface. + _ flushable = (*bufferedSSTables)(nil) + // Assert that *bufferedSSTables implements the objectCreator interface. + _ objectCreator = (*bufferedSSTables)(nil) +) + +// newIter is part of the flushable interface. +func (b *bufferedSSTables) newIter(o *IterOptions) internalIterator { + panic("TODO") +} + +// newFlushIter is part of the flushable interface. 
+func (b *bufferedSSTables) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator { + panic("TODO") +} + +func (b *bufferedSSTables) constructRangeDelIter( + file *manifest.FileMetadata, _ keyspan.SpanIterOptions, +) (keyspan.FragmentIterator, error) { + panic("TODO") +} + +// newRangeDelIter is part of the flushable interface. +// +// TODO(sumeer): *IterOptions are being ignored, so the index block load for +// the point iterator in constructRangeDelIter is not tracked. +func (b *bufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { + panic("TODO") +} + +// newRangeKeyIter is part of the flushable interface. +func (b *bufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator { + if !b.containsRangeKeys() { + return nil + } + panic("TODO") +} + +// containsRangeKeys is part of the flushable interface. +func (b *bufferedSSTables) containsRangeKeys() bool { + panic("TODO") +} + +// inuseBytes is part of the flushable interface. +func (b *bufferedSSTables) inuseBytes() uint64 { + panic("TODO") +} + +// totalBytes is part of the flushable interface. +func (b *bufferedSSTables) totalBytes() uint64 { + panic("TODO") +} + +// readyForFlush is part of the flushable interface. +func (b *bufferedSSTables) readyForFlush() bool { + // Buffered sstables are always ready for flush; they're immutable. + return true +} + +// computePossibleOverlaps is part of the flushable interface. +func (b *bufferedSSTables) computePossibleOverlaps( + fn func(bounded) shouldContinue, bounded ...bounded, +) { + panic("TODO") +} + +// Create implements the objectCreator interface. +func (b *bufferedSSTables) Create( + ctx context.Context, + fileType base.FileType, + FileNum base.DiskFileNum, + opts objstorage.CreateOptions, +) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) { + panic("TODO") +} + +// Path implements the objectCreator interface. 
+func (b *bufferedSSTables) Path(meta objstorage.ObjectMetadata) string { + panic("TODO") +} + +// Remove implements the objectCreator interface. +func (b *bufferedSSTables) Remove(fileType base.FileType, FileNum base.DiskFileNum) error { + panic("TODO") +} + +// Sync implements the objectCreator interface. +func (b *bufferedSSTables) Sync() error { + panic("TODO") +} + // computePossibleOverlapsGenericImpl is an implemention of the flushable // interface's computePossibleOverlaps function for flushable implementations // with only in-memory state that do not have special requirements and should diff --git a/open.go b/open.go index 8f027562bc9..b17acecaaf2 100644 --- a/open.go +++ b/open.go @@ -803,7 +803,7 @@ func (d *DB) replayWAL( if err != nil { return err } - newVE, _, _, err := d.runCompaction(jobID, c) + newVE, _, _, err := d.runCompaction(jobID, c, d.objProvider) if err != nil { return errors.Wrapf(err, "running compaction during WAL replay") }