Skip to content

Commit

Permalink
kvserver: batch intents in MVCCIterator.CheckForKeyCollisions
Browse files Browse the repository at this point in the history
`MVCCIterator.CheckForKeyCollisions()` is used by `AddSSTable` to check
for key collisions when `DisallowShadowing` is set. If it encounters any
intents, it returns `WriteIntentError` to resolve these before retrying.

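However, this returned an error for each individual intent. Since the caller
typically resolves the returned intent and then retries the check, which scans
the range again, encountering many intents has quadratic performance. This
patch changes it to instead collect and return a batch of intents, for more
efficient intent resolution.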

The batch size is controlled by the existing setting
`storage.mvcc.max_intents_per_error`, which defaults to 5000.

Release note (performance improvement): Improved `IMPORT INTO`
performance in cases where it encounters large numbers of unresolved
write intents.
  • Loading branch information
erikgrinaker committed Oct 31, 2021
1 parent f279cc9 commit 75e83a0
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 33 deletions.
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func EvalAddSSTable(
var skippedKVStats enginepb.MVCCStats
var err error
if args.DisallowShadowing {
if skippedKVStats, err = checkForKeyCollisions(ctx, readWriter, mvccStartKey, mvccEndKey, args.Data); err != nil {
maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
skippedKVStats, err = checkForKeyCollisions(
ctx, readWriter, mvccStartKey, mvccEndKey, args.Data, maxIntents)
if err != nil {
return result.Result{}, errors.Wrap(err, "checking for key collisions")
}
}
Expand Down Expand Up @@ -226,6 +229,7 @@ func checkForKeyCollisions(
mvccStartKey storage.MVCCKey,
mvccEndKey storage.MVCCKey,
data []byte,
maxIntents int64,
) (enginepb.MVCCStats, error) {
// Create iterator over the existing data.
existingDataIter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: mvccEndKey.Key})
Expand All @@ -238,5 +242,5 @@ func checkForKeyCollisions(
return enginepb.MVCCStats{}, nil
}

return existingDataIter.CheckForKeyCollisions(data, mvccStartKey.Key, mvccEndKey.Key)
return existingDataIter.CheckForKeyCollisions(data, mvccStartKey.Key, mvccEndKey.Key, maxIntents)
}
39 changes: 31 additions & 8 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -494,6 +495,9 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
e := engineImpl.create()
defer e.Close()

st := cluster.MakeTestingClusterSettings()
evalCtx := (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext()

for _, kv := range mvccKVsFromStrs([]strKv{
{"a", 2, "aa"},
{"b", 1, "bb"},
Expand Down Expand Up @@ -549,6 +553,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
sstBytes := getSSTBytes(sstKVs)
stats := getStats(roachpb.Key("a"), roachpb.Key("b"), sstBytes)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -576,6 +581,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -605,6 +611,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -632,6 +639,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
sstBytes := getSSTBytes(sstKVs)
stats := getStats(roachpb.Key("c"), roachpb.Key("i"), sstBytes)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -664,6 +672,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -692,6 +701,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -721,6 +731,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand All @@ -738,16 +749,16 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}
}

// Test key collision when ingesting a key which has a write intent in the
// Test key collision when ingesting keys which have write intents in the
// existing data.
{
sstKVs := mvccKVsFromStrs([]strKv{
{"f", 2, "ff"},
{"q", 4, "qq"},
{"t", 3, "ttt"}, // has a write intent in the existing data.
{"q", 4, "qq"}, // has a write intent in the existing data
{"t", 3, "ttt"}, // has a write intent in the existing data
})

// Add in a write intent.
// Add in two write intents.
ts := hlc.Timestamp{WallTime: 7}
txn := roachpb.MakeTransaction(
"test",
Expand All @@ -756,18 +767,24 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
ts,
base.DefaultMaxClockOffset.Nanoseconds(),
)
if err := storage.MVCCPut(
ctx, e, nil, []byte("q"), ts,
roachpb.MakeValueFromBytes([]byte("q")),
&txn,
); err != nil {
t.Fatalf("%+v", err)
}
if err := storage.MVCCPut(
ctx, e, nil, []byte("t"), ts,
roachpb.MakeValueFromBytes([]byte("tt")),
&txn,
); err != nil {
if !errors.HasType(err, (*roachpb.WriteIntentError)(nil)) {
t.Fatalf("%+v", err)
}
t.Fatalf("%+v", err)
}

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand All @@ -780,7 +797,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "conflicting intents on \"t") {
if !testutils.IsError(err, "conflicting intents on \"q\", \"t\"") {
t.Fatalf("%+v", err)
}
}
Expand All @@ -806,6 +823,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -835,6 +853,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
sstBytes := getSSTBytes(sstKVs)
stats := getStats(roachpb.Key("e"), roachpb.Key("zz"), sstBytes)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -864,6 +883,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -892,6 +912,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -920,6 +941,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {

sstBytes := getSSTBytes(sstKVs)
cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down Expand Up @@ -959,6 +981,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
commandStats := enginepb.MVCCStats{}

cArgs := batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ func (i *MVCCIterator) FindSplitKey(

// CheckForKeyCollisions is part of the storage.MVCCIterator interface.
func (i *MVCCIterator) CheckForKeyCollisions(
sstData []byte, start, end roachpb.Key,
sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error) {
return i.i.CheckForKeyCollisions(sstData, start, end)
return i.i.CheckForKeyCollisions(sstData, start, end, maxIntents)
}

// SetUpperBound is part of the storage.MVCCIterator interface.
Expand Down
10 changes: 7 additions & 3 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,13 @@ type MVCCIterator interface {
// must set the upper bound on the iterator before calling this method.
FindSplitKey(start, end, minSplitKey roachpb.Key, targetSize int64) (MVCCKey, error)
// CheckForKeyCollisions checks whether any keys collide between the iterator
// and the encoded SST data specified, within the provided key range. Returns
// stats on skipped KVs, or an error if a collision is found.
CheckForKeyCollisions(sstData []byte, start, end roachpb.Key) (enginepb.MVCCStats, error)
// and the encoded SST data specified, within the provided key range.
// maxIntents specifies the maximum number of intents to collect and return
// in a WriteIntentError. Values of 0 or less disable batching, so only the
// first intent encountered is returned; pass math.MaxInt64 to collect all.
// Returns stats on skipped KVs, or an error if a collision is found.
CheckForKeyCollisions(
sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error)
// SetUpperBound installs a new upper bound for this iterator. The caller
// can modify the parameter after this function returns. This must not be a
// nil key. When Reader.ConsistentIterators is true, prefer creating a new
Expand Down
78 changes: 78 additions & 0 deletions pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,84 @@ func TestEngineDeleteIterRange(t *testing.T) {
})
}

// TestMVCCIteratorCheckForKeyCollisionsMaxIntents checks how many intents
// MVCCIterator.CheckForKeyCollisions reports in the WriteIntentError it
// returns for various values of maxIntents. The engine holds three intents
// ("a", "b", "c") and the SST contains a key for each of them, so every call
// is expected to fail with a WriteIntentError.
func TestMVCCIteratorCheckForKeyCollisionsMaxIntents(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	committedKeys := []string{"aa", "bb", "cc", "dd"}
	intentKeys := []string{"a", "b", "c"}

	// Each case pairs a maxIntents limit with the intent keys expected in the
	// resulting WriteIntentError. Limits below 1 still yield the first intent.
	type testcase struct {
		maxIntents    int64
		expectIntents []string
	}
	testcases := []testcase{
		{-1, []string{"a"}},
		{0, []string{"a"}},
		{1, []string{"a"}},
		{2, []string{"a", "b"}},
		{3, []string{"a", "b", "c"}},
		{4, []string{"a", "b", "c"}},
	}

	// Build an in-memory SST holding one value per intent key, written at
	// txn2TS.
	sstFile := &MemFile{}
	sstWriter := MakeBackupSSTWriter(sstFile)
	defer sstWriter.Close()
	for _, k := range intentKeys {
		sstKey := MVCCKey{Key: roachpb.Key(k), Timestamp: txn2TS}
		var sstValue roachpb.Value
		sstValue.SetString("sst")
		sstValue.InitChecksum(sstKey.Key)
		require.NoError(t, sstWriter.Put(sstKey, sstValue.RawBytes))
	}
	require.NoError(t, sstWriter.Finish())
	sstWriter.Close()

	for _, impl := range mvccEngineImpls {
		t.Run(impl.name, func(t *testing.T) {
			ctx := context.Background()
			engine := impl.create()
			defer engine.Close()

			// Populate the engine through a single batch: first the committed
			// values, then the intents, all at txn1TS.
			batch := engine.NewBatch()
			for _, k := range committedKeys {
				mvccKey := MVCCKey{Key: roachpb.Key(k), Timestamp: txn1TS}
				require.NoError(t, batch.PutMVCC(mvccKey, []byte("value")))
			}
			for _, k := range intentKeys {
				err := MVCCPut(
					ctx, batch, nil, roachpb.Key(k), txn1TS, roachpb.MakeValueFromString("intent"), txn1)
				require.NoError(t, err)
			}
			require.NoError(t, batch.Commit(true))
			batch.Close()
			require.NoError(t, engine.Flush())

			for _, tc := range testcases {
				t.Run(fmt.Sprintf("maxIntents=%d", tc.maxIntents), func(t *testing.T) {
					// Use a fresh iterator per case, positioned at the start
					// of the checked span.
					startKey, endKey := roachpb.Key("a"), roachpb.Key("z")
					iter := engine.NewMVCCIterator(
						MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.Key("z")})
					defer iter.Close()
					iter.SeekGE(MVCCKey{Key: roachpb.Key("a")})

					// The collision check must fail with a WriteIntentError.
					_, err := iter.CheckForKeyCollisions(
						sstFile.Bytes(), startKey, endKey, tc.maxIntents)
					require.Error(t, err)
					writeIntentErr := &roachpb.WriteIntentError{}
					require.ErrorAs(t, err, &writeIntentErr)

					// Compare the reported intent keys, in order, with the
					// expectation.
					actual := make([]string, 0, len(writeIntentErr.Intents))
					for _, intent := range writeIntentErr.Intents {
						actual = append(actual, string(intent.Key))
					}
					require.Equal(t, tc.expectIntents, actual)
				})
			}
		})
	}
}

func TestSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,9 +942,9 @@ func (i *intentInterleavingIter) FindSplitKey(
}

func (i *intentInterleavingIter) CheckForKeyCollisions(
sstData []byte, start, end roachpb.Key,
sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error) {
return checkForKeyCollisionsGo(i, sstData, start, end)
return checkForKeyCollisionsGo(i, sstData, start, end, maxIntents)
}

func (i *intentInterleavingIter) SetUpperBound(key roachpb.Key) {
Expand Down
29 changes: 15 additions & 14 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4128,9 +4128,10 @@ func ComputeStatsForRange(
// is not considered a collision and we continue iteration from the next key in
// the existing data.
func checkForKeyCollisionsGo(
existingIter MVCCIterator, sstData []byte, start, end roachpb.Key,
existingIter MVCCIterator, sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error) {
var skippedKVStats enginepb.MVCCStats
var intents []roachpb.Intent
sstIter, err := NewMemSSTIterator(sstData, false)
if err != nil {
return enginepb.MVCCStats{}, err
Expand Down Expand Up @@ -4164,20 +4165,17 @@ func checkForKeyCollisionsGo(
if len(mvccMeta.RawBytes) > 0 {
return enginepb.MVCCStats{}, errors.Errorf("inline values are unsupported when checking for key collisions")
} else if mvccMeta.Txn != nil {
// Check for a write intent.
//
// TODO(adityamaru): Currently, we raise a WriteIntentError on
// encountering all intents. This is because, we do not expect to
// encounter many intents during IMPORT INTO as we lock the key space we
// are importing into. Older write intents could however be found in the
// target key space, which will require appropriate resolution logic.
writeIntentErr := roachpb.WriteIntentError{
Intents: []roachpb.Intent{
roachpb.MakeIntent(mvccMeta.Txn, existingIter.Key().Key),
},
// Check for a write intent. We keep looking for additional intents to
// return a large batch for intent resolution. The caller will likely
// resolve the returned intents and retry the call, which would be
// quadratic, so this significantly reduces the overall number of scans.
intents = append(intents, roachpb.MakeIntent(mvccMeta.Txn, existingIter.Key().Key))
if int64(len(intents)) >= maxIntents {
return enginepb.MVCCStats{}, &roachpb.WriteIntentError{Intents: intents}
}

return enginepb.MVCCStats{}, &writeIntentErr
existingIter.NextKey()
ok, extErr = existingIter.Valid()
continue
} else {
return enginepb.MVCCStats{}, errors.Errorf("intent without transaction")
}
Expand Down Expand Up @@ -4254,6 +4252,9 @@ func checkForKeyCollisionsGo(
if sstErr != nil {
return enginepb.MVCCStats{}, sstErr
}
if len(intents) > 0 {
return enginepb.MVCCStats{}, &roachpb.WriteIntentError{Intents: intents}
}

return skippedKVStats, nil
}
4 changes: 2 additions & 2 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,9 +778,9 @@ func (p *pebbleIterator) SupportsPrev() bool {
// CheckForKeyCollisions indicates if the provided SST data collides with this
// iterator in the specified range.
func (p *pebbleIterator) CheckForKeyCollisions(
sstData []byte, start, end roachpb.Key,
sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error) {
return checkForKeyCollisionsGo(p, sstData, start, end)
return checkForKeyCollisionsGo(p, sstData, start, end, maxIntents)
}

// GetRawIter is part of the EngineIterator interface.
Expand Down

0 comments on commit 75e83a0

Please sign in to comment.