diff --git a/compaction.go b/compaction.go index 536d7b2c1cb..2121892f349 100644 --- a/compaction.go +++ b/compaction.go @@ -462,6 +462,7 @@ const ( compactionKindRead compactionKindRewrite compactionKindIngestedFlushable + compactionKindBufferedFlush ) func (k compactionKind) String() string { @@ -1874,6 +1875,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { var n, inputs int var inputBytes uint64 var ingest bool + var bufferedFlush bool // TODO(aaditya): loop this into a config setting for ; n < len(d.mu.mem.queue)-1; n++ { if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok { if n == 0 { @@ -1932,6 +1934,10 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { if err != nil { return 0, err } + + if bufferedFlush && !ingest { + c.kind = compactionKindBufferedFlush + } d.addInProgressCompaction(c) jobID := d.mu.nextJobID @@ -1946,7 +1952,16 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { // Compactions always write directly to the database's object provider. // Flushes may write to an in-memory object provider first. - var objCreator objectCreator = d.objProvider + var objCreator objectCreator + if c.kind == compactionKindBufferedFlush { + bufferedSSTs := &bufferedSSTables{} + // TODO(aaditya): pick a better size + bufferedSSTs.init(10) + objCreator = bufferedSSTs + } else { + objCreator = d.objProvider + } + var ve *manifest.VersionEdit var pendingOutputs []physicalMeta var stats compactStats @@ -1963,6 +1978,27 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { // TODO(aadityas,jackson): If the buffered output sstables are too small, // avoid linking them into the version and just update the flushable queue // appropriately. + if c.kind == compactionKindBufferedFlush { + var metas []*fileMetadata + var fileNums []base.DiskFileNum + for _, file := range ve.NewFiles { + metas = append(metas, file.Meta) + fileNums = append(fileNums, file.BackingFileNum) + } + + bufferedSST := objCreator.(*bufferedSSTables) + if bufferedSST.size < d.opts.MemTableSize /* TODO(aaditya): does this make sense? */ { + var f flushable + f, err = newFlushableBufferedSSTables(d.opts.Comparer, metas, sstable.ReaderOptions{}, bufferedSST) + fe := d.newFlushableEntry(f, fileNums[0], 0 /* TODO(aaditya): figure out what to put here */) + remaining := d.mu.mem.queue[n : len(d.mu.mem.queue)-2] + mutable := d.mu.mem.queue[len(d.mu.mem.queue)-1] + d.mu.mem.queue = append(remaining, fe, mutable) + return 0, err + } + + // else convert to objProvider and write to disk + } // Acquire logLock. This will be released either on an error, by way of // logUnlock, or through a call to logAndApply if there is no error. diff --git a/flushable.go b/flushable.go index 50fb9fba214..cdd62916d23 100644 --- a/flushable.go +++ b/flushable.go @@ -554,6 +554,8 @@ type bufferedSSTables struct { currFileNum base.DiskFileNum // finished holds the set of previously written and finished sstables. finished []bufferedSSTable + // cumulative size of the finished buffers + size uint64 // objectIsOpen is true if the bufferedSSTables is currently being used as a // Writable. objectIsOpen bool @@ -619,10 +621,9 @@ func (b *bufferedSSTables) Sync() error { // written by the flush. var _ objstorage.Writable = (*bufferedSSTables)(nil) -// Finish implements objstorage.Writable. +// Write implements objstorage.Writable. func (o *bufferedSSTables) Write(p []byte) error { _, err := o.curr.Write(p) - o.curr.Reset() return err } @@ -635,6 +636,7 @@ func (o *bufferedSSTables) Finish() error { fileNum: o.currFileNum, buf: slices.Clone(o.curr.Bytes()), }) + o.size += uint64(o.curr.Len()) o.curr.Reset() o.objectIsOpen = false return nil diff --git a/table_cache.go b/table_cache.go index 14f2492e1c8..a7f61452564 100644 --- a/table_cache.go +++ b/table_cache.go @@ -1180,6 +1180,7 @@ type loadInfo struct { } func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) { + // TODO(aaditya): Example of creating iter for SST // Try opening the file first. var f objstorage.Readable var err error