Skip to content

Commit

Permalink
Merge pull request #72271 from erikgrinaker/backport21.2-72042
Browse files Browse the repository at this point in the history
release-21.2: kvserver: batch intents in `MVCCIterator.CheckForKeyCollisions`
  • Loading branch information
erikgrinaker authored Nov 16, 2021
2 parents 37129c9 + 75e83a0 commit a4b1f6c
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 33 deletions.
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ func EvalAddSSTable(
var skippedKVStats enginepb.MVCCStats
var err error
if args.DisallowShadowing {
if skippedKVStats, err = checkForKeyCollisions(ctx, readWriter, mvccStartKey, mvccEndKey, args.Data); err != nil {
maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
skippedKVStats, err = checkForKeyCollisions(
ctx, readWriter, mvccStartKey, mvccEndKey, args.Data, maxIntents)
if err != nil {
return result.Result{}, errors.Wrap(err, "checking for key collisions")
}
}
Expand Down Expand Up @@ -294,6 +297,7 @@ func checkForKeyCollisions(
mvccStartKey storage.MVCCKey,
mvccEndKey storage.MVCCKey,
data []byte,
maxIntents int64,
) (enginepb.MVCCStats, error) {
// Create iterator over the existing data.
existingDataIter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: mvccEndKey.Key})
Expand All @@ -306,5 +310,5 @@ func checkForKeyCollisions(
return enginepb.MVCCStats{}, nil
}

return existingDataIter.CheckForKeyCollisions(data, mvccStartKey.Key, mvccEndKey.Key)
return existingDataIter.CheckForKeyCollisions(data, mvccStartKey.Key, mvccEndKey.Key, maxIntents)
}
39 changes: 31 additions & 8 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -495,6 +496,9 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
e := engineImpl.create()
defer e.Close()

