Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: add format major version for range keys #1497

Merged
merged 1 commit into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,6 @@ type Batch struct {

// The count of range key sets, unsets and deletes in the batch. Updated
// every time a RANGEKEYSET, RANGEKEYUNSET or RANGEKEYDEL key is added.
// TODO(jackson): This likely won't be necessary long-term, but it's useful
// for the in-memory only implementation in which these keys require special
// handling.
countRangeKeys uint64

// A deferredOp struct, stored in the Batch so that a pointer can be returned
Expand Down
23 changes: 17 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,11 +723,14 @@ func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
if d.opts.Experimental.RangeKeys == nil {
panic("pebble: range keys require the Experimental.RangeKeys option")
}
// TODO(jackson): Assert that all range key operands are suffixless.
if d.FormatMajorVersion() < FormatRangeKeys {
panic(fmt.Sprintf(
"pebble: range keys require at least format major version %d (current: %d)",
FormatRangeKeys, d.FormatMajorVersion(),
))
}

// TODO(jackson): Once the format major version for range keys is
// introduced, error if the batch includes range keys but the active
// format major version doesn't enable them.
// TODO(jackson): Assert that all range key operands are suffixless.
}

if batch.db == nil {
Expand Down Expand Up @@ -871,8 +874,16 @@ func (d *DB) newIterInternal(batch *Batch, s *Snapshot, o *IterOptions) *Iterato
if err := d.closed.Load(); err != nil {
panic(err)
}
if o.rangeKeys() && d.opts.Experimental.RangeKeys == nil {
panic("pebble: range keys require the Experimental.RangeKeys option")
if o.rangeKeys() {
if d.opts.Experimental.RangeKeys == nil {
panic("pebble: range keys require the Experimental.RangeKeys option")
}
if d.FormatMajorVersion() < FormatRangeKeys {
panic(fmt.Sprintf(
"pebble: range keys require at least format major version %d (current: %d)",
FormatRangeKeys, d.FormatMajorVersion(),
))
}
}
if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
Expand Down
9 changes: 8 additions & 1 deletion format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ const (
// FormatBlockPropertyCollector is a format major version that introduces
// BlockPropertyCollectors.
FormatBlockPropertyCollector
// FormatRangeKeys is a format major version that introduces range keys.
FormatRangeKeys
// FormatNewest always contains the most recent format major version.
// NB: When adding new versions, the MaxTableFormat method should also be
// updated to return the maximum allowable version for the new
// FormatMajorVersion.
FormatNewest FormatMajorVersion = FormatBlockPropertyCollector
FormatNewest FormatMajorVersion = FormatRangeKeys
)

// MaxTableFormat returns the maximum sstable.TableFormat that can be used at
Expand All @@ -86,6 +88,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
return sstable.TableFormatRocksDBv2
case FormatBlockPropertyCollector:
return sstable.TableFormatPebblev1
case FormatRangeKeys:
return sstable.TableFormatPebblev2
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
}
Expand Down Expand Up @@ -167,6 +171,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
FormatBlockPropertyCollector: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector)
},
FormatRangeKeys: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatRangeKeys)
},
}

const formatVersionMarkerName = `format-version`
Expand Down
3 changes: 3 additions & 0 deletions format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func TestRatchetFormat(t *testing.T) {
require.Equal(t, FormatSetWithDelete, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatBlockPropertyCollector))
require.Equal(t, FormatBlockPropertyCollector, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatRangeKeys))
require.Equal(t, FormatRangeKeys, d.FormatMajorVersion())
require.NoError(t, d.Close())

// If we Open the database again, leaving the default format, the
Expand Down Expand Up @@ -189,6 +191,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
FormatVersioned: sstable.TableFormatRocksDBv2,
FormatSetWithDelete: sstable.TableFormatRocksDBv2,
FormatBlockPropertyCollector: sstable.TableFormatPebblev1,
FormatRangeKeys: sstable.TableFormatPebblev2,
}

// Valid versions.
Expand Down
28 changes: 24 additions & 4 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func ingestValidateKey(opts *Options, key *InternalKey) error {
}

