Skip to content

Commit

Permalink
kvserver: add MVCC-compliant AddSSTable variant
Browse files Browse the repository at this point in the history
Previously, `AddSSTable` did not comply with MVCC or closed timestamp
invariants: MVCC timestamps in the SST were client-generated and used
as-is, which could mutate MVCC history and write below the closed
timestamp.

This patch adds a `WriteAtRequestTimestamp` parameter and corresponding
`MVCCAddSSTable` version gate, which will rewrite the SST MVCC
timestamps to be at the request's write timestamp and error out if they
conflict with existing MVCC writes.

The implementation here is correct but unoptimized, simply iterating
over the existing SST and writing a new one with updated timestamps.
This has significant room for improvement, which will be explored later.

An additional `Blind` parameter is added to write the SST blindly,
without checking for write conflicts. This is believed to uphold
serializability, but will yield incorrect MVCC statistics.

Release note: None
  • Loading branch information
erikgrinaker committed Oct 29, 2021
1 parent af360f8 commit 2598c11
Show file tree
Hide file tree
Showing 15 changed files with 1,405 additions and 820 deletions.
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ const (
// limits when splitting requests.
TargetBytesAvoidExcess

// MVCCAddSSTable supports MVCC-compliant AddSSTable requests via the new
// WriteAtRequestTimestamp parameter.
MVCCAddSSTable

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -473,6 +477,10 @@ var versionsSingleton = keyedVersions{
Key: TargetBytesAvoidExcess,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 4},
},
{
Key: MVCCAddSSTable,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 6},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 51 additions & 42 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func declareKeysAddSSTable(
// | Streaming replication | false | Key TS | Offline tenant |
//
args := req.(*roachpb.AddSSTableRequest)
if args.DisallowShadowing {
if (args.WriteAtRequestTimestamp && !args.Blind) || args.DisallowShadowing {
DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans)
} else {
DefaultDeclareKeys(rs, header, req, latchSpans, lockSpans)
Expand All @@ -110,20 +110,52 @@ func EvalAddSSTable(
h := cArgs.Header
ms := cArgs.Stats
mvccStartKey, mvccEndKey := storage.MVCCKey{Key: args.Key}, storage.MVCCKey{Key: args.EndKey}
sst := args.Data

if args.Blind {
if args.DisallowShadowing {
return result.Result{}, errors.New("Blind cannot be combined with DisallowShadowing")
}
if args.IngestAsWrites {
return result.Result{}, errors.New("Blind cannot be combined with IngestAsWrites")
}
}
if args.Blind && !h.TimestampFromServerClock {
// FIXME: We require this in the hope that the server-assigned timestamp
// will be unique, such that it won't replace an existing key at the same
// timestamp. However, this is likely not sufficient, because even though
// the HLC time must be the highest seen timestamp at the time the store
// receives the request, other requests received later with a past timestamp
// (equal to ours) may still be evaluated before us. Also, this is still
// vulnerable to pushes, e.g. by the closed timestamp, which may not be
// unique.
//
// Furthermore, this currently can't be pushed by the closed timestamp,
// because AddSSTable does not have the isIntentWrite flag (used to detect
// MVCC writes). The lack of this flag follows from !isTxn. This needs to be
// addressed such that it respects the closed timestamp.
return result.Result{}, errors.New("Blind writes must use server-assigned timestamps")
}

var span *tracing.Span
var err error
ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("AddSSTable [%s,%s)", args.Key, args.EndKey))
defer span.Finish()
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

// IMPORT INTO should not proceed if any KVs from the SST shadow existing data
// entries - #38044.
var skippedKVStats enginepb.MVCCStats
var err error
if args.DisallowShadowing {
if args.WriteAtRequestTimestamp {
sst, err = storage.UpdateSSTTimestamps(sst, h.WriteTimestamp())
if err != nil {
return result.Result{}, errors.Wrap(err, "updating SST timestamps")
}
}

var statsDiff enginepb.MVCCStats
checkConflicts := (args.WriteAtRequestTimestamp && !args.Blind) || args.DisallowShadowing
if checkConflicts {
maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
skippedKVStats, err = checkForKeyCollisions(
ctx, readWriter, mvccStartKey, mvccEndKey, args.Data, maxIntents)
statsDiff, err = storage.CheckSSTConflicts(
ctx, sst, readWriter, mvccStartKey, mvccEndKey, args.DisallowShadowing, maxIntents)
if err != nil {
return result.Result{}, errors.Wrap(err, "checking for key collisions")
}
Expand All @@ -132,7 +164,7 @@ func EvalAddSSTable(
// Verify that the keys in the sstable are within the range specified by the
// request header, and if the request did not include pre-computed stats,
// compute the expected MVCC stats delta of ingesting the SST.
dataIter, err := storage.NewMemSSTIterator(args.Data, true)
dataIter, err := storage.NewMemSSTIterator(sst, true)
if err != nil {
return result.Result{}, err
}
Expand All @@ -156,10 +188,9 @@ func EvalAddSSTable(
stats = *args.MVCCStats
}

// Stats are computed on-the-fly when shadowing of keys is disallowed. If we
// took the fast path and race is enabled, assert the stats were correctly
// computed.
verifyFastPath := args.DisallowShadowing && util.RaceEnabled
// Stats are computed on-the-fly when checking for conflicts. If we took the
// fast path and race is enabled, assert the stats were correctly computed.
verifyFastPath := checkConflicts && util.RaceEnabled
if args.MVCCStats == nil || verifyFastPath {
log.VEventf(ctx, 2, "computing MVCCStats for SSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

Expand Down Expand Up @@ -243,8 +274,8 @@ func EvalAddSSTable(
// checking for the collision condition in C++ and subtract them from the
// stats of the SST being ingested before adding them to the running
// cumulative for this command. These stats can then be marked as accurate.
if args.DisallowShadowing {
stats.Subtract(skippedKVStats)
if checkConflicts {
stats.Add(statsDiff)
stats.ContainsEstimates = 0
} else {
stats.ContainsEstimates++
Expand All @@ -253,8 +284,8 @@ func EvalAddSSTable(
ms.Add(stats)

if args.IngestAsWrites {
span.RecordStructured(&types.StringValue{Value: fmt.Sprintf("ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(args.Data))})
log.VEventf(ctx, 2, "ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(args.Data))
span.RecordStructured(&types.StringValue{Value: fmt.Sprintf("ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(sst))})
log.VEventf(ctx, 2, "ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(sst))
dataIter.SeekGE(storage.MVCCKey{Key: keys.MinKey})
for {
ok, err := dataIter.Valid()
Expand All @@ -272,7 +303,7 @@ func EvalAddSSTable(
return result.Result{}, err
}
} else {
if err := readWriter.PutMVCC(dataIter.UnsafeKey(), dataIter.UnsafeValue()); err != nil {
if err := readWriter.PutMVCC(k, dataIter.UnsafeValue()); err != nil {
return result.Result{}, err
}
}
Expand All @@ -284,31 +315,9 @@ func EvalAddSSTable(
return result.Result{
Replicated: kvserverpb.ReplicatedEvalResult{
AddSSTable: &kvserverpb.ReplicatedEvalResult_AddSSTable{
Data: args.Data,
CRC32: util.CRC32(args.Data),
Data: sst,
CRC32: util.CRC32(sst),
},
},
}, nil
}

func checkForKeyCollisions(
_ context.Context,
reader storage.Reader,
mvccStartKey storage.MVCCKey,
mvccEndKey storage.MVCCKey,
data []byte,
maxIntents int64,
) (enginepb.MVCCStats, error) {
// Create iterator over the existing data.
existingDataIter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: mvccEndKey.Key})
defer existingDataIter.Close()
existingDataIter.SeekGE(mvccStartKey)
if ok, err := existingDataIter.Valid(); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "checking for key collisions")
} else if !ok {
// Target key range is empty, so it is safe to ingest.
return enginepb.MVCCStats{}, nil
}

return existingDataIter.CheckForKeyCollisions(data, mvccStartKey.Key, mvccEndKey.Key, maxIntents)
}
Loading

0 comments on commit 2598c11

Please sign in to comment.