st := cluster.MakeTestingClusterSettings()
evalCtx := (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext()

for _, kv := range mvccKVsFromStrs([]strKv{
{"a", 2, "aa"},
{"b", 1, "bb"},
Expand Down Expand Up @@ -550,6 +554,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
sstBytes := getSSTBytes(sstKVs)
stats := getStats(roachpb.Key("a"), roachpb.Key("b"), sstBytes)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -577,6 +582,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -606,6 +612,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -633,6 +640,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
sstBytes := getSSTBytes(sstKVs)
stats := getStats(roachpb.Key("c"), roachpb.Key("i"), sstBytes)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -665,6 +673,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -693,6 +702,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -722,6 +732,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand All @@ -739,16 +750,16 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}
}

// Test key collision when ingesting a key which has a write intent in the
// Test key collision when ingesting keys which have write intents in the
// existing data.
{
sstKVs := mvccKVsFromStrs([]strKv{
{"f", 2, "ff"},
{"q", 4, "qq"},
{"t", 3, "ttt"}, // has a write intent in the existing data.
{"q", 4, "qq"}, // has a write intent in the existing data
{"t", 3, "ttt"}, // has a write intent in the existing data
})

// Add in a write intent.
// Add in two write intents.
ts := hlc.Timestamp{WallTime: 7}
txn := roachpb.MakeTransaction(
"test",
Expand All @@ -757,18 +768,24 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
ts,
base.DefaultMaxClockOffset.Nanoseconds(),
)
if err := storage.MVCCPut(
ctx, e, nil, []byte("q"), ts,
roachpb.MakeValueFromBytes([]byte("q")),
&txn,
); err != nil {
t.Fatalf("%+v", err)
}
if err := storage.MVCCPut(
ctx, e, nil, []byte("t"), ts,
roachpb.MakeValueFromBytes([]byte("tt")),
&txn,
); err != nil {
if !errors.HasType(err, (*roachpb.WriteIntentError)(nil)) {
t.Fatalf("%+v", err)
}
t.Fatalf("%+v", err)
}

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand All @@ -781,7 +798,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "conflicting intents on \"t") {
if !testutils.IsError(err, "conflicting intents on \"q\", \"t\"") {
t.Fatalf("%+v", err)
}
}
Expand All @@ -807,6 +824,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -836,6 +854,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
sstBytes := getSSTBytes(sstKVs)
stats := getStats(roachpb.Key("e"), roachpb.Key("zz"), sstBytes)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -865,6 +884,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -893,6 +913,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -921,6 +942,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -960,6 +982,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
commandStats := enginepb.MVCCStats{}

cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ func (i *MVCCIterator) FindSplitKey(

// CheckForKeyCollisions is part of the storage.MVCCIterator interface.
func (i *MVCCIterator) CheckForKeyCollisions(
sstData []byte, start, end roachpb.Key,
sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error) {
return i.i.CheckForKeyCollisions(sstData, start, end)
return i.i.CheckForKeyCollisions(sstData, start, end, maxIntents)
}

// SetUpperBound is part of the storage.MVCCIterator interface.
Expand Down
10 changes: 7 additions & 3 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,13 @@ type MVCCIterator interface {
// must set the upper bound on the iterator before calling this method.
FindSplitKey(start, end, minSplitKey roachpb.Key, targetSize int64) (MVCCKey, error)
// CheckForKeyCollisions checks whether any keys collide between the iterator
// and the encoded SST data specified, within the provided key range. Returns
// stats on skipped KVs, or an error if a collision is found.
CheckForKeyCollisions(sstData []byte, start, end roachpb.Key) (enginepb.MVCCStats, error)
// and the encoded SST data specified, within the provided key range.
// maxIntents specifies the number of intents to collect and return in a
// WriteIntentError (0 disables batching, pass math.MaxInt64 to collect all).
// Returns stats on skipped KVs, or an error if a collision is found.
CheckForKeyCollisions(
sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error)
// SetUpperBound installs a new upper bound for this iterator. The caller
// can modify the parameter after this function returns. This must not be a
// nil key. When Reader.ConsistentIterators is true, prefer creating a new
Expand Down
78 changes: 78 additions & 0 deletions pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,84 @@ func TestEngineDeleteIterRange(t *testing.T) {
})
}

func TestMVCCIteratorCheckForKeyCollisionsMaxIntents(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

keys := []string{"aa", "bb", "cc", "dd"}
intents := []string{"a", "b", "c"}

testcases := []struct {
maxIntents int64
expectIntents []string
}{
{maxIntents: -1, expectIntents: []string{"a"}},
{maxIntents: 0, expectIntents: []string{"a"}},
{maxIntents: 1, expectIntents: []string{"a"}},
{maxIntents: 2, expectIntents: []string{"a", "b"}},
{maxIntents: 3, expectIntents: []string{"a", "b", "c"}},
{maxIntents: 4, expectIntents: []string{"a", "b", "c"}},
}

// Create SST with keys equal to intents at txn2TS.
sstFile := &MemFile{}
sstWriter := MakeBackupSSTWriter(sstFile)
defer sstWriter.Close()
for _, k := range intents {
key := MVCCKey{Key: roachpb.Key(k), Timestamp: txn2TS}
value := roachpb.Value{}
value.SetString("sst")
value.InitChecksum(key.Key)
require.NoError(t, sstWriter.Put(key, value.RawBytes))
}
require.NoError(t, sstWriter.Finish())
sstWriter.Close()

for _, engineImpl := range mvccEngineImpls {
t.Run(engineImpl.name, func(t *testing.T) {
ctx := context.Background()
engine := engineImpl.create()
defer engine.Close()

// Write some committed keys and intents at txn1TS.
batch := engine.NewBatch()
for _, key := range keys {
require.NoError(t, batch.PutMVCC(
MVCCKey{Key: roachpb.Key(key), Timestamp: txn1TS}, []byte("value")))
}
for _, key := range intents {
require.NoError(t, MVCCPut(
ctx, batch, nil, roachpb.Key(key), txn1TS, roachpb.MakeValueFromString("intent"), txn1))
}
require.NoError(t, batch.Commit(true))
batch.Close()
require.NoError(t, engine.Flush())

for _, tc := range testcases {
t.Run(fmt.Sprintf("maxIntents=%d", tc.maxIntents), func(t *testing.T) {
// Provoke and check WriteIntentErrors.
iter := engine.NewMVCCIterator(
MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.Key("z")})
defer iter.Close()
iter.SeekGE(MVCCKey{Key: roachpb.Key("a")})

_, err := iter.CheckForKeyCollisions(
sstFile.Bytes(), roachpb.Key("a"), roachpb.Key("z"), tc.maxIntents)
require.Error(t, err)
writeIntentErr := &roachpb.WriteIntentError{}
require.ErrorAs(t, err, &writeIntentErr)

actual := []string{}
for _, i := range writeIntentErr.Intents {
actual = append(actual, string(i.Key))
}
require.Equal(t, tc.expectIntents, actual)
})
}
})
}
}

func TestSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,9 +942,9 @@ func (i *intentInterleavingIter) FindSplitKey(
}

func (i *intentInterleavingIter) CheckForKeyCollisions(
sstData []byte, start, end roachpb.Key,
sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error) {
return checkForKeyCollisionsGo(i, sstData, start, end)
return checkForKeyCollisionsGo(i, sstData, start, end, maxIntents)
}

func (i *intentInterleavingIter) SetUpperBound(key roachpb.Key) {
Expand Down
29 changes: 15 additions & 14 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4128,9 +4128,10 @@ func ComputeStatsForRange(
// is not considered a collision and we continue iteration from the next key in
// the existing data.
func checkForKeyCollisionsGo(
existingIter MVCCIterator, sstData []byte, start, end roachpb.Key,
existingIter MVCCIterator, sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error) {
var skippedKVStats enginepb.MVCCStats
var intents []roachpb.Intent
sstIter, err := NewMemSSTIterator(sstData, false)
if err != nil {
return enginepb.MVCCStats{}, err
Expand Down Expand Up @@ -4164,20 +4165,17 @@ func checkForKeyCollisionsGo(
if len(mvccMeta.RawBytes) > 0 {
return enginepb.MVCCStats{}, errors.Errorf("inline values are unsupported when checking for key collisions")
} else if mvccMeta.Txn != nil {
// Check for a write intent.
//
// TODO(adityamaru): Currently, we raise a WriteIntentError on
// encountering all intents. This is because, we do not expect to
// encounter many intents during IMPORT INTO as we lock the key space we
// are importing into. Older write intents could however be found in the
// target key space, which will require appropriate resolution logic.
writeIntentErr := roachpb.WriteIntentError{
Intents: []roachpb.Intent{
roachpb.MakeIntent(mvccMeta.Txn, existingIter.Key().Key),
},
// Check for a write intent. We keep looking for additional intents to
// return a large batch for intent resolution. The caller will likely
// resolve the returned intents and retry the call, which would be
// quadratic, so this significantly reduces the overall number of scans.
intents = append(intents, roachpb.MakeIntent(mvccMeta.Txn, existingIter.Key().Key))
if int64(len(intents)) >= maxIntents {
return enginepb.MVCCStats{}, &roachpb.WriteIntentError{Intents: intents}
}

return enginepb.MVCCStats{}, &writeIntentErr
existingIter.NextKey()
ok, extErr = existingIter.Valid()
continue
} else {
return enginepb.MVCCStats{}, errors.Errorf("intent without transaction")
}
Expand Down Expand Up @@ -4254,6 +4252,9 @@ func checkForKeyCollisionsGo(
if sstErr != nil {
return enginepb.MVCCStats{}, sstErr
}
if len(intents) > 0 {
return enginepb.MVCCStats{}, &roachpb.WriteIntentError{Intents: intents}
}

return skippedKVStats, nil
}
4 changes: 2 additions & 2 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,9 +778,9 @@ func (p *pebbleIterator) SupportsPrev() bool {
// CheckForKeyCollisions indicates if the provided SST data collides with this
// iterator in the specified range.
func (p *pebbleIterator) CheckForKeyCollisions(
sstData []byte, start, end roachpb.Key,
sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error) {
return checkForKeyCollisionsGo(p, sstData, start, end)
return checkForKeyCollisionsGo(p, sstData, start, end, maxIntents)
}

// GetRawIter is part of the EngineIterator interface.
Expand Down

0 comments on commit a4b1f6c

Please sign in to comment.