Skip to content

Commit

Permalink
storage: add range key support for SST iterators and writers
Browse files Browse the repository at this point in the history
This patch adds `NewPebbleSSTIterator()`, which constructs an MVCC
iterator for SSTs. Unlike the existing `sstIterator`, it supports range
keys and merged iteration across multiple SSTs. It is based on a new
`pebble.NewExternalIter()` API for SST iteration, and reuses
`pebbleIterator` for the CRDB logic. Value verification has been split
out to a separate, general `VerifyingMVCCIterator`.

This iterator currently has significantly worse performance than the
existing SST iterator (as much as 100%), so it is not used yet outside
of tests. This will be optimized later.

The patch also adds support for writing range keys in `SSTWriter`. This
is only enabled when the SST format is `TableFormatPebblev2` or newer,
which is only the case once the cluster version reaches
`EnablePebbleFormatVersionRangeKeys`.

Release note: None
  • Loading branch information
erikgrinaker committed Jun 24, 2022
1 parent 1a037bc commit cf162f9
Show file tree
Hide file tree
Showing 11 changed files with 1,127 additions and 28 deletions.
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_library(
"store_properties.go",
"temp_engine.go",
"testing_knobs.go",
"verifying_iterator.go",
":gen-resourcelimitreached-stringer", # keep
],
importpath = "github.com/cockroachdb/cockroach/pkg/storage",
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,7 @@ func scanIntentKeys(t *testing.T, r Reader) []roachpb.Key {

// scanIter scans all point/range keys from the iterator, and returns a combined
// slice of MVCCRangeKeyValue and MVCCKeyValue in order.
func scanIter(t *testing.T, iter MVCCIterator) []interface{} {
func scanIter(t *testing.T, iter SimpleMVCCIterator) []interface{} {
t.Helper()

iter.SeekGE(MVCCKey{Key: keys.LocalMax})
Expand All @@ -2377,8 +2377,8 @@ func scanIter(t *testing.T, iter MVCCIterator) []interface{} {
}
if hasPoint {
keys = append(keys, MVCCKeyValue{
Key: iter.Key(),
Value: iter.Value(),
Key: iter.UnsafeKey().Clone(),
Value: append([]byte{}, iter.UnsafeValue()...),
})
}
iter.Next()
Expand Down
235 changes: 235 additions & 0 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand All @@ -39,9 +40,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/redact"
)

var sstIterVerify = util.ConstantWithMetamorphicTestBool("mvcc-histories-sst-iter-verify", false)

// TestMVCCHistories verifies that sequences of MVCC reads and writes
// perform properly.
//
Expand Down Expand Up @@ -90,6 +95,13 @@ import (
// clear_range k=<key> end=<key>
// clear_rangekey k=<key> end=<key> ts=<int>[,<int>]
//
// sst_put [ts=<int>[,<int>]] [localTs=<int>[,<int>]] k=<key> [v=<string>]
// sst_put_rangekey ts=<int>[,<int>] [localTs=<int>[,<int>]] k=<key> end=<key>
// sst_clear_range k=<key> end=<key>
// sst_clear_rangekey k=<key> end=<key> ts=<int>[,<int>]
// sst_finish
// sst_reset
// sst_iter_new
//
// Where `<key>` can be a simple string, or a string
// prefixed by the following characters:
//
Expand Down Expand Up @@ -198,6 +210,104 @@ func TestMVCCHistories(t *testing.T) {
return err
}

// reportSSTEntries dumps the contents of a raw SSTable into buf under a
// ">> <name>:" header: point keys first, then range deletions, then range
// keys. It uses raw SST iterators in order to accurately represent the raw
// SST data.
//
// It returns the first error encountered while opening the table, iterating
// over it, or decoding a key or value. Anything printed before the error is
// left in buf.
reportSSTEntries := func(buf *redact.StringBuilder, name string, sst []byte) error {
	r, err := sstable.NewMemReader(sst, sstable.ReaderOptions{
		Comparer: EngineComparer,
	})
	if err != nil {
		return err
	}
	// Release the reader when done. The iterator defers registered below run
	// first (LIFO), so every iterator is closed before the reader is.
	defer func() { _ = r.Close() }()
	buf.Printf(">> %s:\n", name)

	// Dump point keys.
	iter, err := r.NewIter(nil, nil)
	if err != nil {
		return err
	}
	defer func() { _ = iter.Close() }()
	for k, v := iter.SeekGE(nil, false); k != nil; k, v = iter.Next() {
		if err := iter.Error(); err != nil {
			return err
		}
		key, err := DecodeMVCCKey(k.UserKey)
		if err != nil {
			return err
		}
		value, err := DecodeMVCCValue(v)
		if err != nil {
			return err
		}
		buf.Printf("%s: %s -> %s\n", strings.ToLower(k.Kind().String()), key, value)
	}
	// The loop ends on a nil key, which may also be how a failed iterator
	// reports itself. Check once more so a read error is not mistaken for the
	// end of the table.
	if err := iter.Error(); err != nil {
		return err
	}

	// Dump rangedels.
	if rdIter, err := r.NewRawRangeDelIter(); err != nil {
		return err
	} else if rdIter != nil {
		defer func() { _ = rdIter.Close() }()
		for s := rdIter.SeekGE(nil); s != nil; s = rdIter.Next() {
			if err := rdIter.Error(); err != nil {
				return err
			}
			start, err := DecodeMVCCKey(s.Start)
			if err != nil {
				return err
			}
			end, err := DecodeMVCCKey(s.End)
			if err != nil {
				return err
			}
			for _, k := range s.Keys {
				buf.Printf("%s: %s\n", strings.ToLower(k.Kind().String()),
					roachpb.Span{Key: start.Key, EndKey: end.Key})
			}
		}
		// A nil span ends the loop; make sure it was not caused by an error.
		if err := rdIter.Error(); err != nil {
			return err
		}
	}

	// Dump range keys.
	if rkIter, err := r.NewRawRangeKeyIter(); err != nil {
		return err
	} else if rkIter != nil {
		defer func() { _ = rkIter.Close() }()
		for s := rkIter.SeekGE(nil); s != nil; s = rkIter.Next() {
			if err := rkIter.Error(); err != nil {
				return err
			}
			start, err := DecodeMVCCKey(s.Start)
			if err != nil {
				return err
			}
			end, err := DecodeMVCCKey(s.End)
			if err != nil {
				return err
			}
			for _, k := range s.Keys {
				buf.Printf("%s: %s", strings.ToLower(k.Kind().String()),
					roachpb.Span{Key: start.Key, EndKey: end.Key})
				// The suffix, when present, is decoded as the MVCC timestamp.
				if k.Suffix != nil {
					ts, err := decodeMVCCTimestampSuffix(k.Suffix)
					if err != nil {
						return err
					}
					buf.Printf("/%s", ts)
				}
				// Only range key sets have their value decoded and printed.
				if k.Kind() == pebble.InternalKeyKindRangeKeySet {
					value, err := DecodeMVCCValue(k.Value)
					if err != nil {
						return err
					}
					buf.Printf(" -> %s", value)
				}
				buf.Printf("\n")
			}
		}
		// A nil span ends the loop; make sure it was not caused by an error.
		if err := rkIter.Error(); err != nil {
			return err
		}
	}

	return nil
}

e := newEvalCtx(ctx, engine)
defer e.close()

Expand Down Expand Up @@ -279,6 +389,17 @@ func TestMVCCHistories(t *testing.T) {
buf.Printf("error reading data: (%T:) %v\n", err, err)
}
}
for i, sst := range e.ssts {
err = reportSSTEntries(&buf, fmt.Sprintf("sst-%d", i), sst)
if err != nil {
if foundErr == nil {
// Handle the error below.
foundErr = err
} else {
buf.Printf("error reading SST data: (%T:) %v\n", err, err)
}
}
}
}
}

