Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
aadityasondhi authored and jbowens committed Feb 15, 2024
1 parent c46bc99 commit 24ffc0f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
38 changes: 37 additions & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ const (
compactionKindRead
compactionKindRewrite
compactionKindIngestedFlushable
compactionKindBufferedFlush
)

func (k compactionKind) String() string {
Expand Down Expand Up @@ -1871,6 +1872,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
var n, inputs int
var inputBytes uint64
var ingest bool
var bufferedFlush = true // TODO(aaditya): loop this into a config setting
for ; n < len(d.mu.mem.queue)-1; n++ {
if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
if n == 0 {
Expand Down Expand Up @@ -1929,6 +1931,10 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
if err != nil {
return 0, err
}

if bufferedFlush && !ingest {
c.kind = compactionKindBufferedFlush
}
d.addInProgressCompaction(c)

jobID := d.mu.nextJobID
Expand All @@ -1943,7 +1949,16 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {

// Compactions always write directly to the database's object provider.
// Flushes may write to an in-memory object provider first.
var objCreator objectCreator = d.objProvider
var objCreator objectCreator
if c.kind == compactionKindBufferedFlush {
bufferedSSTs := &bufferedSSTables{}
// TODO(aaditya): pick a better size
bufferedSSTs.init(10)
objCreator = bufferedSSTs
} else {
objCreator = d.objProvider
}

var ve *manifest.VersionEdit
var pendingOutputs []physicalMeta
var stats compactStats
Expand All @@ -1960,6 +1975,27 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
// TODO(aadityas,jackson): If the buffered output sstables are too small,
// avoid linking them into the version and just update the flushable queue
// appropriately.
if c.kind == compactionKindBufferedFlush {
var metas []*fileMetadata
var fileNums []base.DiskFileNum
for _, file := range ve.NewFiles {
metas = append(metas, file.Meta)
fileNums = append(fileNums, file.BackingFileNum)
}

bufferedSST := objCreator.(*bufferedSSTables)
if bufferedSST.size < d.opts.MemTableSize /* TODO(aaditya): does this make sense? */ {
var f flushable
f, err = newFlushableBufferedSSTables(d.opts.Comparer, metas, sstable.ReaderOptions{}, bufferedSST)
fe := d.newFlushableEntry(f, fileNums[0], 0 /* TODO(aaditya): figure out what to put here */)
remaining := d.mu.mem.queue[n : len(d.mu.mem.queue)-2]
mutable := d.mu.mem.queue[len(d.mu.mem.queue)-1]
d.mu.mem.queue = append(remaining, fe, mutable)
return 0, err
}

// else convert to objProvider and write to disk
}

// Acquire logLock. This will be released either on an error, by way of
// logUnlock, or through a call to logAndApply if there is no error.
Expand Down
6 changes: 5 additions & 1 deletion flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,8 @@ type bufferedSSTables struct {
currFileNum base.DiskFileNum
// finished holds the set of previously written and finished sstables.
finished []bufferedSSTable
// cumulative size of the finished buffers
size uint64
// objectIsOpen is true if the bufferedSSTables is currently being used as a
// Writable.
objectIsOpen bool
Expand Down Expand Up @@ -630,6 +632,7 @@ func (b *bufferedSSTables) Remove(fileType base.FileType, FileNum base.DiskFileN

// Sync implements the objectCreator interface.
func (b *bufferedSSTables) Sync() error {
// BufferedSSTs store their data in memory and do not need to sync.
return nil
}

Expand All @@ -640,7 +643,7 @@ func (b *bufferedSSTables) Sync() error {
// written by the flush.
var _ objstorage.Writable = (*bufferedSSTables)(nil)

// Finish implements objstorage.Writable.
// Write implements objstorage.Writable.
func (b *bufferedSSTables) Write(p []byte) error {
_, err := b.curr.Write(p)
b.curr.Reset()
Expand All @@ -656,6 +659,7 @@ func (b *bufferedSSTables) Finish() error {
fileNum: b.currFileNum,
buf: slices.Clone(b.curr.Bytes()),
})
b.size += uint64(b.curr.Len())
b.curr.Reset()
b.objectIsOpen = false
return nil
Expand Down
1 change: 1 addition & 0 deletions table_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ type loadInfo struct {
}

func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
// TODO(aaditya): Example of creating iter for SST
// Try opening the file first.
var f objstorage.Readable
var err error
Expand Down

0 comments on commit 24ffc0f

Please sign in to comment.