func ingestLoad1(
opts *Options, path string, cacheID uint64, fileNum FileNum,
opts *Options,
fmv FormatMajorVersion,
path string,
cacheID uint64,
fileNum FileNum,
) (*fileMetadata, error) {
stat, err := opts.FS.Stat(path)
if err != nil {
Expand All @@ -64,6 +68,18 @@ func ingestLoad1(
}
defer r.Close()

// Avoid ingesting tables with format versions this DB doesn't support.
tf, err := r.TableFormat()
if err != nil {
return nil, err
}
if tf > fmv.MaxTableFormat() {
return nil, errors.Newf(
"pebble: table with format %s unsupported at DB format major version %d, %s",
tf, fmv, fmv.MaxTableFormat(),
)
}

meta := &fileMetadata{}
meta.FileNum = fileNum
meta.Size = uint64(stat.Size())
Expand Down Expand Up @@ -154,12 +170,16 @@ func ingestLoad1(
}

func ingestLoad(
opts *Options, paths []string, cacheID uint64, pending []FileNum,
opts *Options,
fmv FormatMajorVersion,
paths []string,
cacheID uint64,
pending []FileNum,
) ([]*fileMetadata, []string, error) {
meta := make([]*fileMetadata, 0, len(paths))
newPaths := make([]string, 0, len(paths))
for i := range paths {
m, err := ingestLoad1(opts, paths[i], cacheID, pending[i])
m, err := ingestLoad1(opts, fmv, paths[i], cacheID, pending[i])
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -525,7 +545,7 @@ func (d *DB) Ingest(paths []string) error {

// Load the metadata for all of the files being ingested. This step detects
// and elides empty sstables.
meta, paths, err := ingestLoad(d.opts, paths, d.cacheID, pendingOutputs)
meta, paths, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs)
if err != nil {
return err
}
Expand Down
25 changes: 21 additions & 4 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,28 @@ func TestIngestLoad(t *testing.T) {
datadriven.RunTest(t, "testdata/ingest_load", func(td *datadriven.TestData) string {
switch td.Cmd {
case "load":
writerOpts := sstable.WriterOptions{}
var dbVersion FormatMajorVersion
for _, cmdArgs := range td.CmdArgs {
v, err := strconv.Atoi(cmdArgs.Vals[0])
if err != nil {
return err.Error()
}
switch k := cmdArgs.Key; k {
case "writer-version":
fmv := FormatMajorVersion(v)
writerOpts.TableFormat = fmv.MaxTableFormat()
case "db-version":
dbVersion = FormatMajorVersion(v)
default:
return fmt.Sprintf("unknown cmd %s\n", k)
}
}
f, err := mem.Create("ext")
if err != nil {
return err.Error()
}
w := sstable.NewWriter(f, sstable.WriterOptions{})
w := sstable.NewWriter(f, writerOpts)
for _, data := range strings.Split(td.Input, "\n") {
j := strings.Index(data, ":")
if j < 0 {
Expand All @@ -57,7 +74,7 @@ func TestIngestLoad(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}
meta, _, err := ingestLoad(opts, []string{"ext"}, 0, []FileNum{1})
meta, _, err := ingestLoad(opts, dbVersion, []string{"ext"}, 0, []FileNum{1})
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -141,7 +158,7 @@ func TestIngestLoadRand(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}
meta, _, err := ingestLoad(opts, paths, 0, pending)
meta, _, err := ingestLoad(opts, FormatNewest, paths, 0, pending)
require.NoError(t, err)

for _, m := range meta {
Expand All @@ -162,7 +179,7 @@ func TestIngestLoadInvalid(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}
if _, _, err := ingestLoad(opts, []string{"invalid"}, 0, []FileNum{1}); err == nil {
if _, _, err := ingestLoad(opts, FormatNewest, []string{"invalid"}, 0, []FileNum{1}); err == nil {
t.Fatalf("expected error, but found success")
}
}
Expand Down
2 changes: 1 addition & 1 deletion open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) {
"LOCK",
"MANIFEST-000001",
"OPTIONS-000003",
"marker.format-version.000004.005",
"marker.format-version.000005.006",
"marker.manifest.000001.MANIFEST-000001",
},
}
Expand Down
47 changes: 43 additions & 4 deletions range_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package pebble
import (
"bytes"
"fmt"
"strconv"
"testing"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/vfs"
Expand Down Expand Up @@ -35,10 +37,24 @@ func TestRangeKeys(t *testing.T) {
require.NoError(t, d.Close())
}
opts := &Options{
FS: vfs.NewMem(),
Comparer: testkeys.Comparer,
FS: vfs.NewMem(),
Comparer: testkeys.Comparer,
FormatMajorVersion: FormatRangeKeys,
}
opts.Experimental.RangeKeys = new(RangeKeysArena)

for _, cmdArgs := range td.CmdArgs {
if cmdArgs.Key != "format-major-version" {
return fmt.Sprintf("unknown command %s\n", cmdArgs.Key)
}
v, err := strconv.Atoi(cmdArgs.Vals[0])
if err != nil {
return err.Error()
}
// Override the DB version.
opts.FormatMajorVersion = FormatMajorVersion(v)
}

var err error
d, err = Open("", opts)
require.NoError(t, err)
Expand All @@ -52,8 +68,19 @@ func TestRangeKeys(t *testing.T) {
case "batch":
b := d.NewBatch()
require.NoError(t, runBatchDefineCmd(td, b))
var err error
func() {
defer func() {
if r := recover(); r != nil {
err = errors.New(r.(string))
}
}()
err = b.Commit(nil)
}()
if err != nil {
return err.Error()
}
count := b.Count()
require.NoError(t, b.Commit(nil))
return fmt.Sprintf("wrote %d keys\n", count)
case "indexed-batch":
b = d.NewIndexedBatch()
Expand All @@ -76,7 +103,19 @@ func TestRangeKeys(t *testing.T) {
}
o.RangeKeyMasking.Suffix = []byte(arg.Vals[0])
}
iter := newIter(o)
var iter *Iterator
var err error
func() {
defer func() {
if r := recover(); r != nil {
err = errors.New(r.(string))
}
}()
iter = newIter(o)
}()
if err != nil {
return err.Error()
}
return runIterCmd(td, iter, true /* close iter */)
case "rangekey-iter":
iter := newIter(&IterOptions{KeyTypes: IterKeyTypeRangesOnly})
Expand Down
10 changes: 10 additions & 0 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,7 @@ type Reader struct {
mergerOK bool
checksumType ChecksumType
tableFilter *tableFilterReader
tableFormat TableFormat
Properties Properties
}

Expand Down Expand Up @@ -2690,6 +2691,14 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
return endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset, nil
}

// TableFormat reports the format version recorded for the table. If the
// Reader is holding an error, that error is returned together with
// TableFormatUnspecified.
func (r *Reader) TableFormat() (TableFormat, error) {
	if err := r.err; err != nil {
		return TableFormatUnspecified, err
	}
	return r.tableFormat, nil
}

// ReadableFile describes the subset of vfs.File required for reading SSTs.
type ReadableFile interface {
io.ReaderAt
Expand Down Expand Up @@ -2735,6 +2744,7 @@ func NewReader(f ReadableFile, o ReaderOptions, extraOpts ...ReaderOption) (*Rea
return nil, r.Close()
}
r.checksumType = footer.checksum
r.tableFormat = footer.format
// Read the metaindex.
if err := r.readMetaindex(footer.metaindexBH); err != nil {
r.err = err
Expand Down
29 changes: 29 additions & 0 deletions sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,35 @@ func TestValidateBlockChecksums(t *testing.T) {
}
}

// TestReader_TableFormat verifies that a Reader reports the same TableFormat
// the Writer was configured with, for every format from TableFormatLevelDB
// through TableFormatMax.
func TestReader_TableFormat(t *testing.T) {
	// roundTrip writes a table containing no keys in the requested format to
	// a vfs.NewMem filesystem, reopens it, and asserts that the Reader
	// reports that same format.
	roundTrip := func(t *testing.T, want TableFormat) {
		const name = "test"
		mem := vfs.NewMem()

		wf, err := mem.Create(name)
		require.NoError(t, err)
		require.NoError(t, NewWriter(wf, WriterOptions{TableFormat: want}).Close())

		rf, err := mem.Open(name)
		require.NoError(t, err)
		r, err := NewReader(rf, ReaderOptions{})
		require.NoError(t, err)
		defer r.Close()

		got, err := r.TableFormat()
		require.NoError(t, err)
		require.Equal(t, want, got)
	}

	// One subtest per table format, named after the format.
	for tf := TableFormatLevelDB; tf <= TableFormatMax; tf++ {
		format := tf
		t.Run(format.String(), func(t *testing.T) {
			roundTrip(t, format)
		})
	}
}

func buildTestTable(
t *testing.T, numEntries uint64, blockSize, indexBlockSize int, compression Compression,
) *Reader {
Expand Down
Loading