Skip to content

Commit

Permalink
kvserver: add MVCC-compliant AddSSTable variant
Browse files Browse the repository at this point in the history
Previously, `AddSSTable` did not comply with MVCC or closed timestamp
invariants: MVCC timestamps in the SST were client-generated and used
as-is, which could mutate MVCC history and write below the closed
timestamp.

This patch adds a `WriteAtRequestTimestamp` parameter and corresponding
`MVCCAddSSTable` version gate, which will rewrite the SST MVCC
timestamps to be at the request's write timestamp and error out if they
conflict with existing MVCC writes.

The implementation here is correct but unoptimized, simply iterating
over the existing SST and writing a new one with updated timestamps.
This has significant room for improvement, which will be explored later.

An additional `Blind` parameter is added to write the SST blindly,
without checking for write conflicts. This is believed to uphold
serializability, but will yield incorrect MVCC statistics.

Release note: None
  • Loading branch information
erikgrinaker committed Oct 29, 2021
1 parent af360f8 commit 768d3b6
Show file tree
Hide file tree
Showing 15 changed files with 1,059 additions and 816 deletions.
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ const (
// limits when splitting requests.
TargetBytesAvoidExcess

// MVCCAddSSTable supports MVCC-compliant AddSSTable requests via the new
// WriteAtRequestTimestamp parameter.
MVCCAddSSTable

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -473,6 +477,10 @@ var versionsSingleton = keyedVersions{
Key: TargetBytesAvoidExcess,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 4},
},
{
Key: MVCCAddSSTable,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 6},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 35 additions & 42 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func declareKeysAddSSTable(
// | Streaming replication | false | Key TS | Offline tenant |
//
args := req.(*roachpb.AddSSTableRequest)
if args.DisallowShadowing {
if (args.WriteAtRequestTimestamp && !args.Blind) || args.DisallowShadowing {
DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans)
} else {
DefaultDeclareKeys(rs, header, req, latchSpans, lockSpans)
Expand All @@ -110,20 +110,36 @@ func EvalAddSSTable(
h := cArgs.Header
ms := cArgs.Stats
mvccStartKey, mvccEndKey := storage.MVCCKey{Key: args.Key}, storage.MVCCKey{Key: args.EndKey}
sst := args.Data

if args.Blind {
if args.DisallowShadowing {
return result.Result{}, errors.New("Blind cannot be combined with DisallowShadowing")
}
if args.IngestAsWrites {
return result.Result{}, errors.New("Blind cannot be combined with IngestAsWrites")
}
}

var span *tracing.Span
var err error
ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("AddSSTable [%s,%s)", args.Key, args.EndKey))
defer span.Finish()
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

// IMPORT INTO should not proceed if any KVs from the SST shadow existing data
// entries - #38044.
var skippedKVStats enginepb.MVCCStats
var err error
if args.DisallowShadowing {
if args.WriteAtRequestTimestamp {
sst, err = storage.UpdateSSTTimestamps(sst, h.WriteTimestamp())
if err != nil {
return result.Result{}, errors.Wrap(err, "updating SST timestamps")
}
}

var statsDiff enginepb.MVCCStats
checkConflicts := (args.WriteAtRequestTimestamp && !args.Blind) || args.DisallowShadowing
if checkConflicts {
maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
skippedKVStats, err = checkForKeyCollisions(
ctx, readWriter, mvccStartKey, mvccEndKey, args.Data, maxIntents)
statsDiff, err = storage.CheckSSTConflicts(
ctx, sst, readWriter, mvccStartKey, mvccEndKey, args.DisallowShadowing, maxIntents)
if err != nil {
return result.Result{}, errors.Wrap(err, "checking for key collisions")
}
Expand All @@ -132,7 +148,7 @@ func EvalAddSSTable(
// Verify that the keys in the sstable are within the range specified by the
// request header, and if the request did not include pre-computed stats,
// compute the expected MVCC stats delta of ingesting the SST.
dataIter, err := storage.NewMemSSTIterator(args.Data, true)
dataIter, err := storage.NewMemSSTIterator(sst, true)
if err != nil {
return result.Result{}, err
}
Expand All @@ -156,10 +172,9 @@ func EvalAddSSTable(
stats = *args.MVCCStats
}

// Stats are computed on-the-fly when shadowing of keys is disallowed. If we
// took the fast path and race is enabled, assert the stats were correctly
// computed.
verifyFastPath := args.DisallowShadowing && util.RaceEnabled
// Stats are computed on-the-fly when checking for conflicts. If we took the
// fast path and race is enabled, assert the stats were correctly computed.
verifyFastPath := checkConflicts && util.RaceEnabled
if args.MVCCStats == nil || verifyFastPath {
log.VEventf(ctx, 2, "computing MVCCStats for SSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

Expand Down Expand Up @@ -243,8 +258,8 @@ func EvalAddSSTable(
// checking for the collision condition in C++ and subtract them from the
// stats of the SST being ingested before adding them to the running
// cumulative for this command. These stats can then be marked as accurate.
if args.DisallowShadowing {
stats.Subtract(skippedKVStats)
if checkConflicts {
stats.Add(statsDiff)
stats.ContainsEstimates = 0
} else {
stats.ContainsEstimates++
Expand All @@ -253,8 +268,8 @@ func EvalAddSSTable(
ms.Add(stats)

if args.IngestAsWrites {
span.RecordStructured(&types.StringValue{Value: fmt.Sprintf("ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(args.Data))})
log.VEventf(ctx, 2, "ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(args.Data))
span.RecordStructured(&types.StringValue{Value: fmt.Sprintf("ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(sst))})
log.VEventf(ctx, 2, "ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(sst))
dataIter.SeekGE(storage.MVCCKey{Key: keys.MinKey})
for {
ok, err := dataIter.Valid()
Expand All @@ -272,7 +287,7 @@ func EvalAddSSTable(
return result.Result{}, err
}
} else {
if err := readWriter.PutMVCC(dataIter.UnsafeKey(), dataIter.UnsafeValue()); err != nil {
if err := readWriter.PutMVCC(k, dataIter.UnsafeValue()); err != nil {
return result.Result{}, err
}
}
Expand All @@ -284,31 +299,9 @@ func EvalAddSSTable(
return result.Result{
Replicated: kvserverpb.ReplicatedEvalResult{
AddSSTable: &kvserverpb.ReplicatedEvalResult_AddSSTable{
Data: args.Data,
CRC32: util.CRC32(args.Data),
Data: sst,
CRC32: util.CRC32(sst),
},
},
}, nil
}

// checkForKeyCollisions opens an MVCC iterator (keys and intents) over the
// existing data in reader, bounded above by mvccEndKey.Key, and positions it
// at mvccStartKey. If positioning reports an error, that error is returned
// wrapped; if the iterator is not valid (nothing at or after the start key
// within the bound), zero stats and a nil error are returned. Otherwise the
// collision check is delegated to the iterator's CheckForKeyCollisions with
// the SST bytes in data, the start/end keys, and maxIntents.
//
// The context parameter is accepted but unused.
func checkForKeyCollisions(
	_ context.Context,
	reader storage.Reader,
	mvccStartKey storage.MVCCKey,
	mvccEndKey storage.MVCCKey,
	data []byte,
	maxIntents int64,
) (enginepb.MVCCStats, error) {
	// Iterate over what is already stored, never looking past the end key.
	iter := reader.NewMVCCIterator(
		storage.MVCCKeyAndIntentsIterKind,
		storage.IterOptions{UpperBound: mvccEndKey.Key},
	)
	defer iter.Close()

	iter.SeekGE(mvccStartKey)
	valid, err := iter.Valid()
	switch {
	case err != nil:
		return enginepb.MVCCStats{}, errors.Wrap(err, "checking for key collisions")
	case !valid:
		// Nothing exists in the target key range, so there is nothing to
		// collide with and ingestion is safe.
		return enginepb.MVCCStats{}, nil
	}

	return iter.CheckForKeyCollisions(data, mvccStartKey.Key, mvccEndKey.Key, maxIntents)
}
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "ingested key collides with an existing one: \"a\"") {
if !testutils.IsError(err, "SST key \"a\"/0.000000007,0 shadows existing key \"a\"/0.000000002,0") {
t.Fatalf("%+v", err)
}
}
Expand All @@ -598,7 +598,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "ingested key collides with an existing one: \"g\"") {
if !testutils.IsError(err, "WriteTooOldError: write at timestamp 0.000000004,0 too old; wrote at 0.000000005,1") {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -627,7 +627,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "ingested key collides with an existing one: \"z\"") {
if !testutils.IsError(err, "SST key \"z\"/0.000000003,0 shadows existing key \"z\"/0.000000002,0") {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -689,7 +689,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "ingested key collides with an existing one: \"y\"") {
if !testutils.IsError(err, "WriteTooOldError: write at timestamp 0.000000003,0 too old; wrote at 0.000000005,1") {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -718,7 +718,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "ingested key collides with an existing one: \"b\"") {
if !testutils.IsError(err, "WriteTooOldError: write at timestamp 0.000000004,0 too old; wrote at 0.000000006,1") {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -748,7 +748,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "ingested key collides with an existing one: \"y\"") {
if !testutils.IsError(err, "WriteTooOldError: write at timestamp 0.000000004,0 too old; wrote at 0.000000005,1") {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -840,7 +840,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "inline values are unsupported when checking for key collisions") {
if !testutils.IsError(err, "inline values are unsupported") {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -900,7 +900,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "ingested key collides with an existing one: \"y\"") {
if !testutils.IsError(err, "SST key \"y\"/0.000000006,0 shadows existing key \"y\"/0.000000005,0") {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -929,7 +929,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "ingested key collides with an existing one: \"y\"") {
if !testutils.IsError(err, "WriteTooOldError: write at timestamp 0.000000005,0 too old; wrote at 0.000000005,1") {
t.Fatalf("%+v", err)
}
}
Expand Down Expand Up @@ -958,7 +958,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if !testutils.IsError(err, "ingested key collides with an existing one: \"z\"") {
if !testutils.IsError(err, "SST key \"z\"/0.000000003,0 shadows existing key \"z\"/0.000000002,0") {
t.Fatalf("%+v", err)
}
}
Expand Down
7 changes: 0 additions & 7 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,6 @@ func (i *MVCCIterator) FindSplitKey(
return i.i.FindSplitKey(start, end, minSplitKey, targetSize)
}

// CheckForKeyCollisions is part of the storage.MVCCIterator interface.
//
// It is a pure pass-through: sstData, start, end and maxIntents are forwarded
// unchanged to the wrapped iterator i.i, and that iterator's stats and error
// are returned as-is. This method itself performs no additional checks on the
// [start, end) keys before delegating.
func (i *MVCCIterator) CheckForKeyCollisions(
sstData []byte, start, end roachpb.Key, maxIntents int64,
) (enginepb.MVCCStats, error) {
return i.i.CheckForKeyCollisions(sstData, start, end, maxIntents)
}

// SetUpperBound is part of the storage.MVCCIterator interface.
func (i *MVCCIterator) SetUpperBound(key roachpb.Key) {
i.i.SetUpperBound(key)
Expand Down
Loading

0 comments on commit 768d3b6

Please sign in to comment.