Skip to content

Commit

Permalink
kvserver: add merged SSTTimestampToRequestTimestamp param for `AddS…
Browse files Browse the repository at this point in the history
…STable`

This merges the `AddSSTable` parameters `WriteAtRequestTimestamp` and
`SSTTimestamp` into a new `SSTTimestampToRequestTimestamp` parameter.
This parameter specifies the MVCC timestamp of all existing timestamps
in the SST, which will be rewritten to the request timestamp if they
differ (e.g. if the request gets pushed).

Both of the replaced parameters are new in 22.1, so this does not
require a version gate and allows changing the type of one of the
existing Protobuf fields.

Release note: None
  • Loading branch information
erikgrinaker committed Feb 21, 2022
1 parent 7746828 commit ef6a82b
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 316 deletions.
2 changes: 1 addition & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ const (
// bundles when the query latency exceeds the user provided threshold.
AlterSystemStmtDiagReqs
// MVCCAddSSTable supports MVCC-compliant AddSSTable requests via the new
// WriteAtRequestTimestamp and DisallowConflicts parameters.
// SSTTimestampToRequestTimestamp and DisallowConflicts parameters.
MVCCAddSSTable
// InsertPublicSchemaNamespaceEntryOnRestore ensures all public schemas
// have an entry in system.namespace upon being restored.
Expand Down
18 changes: 8 additions & 10 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,7 @@ func (b *Batch) addSSTable(
disallowShadowingBelow hlc.Timestamp,
stats *enginepb.MVCCStats,
ingestAsWrites bool,
writeAtRequestTimestamp bool,
sstTimestamp hlc.Timestamp,
sstTimestampToRequestTimestamp hlc.Timestamp,
) {
begin, err := marshalKey(s)
if err != nil {
Expand All @@ -794,14 +793,13 @@ func (b *Batch) addSSTable(
Key: begin,
EndKey: end,
},
Data: data,
DisallowConflicts: disallowConflicts,
DisallowShadowing: disallowShadowing,
DisallowShadowingBelow: disallowShadowingBelow,
MVCCStats: stats,
IngestAsWrites: ingestAsWrites,
WriteAtRequestTimestamp: writeAtRequestTimestamp,
SSTTimestamp: sstTimestamp,
Data: data,
DisallowConflicts: disallowConflicts,
DisallowShadowing: disallowShadowing,
DisallowShadowingBelow: disallowShadowingBelow,
MVCCStats: stats,
IngestAsWrites: ingestAsWrites,
SSTTimestampToRequestTimestamp: sstTimestampToRequestTimestamp,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func (db *DB) AddSSTable(
) error {
b := &Batch{Header: roachpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow,
stats, ingestAsWrites, false /* writeAtBatchTS */, hlc.Timestamp{} /* sstTimestamp */)
stats, ingestAsWrites, hlc.Timestamp{} /* sstTimestampToRequestTimestamp */)
return getOneErr(db.Run(ctx, b), b)
}

Expand All @@ -711,7 +711,7 @@ func (db *DB) AddSSTableAtBatchTimestamp(
) (hlc.Timestamp, error) {
b := &Batch{Header: roachpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow,
stats, ingestAsWrites, true /* writeAtBatchTS */, batchTs)
stats, ingestAsWrites, batchTs)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
return hlc.Timestamp{}, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ func WithOnCheckpoint(f OnCheckpoint) Option {
// requested by WithDiff, and callers must obtain these themselves if needed.
//
// Also note that AddSSTable requests that do not set the
// WriteAtRequestTimestamp flag, possibly writing below the closed timestamp,
// will cause affected rangefeeds to be disconnected with a terminal
// SSTTimestampToRequestTimestamp param, possibly writing below the closed
// timestamp, will cause affected rangefeeds to be disconnected with a terminal
// MVCCHistoryMutationError and thus will not be emitted here -- there should be
// no such requests into spans with rangefeeds across them, but it is up to
// callers to ensure this.
Expand Down
44 changes: 21 additions & 23 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ func init() {
RegisterReadWriteCommand(roachpb.AddSSTable, DefaultDeclareIsolatedKeys, EvalAddSSTable)
}

var addSSTableRewriteConcurrency = settings.RegisterIntSetting(
// AddSSTableRewriteConcurrency sets the concurrency of a single SST rewrite.
var AddSSTableRewriteConcurrency = settings.RegisterIntSetting(
settings.SystemOnly,
"kv.bulk_io_write.sst_rewrite_concurrency.per_call",
"concurrency to use when rewriting a sstable timestamps by block, or 0 to use a loop",
"concurrency to use when rewriting sstable timestamps by block, or 0 to use a loop",
int64(util.ConstantWithMetamorphicTestRange("addsst-rewrite-concurrency", 0, 0, 16)),
settings.NonNegativeInt,
)
Expand All @@ -56,35 +57,32 @@ func EvalAddSSTable(
ms := cArgs.Stats
start, end := storage.MVCCKey{Key: args.Key}, storage.MVCCKey{Key: args.EndKey}
sst := args.Data
sstToReqTS := args.SSTTimestampToRequestTimestamp

var span *tracing.Span
var err error
ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("AddSSTable [%s,%s)", start.Key, end.Key))
defer span.Finish()
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", start.Key, end.Key)

// Under the race detector, scan the SST contents and make sure it satisfies
// the AddSSTable requirements. We do not always perform these checks
// otherwise, due to the cost.
// Under the race detector, check that the SST contents satisfy AddSSTable
// requirements. We don't always do this otherwise, due to the cost.
if util.RaceEnabled {
if err := assertSSTContents(sst, args.SSTTimestamp, args.MVCCStats); err != nil {
if err := assertSSTContents(sst, sstToReqTS, args.MVCCStats); err != nil {
return result.Result{}, err
}
}

// If requested, rewrite the SST's MVCC timestamps to the request timestamp.
// This ensures the writes comply with the timestamp cache and closed
// timestamp, i.e. by not writing to timestamps that have already been
// observed or closed.
if args.WriteAtRequestTimestamp &&
(args.SSTTimestamp.IsEmpty() || h.Timestamp != args.SSTTimestamp ||
util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)) {
// If requested and necessary, rewrite the SST's MVCC timestamps to the
// request timestamp. This ensures the writes comply with the timestamp cache
// and closed timestamp, i.e. by not writing to timestamps that have already
// been observed or closed.
if sstToReqTS.IsSet() && (h.Timestamp != sstToReqTS ||
util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)) {
st := cArgs.EvalCtx.ClusterSettings()
// TODO(dt): use a quotapool.
concurrency := int(addSSTableRewriteConcurrency.Get(&st.SV))
sst, err = storage.UpdateSSTTimestamps(
ctx, st, sst, args.SSTTimestamp, h.Timestamp, concurrency,
)
conc := int(AddSSTableRewriteConcurrency.Get(&cArgs.EvalCtx.ClusterSettings().SV))
sst, err = storage.UpdateSSTTimestamps(ctx, st, sst, sstToReqTS, h.Timestamp, conc)
if err != nil {
return result.Result{}, errors.Wrap(err, "updating SST timestamps")
}
Expand Down Expand Up @@ -226,7 +224,7 @@ func EvalAddSSTable(
ms.Add(stats)

var mvccHistoryMutation *kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation
if !args.WriteAtRequestTimestamp {
if sstToReqTS.IsEmpty() {
mvccHistoryMutation = &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{
Spans: []roachpb.Span{{Key: start.Key, EndKey: end.Key}},
}
Expand Down Expand Up @@ -261,7 +259,7 @@ func EvalAddSSTable(
// write by not allowing writing below existing keys, and we want to
// retain parity with regular SST ingestion which does allow this. We
// therefore record these operations ourselves.
if args.WriteAtRequestTimestamp {
if sstToReqTS.IsSet() {
readWriter.LogLogicalOp(storage.MVCCWriteValueOpType, storage.MVCCLogicalOpDetails{
Key: k.Key,
Timestamp: k.Timestamp,
Expand All @@ -287,7 +285,7 @@ func EvalAddSSTable(
Data: sst,
CRC32: util.CRC32(sst),
Span: roachpb.Span{Key: start.Key, EndKey: end.Key},
AtWriteTimestamp: args.WriteAtRequestTimestamp,
AtWriteTimestamp: sstToReqTS.IsSet(),
},
MVCCHistoryMutation: mvccHistoryMutation,
},
Expand Down Expand Up @@ -325,9 +323,9 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M
if len(value) == 0 {
return errors.AssertionFailedf("SST contains tombstone for key %s", key)
}
if !sstTimestamp.IsEmpty() && key.Timestamp != sstTimestamp {
return errors.AssertionFailedf("SST has incorrect timestamp %s for SST key %s (expected %s)",
key.Timestamp, key.Key, sstTimestamp)
if sstTimestamp.IsSet() && key.Timestamp != sstTimestamp {
return errors.AssertionFailedf("SST has unexpected timestamp %s (expected %s) for key %s",
key.Timestamp, sstTimestamp, key.Key)
}
iter.Next()
}
Expand Down
Loading

0 comments on commit ef6a82b

Please sign in to comment.