Skip to content

Commit

Permalink
sstable: split Writer into Writer and RawWriter
Browse files Browse the repository at this point in the history
The sstable.Writer had some ergonomic methods for writing to the Writer from
non-internal client code (implementing the pebble.Writer interface). These
methods are internally unused, but may be used by external clients writing to
sstables for ingestion, storage outside the LSM, tests, etc. This commit splits
the Writer into two types, with the sugaring methods on the Writer type and the
lower-level internal methods on the RawWriter type.

In future work, sstable.RawWriter may become an interface to support two
implementations: one for rowblk-based sstables and one for colblk-based
sstables.
  • Loading branch information
jbowens committed Jul 29, 2024
1 parent ed42fb4 commit f0a0e2b
Show file tree
Hide file tree
Showing 29 changed files with 2,171 additions and 2,115 deletions.
4 changes: 2 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2656,7 +2656,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
// compaction or flush.
func (d *DB) newCompactionOutput(
jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
) (objstorage.ObjectMetadata, *sstable.Writer, CPUWorkHandle, error) {
) (objstorage.ObjectMetadata, *sstable.RawWriter, CPUWorkHandle, error) {
d.mu.Lock()
diskFileNum := d.mu.versions.getNextDiskFileNum()
d.mu.Unlock()
Expand Down Expand Up @@ -2730,7 +2730,7 @@ func (d *DB) newCompactionOutput(
d.opts.Experimental.MaxWriterConcurrency > 0 &&
(cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)

tw := sstable.NewWriter(writable, writerOpts)
tw := sstable.NewRawWriter(writable, writerOpts)
return objMeta, tw, cpuWorkHandle, nil
}

Expand Down
8 changes: 4 additions & 4 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ func runBuildRemoteCmd(td *datadriven.TestData, d *DB, storage remote.Storage) e
for kv := iter.First(); kv != nil; kv = iter.Next() {
tmp := kv.K
tmp.SetSeqNum(0)
if err := w.Add(tmp, kv.InPlaceValue()); err != nil {
if err := w.Raw().Add(tmp, kv.InPlaceValue()); err != nil {
return err
}
}
Expand All @@ -599,7 +599,7 @@ func runBuildRemoteCmd(td *datadriven.TestData, d *DB, storage remote.Storage) e
for ; s != nil && err == nil; s, err = rdi.Next() {
err = rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
k.SetSeqNum(0)
return w.Add(k, v)
return w.Raw().Add(k, v)
})
if err != nil {
return err
Expand Down Expand Up @@ -680,7 +680,7 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
for kv := iter.First(); kv != nil; kv = iter.Next() {
tmp := kv.K
tmp.SetSeqNum(0)
if err := w.Add(tmp, kv.InPlaceValue()); err != nil {
if err := w.Raw().Add(tmp, kv.InPlaceValue()); err != nil {
return err
}
}
Expand All @@ -693,7 +693,7 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
for ; s != nil && err == nil; s, err = rdi.Next() {
err = rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
k.SetSeqNum(0)
return w.Add(k, v)
return w.Raw().Add(k, v)
})
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestEventListener(t *testing.T) {
if err != nil {
return err.Error()
}
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
})
if err := w.Add(base.MakeInternalKey([]byte("a"), 0, InternalKeyKindSet), nil); err != nil {
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestEventListener(t *testing.T) {
if err != nil {
return err
}
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
})
if err := w.Add(base.MakeInternalKey([]byte{key}, 0, InternalKeyKindSet), nil); err != nil {
Expand Down
34 changes: 23 additions & 11 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestIngestLoad(t *testing.T) {
if err != nil {
return err.Error()
}
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writerOpts)
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writerOpts)
for _, data := range strings.Split(td.Input, "\n") {
if strings.HasPrefix(data, "rangekey: ") {
data = strings.TrimPrefix(data, "rangekey: ")
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestIngestLoadRand(t *testing.T) {

expected[i].ExtendPointKeyBounds(cmp, keys[0], keys[len(keys)-1])

w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: version.MaxTableFormat(),
})
var count uint64
Expand Down Expand Up @@ -1092,7 +1092,7 @@ func testIngestSharedImpl(
f, err := to.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
replicateCounter++
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
Expand All @@ -1103,7 +1103,11 @@ func testIngestSharedImpl(
return nil
},
func(start, end []byte, seqNum base.SeqNum) error {
require.NoError(t, w.DeleteRange(start, end))
require.NoError(t, w.EncodeSpan(&keyspan.Span{
Start: start,
End: end,
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}},
}))
return nil
},
func(start, end []byte, keys []keyspan.Key) error {
Expand Down Expand Up @@ -1588,7 +1592,7 @@ func TestConcurrentExcise(t *testing.T) {
f, err := to.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
replicateCounter++
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
Expand All @@ -1599,7 +1603,11 @@ func TestConcurrentExcise(t *testing.T) {
return nil
},
func(start, end []byte, seqNum base.SeqNum) error {
require.NoError(t, w.DeleteRange(start, end))
require.NoError(t, w.EncodeSpan(&keyspan.Span{
Start: start,
End: end,
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}},
}))
return nil
},
func(start, end []byte, keys []keyspan.Key) error {
Expand Down Expand Up @@ -2021,7 +2029,7 @@ func TestIngestExternal(t *testing.T) {
f, err := to.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
replicateCounter++
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var externalFiles []ExternalFile
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
Expand All @@ -2032,7 +2040,11 @@ func TestIngestExternal(t *testing.T) {
return nil
},
func(start, end []byte, seqNum base.SeqNum) error {
require.NoError(t, w.DeleteRange(start, end))
require.NoError(t, w.EncodeSpan(&keyspan.Span{
Start: start,
End: end,
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}},
}))
return nil
},
func(start, end []byte, keys []keyspan.Key) error {
Expand Down Expand Up @@ -2574,7 +2586,7 @@ func TestIngestCompact(t *testing.T) {
f, err := mem.Create(src(0), vfs.WriteCategoryUnspecified)
require.NoError(t, err)

w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
key := []byte("a")
require.NoError(t, w.Add(base.MakeInternalKey(key, 0, InternalKeyKindSet), nil))
require.NoError(t, w.Close())
Expand Down Expand Up @@ -3212,12 +3224,12 @@ func TestIngestFileNumReuseCrash(t *testing.T) {
func TestIngest_UpdateSequenceNumber(t *testing.T) {
mem := vfs.NewMem()
cmp := base.DefaultComparer.Compare
parse := func(input string) (*sstable.Writer, error) {
parse := func(input string) (*sstable.RawWriter, error) {
f, err := mem.Create("ext", vfs.WriteCategoryUnspecified)
if err != nil {
return nil, err
}
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: sstable.TableFormatMax,
})
for _, data := range strings.Split(input, "\n") {
Expand Down
4 changes: 2 additions & 2 deletions internal/compact/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r *Runner) MoreDataToWrite() bool {
// Result.Tables. Should only be called if MoreDataToWrite() returned true.
//
// WriteTable always closes the Writer.
func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.Writer) {
func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.RawWriter) {
if r.err != nil {
panic("error already encountered")
}
Expand All @@ -159,7 +159,7 @@ func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.Write
r.tables[len(r.tables)-1].WriterMeta = *writerMeta
}

func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error) {
func (r *Runner) writeKeysToTable(tw *sstable.RawWriter) (splitKey []byte, _ error) {
firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
if r.key != nil && firstKey == nil {
firstKey = r.key.UserKey
Expand Down
2 changes: 1 addition & 1 deletion internal/compact/spans.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c *RangeKeySpanCompactor) elideInLastStripe(
//
// The span can contain either only RANGEDEL keys or only range keys.
func SplitAndEncodeSpan(
cmp base.Compare, span *keyspan.Span, upToKey []byte, tw *sstable.Writer,
cmp base.Compare, span *keyspan.Span, upToKey []byte, tw *sstable.RawWriter,
) error {
if span.Empty() {
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/compact/spans_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestSplitAndEncodeSpan(t *testing.T) {
}

obj := &objstorage.MemObj{}
tw := sstable.NewWriter(obj, sstable.WriterOptions{TableFormat: sstable.TableFormatMax})
tw := sstable.NewRawWriter(obj, sstable.WriterOptions{TableFormat: sstable.TableFormatMax})
require.NoError(t, SplitAndEncodeSpan(base.DefaultComparer.Compare, &span, upToKey, tw))
require.NoError(t, tw.Close())
_, rangeDels, rangeKeys := sstable.ReadAll(obj)
Expand Down
2 changes: 1 addition & 1 deletion level_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestCheckLevelsCornerCases(t *testing.T) {
writerOpts.SetInternal(sstableinternal.WriterOptions{
DisableKeyOrderChecks: disableKeyOrderChecks,
})
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writerOpts)
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writerOpts)
var tombstones []keyspan.Span
frag := keyspan.Fragmenter{
Cmp: testkeys.Comparer.Compare,
Expand Down
6 changes: 3 additions & 3 deletions level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (lt *levelIterTest) runBuild(d *datadriven.TestData) string {
}
}
fp := bloom.FilterPolicy(10)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f0), sstable.WriterOptions{
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f0), sstable.WriterOptions{
Comparer: &lt.cmp,
FilterPolicy: fp,
TableFormat: tableFormat,
Expand Down Expand Up @@ -508,9 +508,9 @@ func buildLevelIterTables(
files[i] = f
}

writers := make([]*sstable.Writer, len(files))
writers := make([]*sstable.RawWriter, len(files))
for i := range files {
writers[i] = sstable.NewWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
writers[i] = sstable.NewRawWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
BlockRestartInterval: restartInterval,
BlockSize: blockSize,
Compression: NoCompression,
Expand Down
10 changes: 5 additions & 5 deletions merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestMergingIterCornerCases(t *testing.T) {
if err != nil {
return err.Error()
}
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
var tombstones []keyspan.Span
frag := keyspan.Fragmenter{
Cmp: cmp,
Expand Down Expand Up @@ -339,9 +339,9 @@ func buildMergingIterTables(
files[i] = f
}

writers := make([]*sstable.Writer, len(files))
writers := make([]*sstable.RawWriter, len(files))
for i := range files {
writers[i] = sstable.NewWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
writers[i] = sstable.NewRawWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
BlockRestartInterval: restartInterval,
BlockSize: blockSize,
Compression: NoCompression,
Expand Down Expand Up @@ -542,7 +542,7 @@ func buildLevelsForMergingIterSeqSeek(
}

const targetL6FirstFileSize = 2 << 20
writers := make([][]*sstable.Writer, levelCount)
writers := make([][]*sstable.RawWriter, levelCount)
// A policy unlikely to have false positives.
filterPolicy := bloom.FilterPolicy(100)
for i := range files {
Expand Down Expand Up @@ -572,7 +572,7 @@ func buildLevelsForMergingIterSeqSeek(
writerOptions.IndexBlockSize = 1
}
}
writers[i] = append(writers[i], sstable.NewWriter(objstorageprovider.NewFileWritable(files[i][j]), writerOptions))
writers[i] = append(writers[i], sstable.NewRawWriter(objstorageprovider.NewFileWritable(files[i][j]), writerOptions))
}
}

Expand Down
6 changes: 3 additions & 3 deletions metamorphic/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func writeSSTForIngestion(
if err != nil {
return nil, err
}
if err := w.Add(k.K, valBytes); err != nil {
if err := w.Raw().Add(k.K, valBytes); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func writeSSTForIngestion(
collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
}
keyspan.SortKeysByTrailer(&collapsed.Keys)
if err := w.EncodeSpan(&collapsed); err != nil {
if err := w.Raw().EncodeSpan(&collapsed); err != nil {
return nil, err
}
}
Expand All @@ -156,7 +156,7 @@ func writeSSTForIngestion(
if err := w.Close(); err != nil {
return nil, err
}
sstMeta, err := w.Metadata()
sstMeta, err := w.Raw().Metadata()
if err != nil {
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ func (r *replicateOp) runSharedReplicate(
if err != nil {
panic(err)
}
return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
},
func(start, end []byte, seqNum base.SeqNum) error {
return w.DeleteRange(start, end)
Expand All @@ -1939,7 +1939,7 @@ func (r *replicateOp) runSharedReplicate(
End: end,
Keys: keys,
}
return w.EncodeSpan(&s)
return w.Raw().EncodeSpan(&s)
},
func(sst *pebble.SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
Expand All @@ -1957,7 +1957,7 @@ func (r *replicateOp) runSharedReplicate(
h.Recordf("%s // %v", r, err)
return
}
meta, err := w.Metadata()
meta, err := w.Raw().Metadata()
if err != nil {
h.Recordf("%s // %v", r, err)
return
Expand Down Expand Up @@ -1992,7 +1992,7 @@ func (r *replicateOp) runExternalReplicate(
if err != nil {
panic(err)
}
return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
},
func(start, end []byte, seqNum base.SeqNum) error {
return w.DeleteRange(start, end)
Expand All @@ -2003,7 +2003,7 @@ func (r *replicateOp) runExternalReplicate(
End: end,
Keys: keys,
}
return w.EncodeSpan(&s)
return w.Raw().EncodeSpan(&s)
},
nil,
func(sst *pebble.ExternalFile) error {
Expand All @@ -2021,7 +2021,7 @@ func (r *replicateOp) runExternalReplicate(
h.Recordf("%s // %v", r, err)
return
}
meta, err := w.Metadata()
meta, err := w.Raw().Metadata()
if err != nil {
h.Recordf("%s // %v", r, err)
return
Expand Down Expand Up @@ -2116,7 +2116,7 @@ func (r *replicateOp) run(t *Test, h historyRecorder) {
}
}
keyspan.SortKeysByTrailer(&span.Keys)
if err := w.EncodeSpan(span); err != nil {
if err := w.Raw().EncodeSpan(span); err != nil {
panic(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion range_del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func benchmarkRangeDelIterate(b *testing.B, entries, deleted int, snapshotCompac
if err != nil {
b.Fatal(err)
}
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
BlockSize: 32 << 10, // 32 KB
})
for i := 0; i < entries; i++ {
Expand Down
Loading

0 comments on commit f0a0e2b

Please sign in to comment.