Expand Down Expand Up @@ -403,6 +524,13 @@ func TestMVCCHistories(t *testing.T) {
foundErr = e.iterErr()
}

// Flush any unfinished SSTs.
if foundErr == nil {
foundErr = e.finishSST()
} else {
e.closeSST()
}

if !trace {
// If we were not tracing, no results were printed yet. Do it now.
if txnChange || dataChange {
Expand Down Expand Up @@ -514,6 +642,14 @@ var commands = map[string]cmd{
"iter_next_key": {typReadOnly, cmdIterNextKey},
"iter_prev": {typReadOnly, cmdIterPrev},
"iter_scan": {typReadOnly, cmdIterScan},

"sst_put": {typDataUpdate, cmdSSTPut},
"sst_put_rangekey": {typDataUpdate, cmdSSTPutRangeKey},
"sst_clear_range": {typDataUpdate, cmdSSTClearRange},
"sst_clear_rangekey": {typDataUpdate, cmdSSTClearRangeKey},
"sst_finish": {typDataUpdate, cmdSSTFinish},
"sst_reset": {typDataUpdate, cmdSSTReset},
"sst_iter_new": {typReadOnly, cmdSSTIterNew},
}

func cmdTxnAdvance(e *evalCtx) error {
Expand Down Expand Up @@ -1185,6 +1321,69 @@ func cmdIterScan(e *evalCtx) error {
}
}

// cmdSSTPut implements the sst_put command: it writes a single MVCC point key
// to the SST currently being built, creating the writer on first use.
//
// Arguments read: the key (k), the timestamp (ts), an optional value (v) and
// an optional local timestamp (localTs). Without v, an empty roachpb.Value is
// written.
func cmdSSTPut(e *evalCtx) error {
	key := e.getKey()
	ts := e.getTs(nil)

	var value MVCCValue
	if e.hasArg("v") {
		value.Value = e.getVal()
	}
	// Honor the documented localTs argument the same way sst_put_rangekey
	// does. NOTE(review): this relies on getTsWithName yielding the zero
	// timestamp when the argument is absent, as the range key variant already
	// assumes.
	value.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp(e.getTsWithName("localTs"))

	return e.sst().PutMVCC(MVCCKey{Key: key, Timestamp: ts}, value)
}

// cmdSSTPutRangeKey implements the sst_put_rangekey command: it writes an
// MVCC range key over [k, end) at ts to the SST currently being built. The
// value carries only the local timestamp taken from the localTs argument.
func cmdSSTPutRangeKey(e *evalCtx) error {
	startKey, endKey := e.getKeyRange()
	rk := MVCCRangeKey{
		StartKey:  startKey,
		EndKey:    endKey,
		Timestamp: e.getTs(nil),
	}

	var v MVCCValue
	localTs := e.getTsWithName("localTs")
	v.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp(localTs)

	w := e.sst()
	return w.ExperimentalPutMVCCRangeKey(rk, v)
}

// cmdSSTClearRange implements the sst_clear_range command: it calls
// ClearRawRange for the span [k, end) on the SST currently being built.
func cmdSSTClearRange(e *evalCtx) error {
	startKey, endKey := e.getKeyRange()
	w := e.sst()
	return w.ClearRawRange(startKey, endKey)
}

// cmdSSTClearRangeKey implements the sst_clear_rangekey command: it writes a
// clear for the MVCC range key [k, end) at ts to the SST currently being
// built.
func cmdSSTClearRangeKey(e *evalCtx) error {
	startKey, endKey := e.getKeyRange()
	rk := MVCCRangeKey{
		StartKey:  startKey,
		EndKey:    endKey,
		Timestamp: e.getTs(nil),
	}
	w := e.sst()
	return w.ExperimentalClearMVCCRangeKey(rk)
}

// cmdSSTFinish implements the sst_finish command by delegating to
// evalCtx.finishSST and returning its error unchanged.
func cmdSSTFinish(e *evalCtx) error {
	err := e.finishSST()
	return err
}

// cmdSSTReset implements the sst_reset command: it finishes any SST that is
// still being written and then forgets every SST collected so far. If
// finishing fails, the error is returned and the collected SSTs are kept.
func cmdSSTReset(e *evalCtx) error {
	err := e.finishSST()
	if err == nil {
		e.ssts = nil
	}
	return err
}

// cmdSSTIterNew implements the sst_iter_new command: it closes the current
// iterator, if any, and installs a new iterator over the SSTs collected in
// e.ssts. An SST that is still being written is not part of e.ssts and is
// therefore not visible to the iterator.
//
// On error the evaluation context is left without an iterator.
func cmdSSTIterNew(e *evalCtx) error {
	if e.iter != nil {
		e.iter.Close()
		// Drop the reference immediately. If constructing the replacement
		// fails below, e.iter must not keep pointing at a closed iterator.
		e.iter = nil
	}
	// Reverse the order of the SSTs, since earlier SSTs take precedence over
	// later SSTs, and we want last-write-wins.
	ssts := make([][]byte, len(e.ssts))
	for i, sst := range e.ssts {
		ssts[len(ssts)-i-1] = sst
	}
	iter, err := NewPebbleMultiMemSSTIterator(ssts, sstIterVerify)
	if err != nil {
		return err
	}
	e.iter = iter
	return nil
}

func printIter(e *evalCtx) {
e.results.buf.Printf("%s:", e.td.Cmd)
defer e.results.buf.Printf("\n")
Expand Down Expand Up @@ -1294,18 +1493,23 @@ type evalCtx struct {
traceIntentWrites bool
}
ctx context.Context
st *cluster.Settings
engine Engine
iter SimpleMVCCIterator
t *testing.T
td *datadriven.TestData
txns map[string]*roachpb.Transaction
txnCounter uint128.Uint128
ms *enginepb.MVCCStats
sstWriter *SSTWriter
sstFile *MemFile
ssts [][]byte
}

func newEvalCtx(ctx context.Context, engine Engine) *evalCtx {
return &evalCtx{
ctx: ctx,
st: cluster.MakeTestingClusterSettings(),
engine: engine,
txns: make(map[string]*roachpb.Transaction),
txnCounter: uint128.FromInts(0, 1),
Expand Down Expand Up @@ -1500,6 +1704,37 @@ func (e *evalCtx) newTxn(
return txn, nil
}

// sst returns the SST writer currently in use. On first use it creates an
// ingestion SST writer backed by a fresh in-memory file; later calls return
// that same writer until it is finished or closed.
func (e *evalCtx) sst() *SSTWriter {
	if e.sstWriter != nil {
		return e.sstWriter
	}
	e.sstFile = &MemFile{}
	writer := MakeIngestionSSTWriter(e.ctx, e.st, e.sstFile)
	e.sstWriter = &writer
	return e.sstWriter
}

// finishSST finishes the SST currently being written, if there is one. When
// Finish succeeds and the writer reports a non-zero DataSize, the file's
// bytes are appended to e.ssts. The writer and file are always released,
// whether or not Finish succeeded, and the error from Finish is returned.
func (e *evalCtx) finishSST() error {
	writer, file := e.sstWriter, e.sstFile
	if writer == nil {
		return nil
	}

	err := writer.Finish()
	e.sstFile, e.sstWriter = nil, nil
	if err != nil {
		return err
	}
	if writer.DataSize > 0 {
		e.ssts = append(e.ssts, file.Bytes())
	}
	return nil
}

// closeSST closes the SST writer currently in use without recording its
// contents in e.ssts, and releases both the writer and its backing file. It
// does nothing when no writer is active.
func (e *evalCtx) closeSST() {
	if writer := e.sstWriter; writer != nil {
		writer.Close()
		e.sstFile, e.sstWriter = nil, nil
	}
}

func (e *evalCtx) lookupTxn(txnName string) (*roachpb.Transaction, error) {
txn, ok := e.txns[txnName]
if !ok {
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ func newPebbleIteratorByCloning(
return p
}

// newPebbleSSTIterator builds a pebbleIterator over the given SST files. The
// iterator is taken from pebbleIterPool, initialized with opts and range key
// support, and then backed by a Pebble external iterator over files. If the
// external iterator cannot be created, the pebbleIterator is destroyed and
// the error is returned.
func newPebbleSSTIterator(files []sstable.ReadableFile, opts IterOptions) (*pebbleIterator, error) {
	it := pebbleIterPool.Get().(*pebbleIterator)
	it.reusable = false // defensive
	it.init(nil, opts, StandardDurability, true /* supportsRangeKeys */)

	extIter, err := pebble.NewExternalIter(DefaultPebbleOptions(), &it.options, files)
	// Store the result before checking the error, so destroy() sees the same
	// iterator state as it would with a direct assignment.
	it.iter = extIter
	if err != nil {
		it.destroy()
		return nil, err
	}
	return it, nil
}

// init resets this pebbleIterator for use with the specified arguments,
// reconfiguring the given iter. It is valid to pass a nil iter and then create
// p.iter using p.options, to avoid redundant reconfiguration via SetOptions().
Expand Down
Loading

0 comments on commit cf162f9

Please sign in to comment.