diff --git a/compaction.go b/compaction.go
index 248476f2be..4fd0a903f3 100644
--- a/compaction.go
+++ b/compaction.go
@@ -462,6 +462,7 @@ const (
 	compactionKindRead
 	compactionKindRewrite
 	compactionKindIngestedFlushable
+	compactionKindBufferedFlush
 )
 
 func (k compactionKind) String() string {
@@ -1871,6 +1872,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	var n, inputs int
 	var inputBytes uint64
 	var ingest bool
+	var bufferedFlush = true // TODO(aaditya): hook this into a config setting
 	for ; n < len(d.mu.mem.queue)-1; n++ {
 		if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
 			if n == 0 {
@@ -1929,6 +1931,10 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	if err != nil {
 		return 0, err
 	}
+
+	if bufferedFlush && !ingest {
+		c.kind = compactionKindBufferedFlush
+	}
 	d.addInProgressCompaction(c)
 
 	jobID := d.mu.nextJobID
@@ -1943,7 +1949,16 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 
 	// Compactions always write directly to the database's object provider.
 	// Flushes may write to an in-memory object provider first.
-	var objCreator objectCreator = d.objProvider
+	var objCreator objectCreator
+	if c.kind == compactionKindBufferedFlush {
+		bufferedSSTs := &bufferedSSTables{}
+		// TODO(aaditya): pick a better initial size
+		bufferedSSTs.init(10)
+		objCreator = bufferedSSTs
+	} else {
+		objCreator = d.objProvider
+	}
+
 	var ve *manifest.VersionEdit
 	var pendingOutputs []physicalMeta
 	var stats compactStats
@@ -1960,6 +1975,27 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	// TODO(aadityas,jackson): If the buffered output sstables are too small,
 	// avoid linking them into the version and just update the flushable queue
 	// appropriately.
+	if c.kind == compactionKindBufferedFlush {
+		var metas []*fileMetadata
+		var fileNums []base.DiskFileNum
+		for _, file := range ve.NewFiles {
+			metas = append(metas, file.Meta)
+			fileNums = append(fileNums, file.BackingFileNum)
+		}
+
+		bufferedSST := objCreator.(*bufferedSSTables)
+		if bufferedSST.size < d.opts.MemTableSize /* TODO(aaditya): does this make sense? */ {
+			var f flushable
+			f, err = newFlushableBufferedSSTables(d.opts.Comparer, metas, sstable.ReaderOptions{}, bufferedSST)
+			fe := d.newFlushableEntry(f, fileNums[0], 0 /* TODO(aaditya): figure out what to put here */)
+			remaining := d.mu.mem.queue[n : len(d.mu.mem.queue)-1]
+			mutable := d.mu.mem.queue[len(d.mu.mem.queue)-1]
+			d.mu.mem.queue = append(remaining, fe, mutable)
+			return 0, err
+		}
+
+		// Otherwise, fall back to writing the buffered sstables to disk via the objProvider.
+	}
 
 	// Acquire logLock. This will be released either on an error, by way of
 	// logUnlock, or through a call to logAndApply if there is no error.
diff --git a/flushable.go b/flushable.go
index c02e94087c..264cdf035e 100644
--- a/flushable.go
+++ b/flushable.go
@@ -554,6 +554,8 @@ type bufferedSSTables struct {
 	currFileNum base.DiskFileNum
 	// finished holds the set of previously written and finished sstables.
 	finished []bufferedSSTable
+	// size is the cumulative size of the finished buffers.
+	size uint64
 	// objectIsOpen is true if the bufferedSSTables is currently being used as a
 	// Writable.
 	objectIsOpen bool
@@ -630,6 +632,7 @@ func (b *bufferedSSTables) Remove(fileType base.FileType, FileNum base.DiskFileN
 
 // Sync implements the objectCreator interface.
 func (b *bufferedSSTables) Sync() error {
+	// bufferedSSTables hold their data in memory and do not need to sync.
 	return nil
 }
 
@@ -640,7 +643,7 @@ func (b *bufferedSSTables) Sync() error {
 // written by the flush.
 var _ objstorage.Writable = (*bufferedSSTables)(nil)
 
-// Finish implements objstorage.Writable.
+// Write implements objstorage.Writable.
 func (b *bufferedSSTables) Write(p []byte) error {
 	_, err := b.curr.Write(p)
 	b.curr.Reset()
@@ -656,6 +659,7 @@ func (b *bufferedSSTables) Finish() error {
 		fileNum: b.currFileNum,
 		buf:     slices.Clone(b.curr.Bytes()),
 	})
+	b.size += uint64(b.curr.Len())
 	b.curr.Reset()
 	b.objectIsOpen = false
 	return nil
diff --git a/table_cache.go b/table_cache.go
index 14f2492e1c..a7f6145256 100644
--- a/table_cache.go
+++ b/table_cache.go
@@ -1180,6 +1180,7 @@ type loadInfo struct {
 }
 
 func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
+	// TODO(aaditya): example of creating an iterator for an sstable
 	// Try opening the file first.
 	var f objstorage.Readable
 	var err error
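
For illustration only (not part of the diff): a minimal, self-contained Go sketch of the buffering pattern the bufferedSSTables changes above implement. The names bufferedOutput and threshold are hypothetical stand-ins for bufferedSSTables and d.opts.MemTableSize; Write appends to the sstable currently being built, Finish seals it into an immutable in-memory buffer and accumulates the total size, and the caller compares that size against the threshold to decide whether the flush output stays in memory as a flushable or gets written out through the object provider.

package main

import (
	"bytes"
	"fmt"
	"slices"
)

// bufferedOutput mirrors the bufferedSSTables pattern: an in-progress buffer,
// a list of sealed sstable buffers, and the cumulative size of the sealed ones.
type bufferedOutput struct {
	curr     bytes.Buffer
	finished [][]byte
	size     uint64
}

// Write appends to the sstable currently being built.
func (b *bufferedOutput) Write(p []byte) error {
	_, err := b.curr.Write(p)
	return err
}

// Finish seals the current sstable, retaining a copy of its bytes in memory
// and adding its length to the cumulative size.
func (b *bufferedOutput) Finish() {
	b.finished = append(b.finished, slices.Clone(b.curr.Bytes()))
	b.size += uint64(b.curr.Len())
	b.curr.Reset()
}

func main() {
	const threshold = 64 << 20 // hypothetical stand-in for d.opts.MemTableSize

	var b bufferedOutput
	_ = b.Write([]byte("first sstable's bytes"))
	b.Finish()
	_ = b.Write([]byte("second sstable's bytes"))
	b.Finish()

	// Mirrors the size check in flush1: small enough output stays in memory
	// as a flushable; larger output would go to the object provider instead.
	if b.size < threshold {
		fmt.Printf("keep %d buffered sstables (%d bytes) in memory\n", len(b.finished), b.size)
	} else {
		fmt.Println("write buffered sstables out via the object provider")
	}
}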