From ef6a82bcc3083a3e336c4c6814003dbbafcdf52c Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 5 Feb 2022 13:34:55 +0000 Subject: [PATCH] kvserver: add merged `SSTTimestampToRequestTimestamp` param for `AddSSTable` This merges the `AddSSTable` parameters `WriteAtRequestTimestamp` and `SSTTimestamp` into a new `SSTTimestampToRequestTimestamp` parameter. This parameter specifies the MVCC timestamp of all existing keys in the SST, which will be rewritten to the request timestamp if they differ (e.g. if the request gets pushed). Both of the replaced parameters are new in 22.1, so this does not require a version gate and allows changing the type of one of the existing Protobuf fields. Release note: None --- pkg/clusterversion/cockroach_versions.go | 2 +- pkg/kv/batch.go | 18 +- pkg/kv/db.go | 4 +- pkg/kv/kvclient/rangefeed/config.go | 4 +- pkg/kv/kvserver/batcheval/cmd_add_sstable.go | 44 +- .../batcheval/cmd_add_sstable_test.go | 461 +++++++++--------- pkg/kv/kvserver/kvserverpb/proposer_kv.proto | 4 +- pkg/kv/kvserver/replica_rangefeed.go | 4 +- pkg/roachpb/api.go | 2 +- pkg/roachpb/api.proto | 44 +- pkg/roachpb/api_test.go | 3 +- pkg/storage/sst.go | 36 +- pkg/storage/sst_test.go | 24 +- 13 files changed, 334 insertions(+), 316 deletions(-) diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 77c95b0454f9..d79ab5197ac3 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -186,7 +186,7 @@ const ( // bundles when the query latency exceeds the user provided threshold. AlterSystemStmtDiagReqs // MVCCAddSSTable supports MVCC-compliant AddSSTable requests via the new - // WriteAtRequestTimestamp and DisallowConflicts parameters. + // SSTTimestampToRequestTimestamp and DisallowConflicts parameters. MVCCAddSSTable // InsertPublicSchemaNamespaceEntryOnRestore ensures all public schemas // have an entry in system.namespace upon being restored. 
diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 320dc2483a23..3c248d9ab3a8 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -776,8 +776,7 @@ func (b *Batch) addSSTable( disallowShadowingBelow hlc.Timestamp, stats *enginepb.MVCCStats, ingestAsWrites bool, - writeAtRequestTimestamp bool, - sstTimestamp hlc.Timestamp, + sstTimestampToRequestTimestamp hlc.Timestamp, ) { begin, err := marshalKey(s) if err != nil { @@ -794,14 +793,13 @@ func (b *Batch) addSSTable( Key: begin, EndKey: end, }, - Data: data, - DisallowConflicts: disallowConflicts, - DisallowShadowing: disallowShadowing, - DisallowShadowingBelow: disallowShadowingBelow, - MVCCStats: stats, - IngestAsWrites: ingestAsWrites, - WriteAtRequestTimestamp: writeAtRequestTimestamp, - SSTTimestamp: sstTimestamp, + Data: data, + DisallowConflicts: disallowConflicts, + DisallowShadowing: disallowShadowing, + DisallowShadowingBelow: disallowShadowingBelow, + MVCCStats: stats, + IngestAsWrites: ingestAsWrites, + SSTTimestampToRequestTimestamp: sstTimestampToRequestTimestamp, } b.appendReqs(req) b.initResult(1, 0, notRaw, nil) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 88fd9b841329..05e834a1a220 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -688,7 +688,7 @@ func (db *DB) AddSSTable( ) error { b := &Batch{Header: roachpb.Header{Timestamp: batchTs}} b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, - stats, ingestAsWrites, false /* writeAtBatchTS */, hlc.Timestamp{} /* sstTimestamp */) + stats, ingestAsWrites, hlc.Timestamp{} /* sstTimestampToRequestTimestamp */) return getOneErr(db.Run(ctx, b), b) } @@ -711,7 +711,7 @@ func (db *DB) AddSSTableAtBatchTimestamp( ) (hlc.Timestamp, error) { b := &Batch{Header: roachpb.Header{Timestamp: batchTs}} b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, - stats, ingestAsWrites, true /* writeAtBatchTS */, batchTs) + stats, ingestAsWrites, batchTs) err := getOneErr(db.Run(ctx, b), b) if 
err != nil { return hlc.Timestamp{}, err diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 77747b3ef3bf..56a0e235730a 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -163,8 +163,8 @@ func WithOnCheckpoint(f OnCheckpoint) Option { // requested by WithDiff, and callers must obtain these themselves if needed. // // Also note that AddSSTable requests that do not set the -// WriteAtRequestTimestamp flag, possibly writing below the closed timestamp, -// will cause affected rangefeeds to be disconnected with a terminal +// SSTTimestampToRequestTimestamp param, possibly writing below the closed +// timestamp, will cause affected rangefeeds to be disconnected with a terminal // MVCCHistoryMutationError and thus will not be emitted here -- there should be // no such requests into spans with rangefeeds across them, but it is up to // callers to ensure this. diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index bb8faf9ac2b8..d123dc0b0bcc 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -38,10 +38,11 @@ func init() { RegisterReadWriteCommand(roachpb.AddSSTable, DefaultDeclareIsolatedKeys, EvalAddSSTable) } -var addSSTableRewriteConcurrency = settings.RegisterIntSetting( +// AddSSTableRewriteConcurrency sets the concurrency of a single SST rewrite. 
+var AddSSTableRewriteConcurrency = settings.RegisterIntSetting( settings.SystemOnly, "kv.bulk_io_write.sst_rewrite_concurrency.per_call", - "concurrency to use when rewriting a sstable timestamps by block, or 0 to use a loop", + "concurrency to use when rewriting sstable timestamps by block, or 0 to use a loop", int64(util.ConstantWithMetamorphicTestRange("addsst-rewrite-concurrency", 0, 0, 16)), settings.NonNegativeInt, ) @@ -56,6 +57,7 @@ func EvalAddSSTable( ms := cArgs.Stats start, end := storage.MVCCKey{Key: args.Key}, storage.MVCCKey{Key: args.EndKey} sst := args.Data + sstToReqTS := args.SSTTimestampToRequestTimestamp var span *tracing.Span var err error @@ -63,28 +65,24 @@ func EvalAddSSTable( defer span.Finish() log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", start.Key, end.Key) - // Under the race detector, scan the SST contents and make sure it satisfies - // the AddSSTable requirements. We do not always perform these checks - // otherwise, due to the cost. + // Under the race detector, check that the SST contents satisfy AddSSTable + // requirements. We don't always do this otherwise, due to the cost. if util.RaceEnabled { - if err := assertSSTContents(sst, args.SSTTimestamp, args.MVCCStats); err != nil { + if err := assertSSTContents(sst, sstToReqTS, args.MVCCStats); err != nil { return result.Result{}, err } } - // If requested, rewrite the SST's MVCC timestamps to the request timestamp. - // This ensures the writes comply with the timestamp cache and closed - // timestamp, i.e. by not writing to timestamps that have already been - // observed or closed. - if args.WriteAtRequestTimestamp && - (args.SSTTimestamp.IsEmpty() || h.Timestamp != args.SSTTimestamp || - util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)) { + // If requested and necessary, rewrite the SST's MVCC timestamps to the + // request timestamp. This ensures the writes comply with the timestamp cache + // and closed timestamp, i.e. 
by not writing to timestamps that have already + // been observed or closed. + if sstToReqTS.IsSet() && (h.Timestamp != sstToReqTS || + util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)) { st := cArgs.EvalCtx.ClusterSettings() // TODO(dt): use a quotapool. - concurrency := int(addSSTableRewriteConcurrency.Get(&st.SV)) - sst, err = storage.UpdateSSTTimestamps( - ctx, st, sst, args.SSTTimestamp, h.Timestamp, concurrency, - ) + conc := int(AddSSTableRewriteConcurrency.Get(&cArgs.EvalCtx.ClusterSettings().SV)) + sst, err = storage.UpdateSSTTimestamps(ctx, st, sst, sstToReqTS, h.Timestamp, conc) if err != nil { return result.Result{}, errors.Wrap(err, "updating SST timestamps") } @@ -226,7 +224,7 @@ func EvalAddSSTable( ms.Add(stats) var mvccHistoryMutation *kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation - if !args.WriteAtRequestTimestamp { + if sstToReqTS.IsEmpty() { mvccHistoryMutation = &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ Spans: []roachpb.Span{{Key: start.Key, EndKey: end.Key}}, } @@ -261,7 +259,7 @@ func EvalAddSSTable( // write by not allowing writing below existing keys, and we want to // retain parity with regular SST ingestion which does allow this. We // therefore record these operations ourselves. 
- if args.WriteAtRequestTimestamp { + if sstToReqTS.IsSet() { readWriter.LogLogicalOp(storage.MVCCWriteValueOpType, storage.MVCCLogicalOpDetails{ Key: k.Key, Timestamp: k.Timestamp, @@ -287,7 +285,7 @@ func EvalAddSSTable( Data: sst, CRC32: util.CRC32(sst), Span: roachpb.Span{Key: start.Key, EndKey: end.Key}, - AtWriteTimestamp: args.WriteAtRequestTimestamp, + AtWriteTimestamp: sstToReqTS.IsSet(), }, MVCCHistoryMutation: mvccHistoryMutation, }, @@ -325,9 +323,9 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M if len(value) == 0 { return errors.AssertionFailedf("SST contains tombstone for key %s", key) } - if !sstTimestamp.IsEmpty() && key.Timestamp != sstTimestamp { - return errors.AssertionFailedf("SST has incorrect timestamp %s for SST key %s (expected %s)", - key.Timestamp, key.Key, sstTimestamp) + if sstTimestamp.IsSet() && key.Timestamp != sstTimestamp { + return errors.AssertionFailedf("SST has unexpected timestamp %s (expected %s) for key %s", + key.Timestamp, sstTimestamp, key.Key) } iter.Next() } diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index ce05f70b1f63..7633119980ea 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -48,21 +48,20 @@ func TestEvalAddSSTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - const defaultReqTS = 10 // request sent with this timestamp by default - const intentTS = 100 // values with this timestamp are written as intents + const intentTS = 100 // values with this timestamp are written as intents - // These are run with IngestAsWrites both disabled and enabled. - // nolint:composites + // These are run with IngestAsWrites both disabled and enabled, and + // kv.bulk_io_write.sst_rewrite_concurrency.per_call of 0 and 8. 
testcases := map[string]struct { data []sstutil.KV sst []sstutil.KV - sstTimestamp int64 // SSTTimestamp set to given timestamp - atReqTS int64 // WriteAtRequestTimestamp with given timestamp + reqTS int64 + toReqTS int64 // SSTTimestampToRequestTimestamp with given SST timestamp noConflict bool // DisallowConflicts noShadow bool // DisallowShadowing noShadowBelow int64 // DisallowShadowingBelow expect []sstutil.KV - expectErr interface{} // error type, substring, or true (any error) + expectErr interface{} // error type, substring, substring slice, or true (any) expectErrRace interface{} expectStatsEst bool // expect MVCCStats.ContainsEstimates, don't check stats }{ @@ -89,13 +88,19 @@ func TestEvalAddSSTable(t *testing.T) { sst: []sstutil.KV{{"a", 1, "sst"}, {"c", 1, "sst"}}, expectErr: &roachpb.WriteIntentError{}, }, - "blind writes tombstones": { // unfortunately, for performance + "blind ignores intent outside span": { + data: []sstutil.KV{{"b", intentTS, "b0"}}, + sst: []sstutil.KV{{"c", 1, "sst"}, {"d", 1, "sst"}}, + expect: []sstutil.KV{{"b", intentTS, "b0"}, {"c", 1, "sst"}, {"d", 1, "sst"}}, + expectStatsEst: true, + }, + "blind writes tombstones unless race": { // unfortunately, for performance sst: []sstutil.KV{{"a", 1, ""}}, expect: []sstutil.KV{{"a", 1, ""}}, expectStatsEst: true, expectErrRace: `SST contains tombstone for key "a"/0.000000001,0`, }, - "blind writes SST inline values": { // unfortunately, for performance + "blind writes SST inline values unless race": { // unfortunately, for performance sst: []sstutil.KV{{"a", 0, "inline"}}, expect: []sstutil.KV{{"a", 0, "inline"}}, expectStatsEst: true, @@ -108,96 +113,123 @@ func TestEvalAddSSTable(t *testing.T) { expectStatsEst: true, }, - // WriteAtRequestTimestamp - "WriteAtRequestTimestamp sets timestamp": { - atReqTS: 10, - sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 3, "b3"}}, - expect: []sstutil.KV{{"a", 10, "a1"}, {"b", 10, "b3"}}, + // SSTTimestampToRequestTimestamp + 
"SSTTimestampToRequestTimestamp rewrites timestamp": { + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}}, + expect: []sstutil.KV{{"a", 10, "a1"}, {"b", 10, "b1"}}, expectStatsEst: true, }, - "WriteAtRequestTimestamp rejects tombstones": { - atReqTS: 10, - sst: []sstutil.KV{{"a", 1, ""}}, - expectErr: "SST values cannot be tombstones", - expectErrRace: `SST contains tombstone for key "a"/0.000000001,0`, + "SSTTimestampToRequestTimestamp writes tombstones unless race": { // unfortunately, for performance + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, ""}}, + expect: []sstutil.KV{{"a", 10, ""}}, + expectErrRace: `SST contains tombstone for key "a"/0.000000001,0`, + expectStatsEst: true, }, - "WriteAtRequestTimestamp rejects inline values": { - atReqTS: 10, - sst: []sstutil.KV{{"a", 0, "inline"}}, - expectErr: "inline values or intents are not supported", - expectErrRace: `SST contains inline value or intent for key "a"/0,0`, + "SSTTimestampToRequestTimestamp rejects incorrect SST timestamp": { + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}, {"c", 2, "c2"}}, + expectErr: []string{ + `unexpected timestamp 0.000000002,0 (expected 0.000000001,0) for key "c"`, + `key has suffix "\x00\x00\x00\x00\x00\x00\x00\x02\t", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`, + }, }, - "WriteAtRequestTimestamp writes below and replaces": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp rejects incorrect 0 SST timestamp": { + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}, {"c", 0, "c0"}}, + expectErr: []string{ + `unexpected timestamp 0,0 (expected 0.000000001,0) for key "c"`, + `key has suffix "", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`, + }, + expectErrRace: `SST contains inline value or intent for key "c"/0,0`, + }, + "SSTTimestampToRequestTimestamp writes below and replaces": { + reqTS: 5, + toReqTS: 1, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 
1, "sst"}, {"b", 1, "sst"}}, expect: []sstutil.KV{{"a", 5, "sst"}, {"b", 7, "b7"}, {"b", 5, "sst"}}, expectStatsEst: true, }, - "WriteAtRequestTimestamp returns WriteIntentError for intents": { - atReqTS: 10, + "SSTTimestampToRequestTimestamp returns WriteIntentError for intents": { + reqTS: 10, + toReqTS: 1, data: []sstutil.KV{{"a", intentTS, "intent"}}, sst: []sstutil.KV{{"a", 1, "a@1"}}, expectErr: &roachpb.WriteIntentError{}, }, - "WriteAtRequestTimestamp errors with DisallowConflicts below existing": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp errors with DisallowConflicts below existing": { + reqTS: 5, + toReqTS: 10, noConflict: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 10, "sst"}, {"b", 10, "sst"}}, expectErr: &roachpb.WriteTooOldError{}, }, - "WriteAtRequestTimestamp succeeds with DisallowConflicts above existing": { - atReqTS: 8, + "SSTTimestampToRequestTimestamp succeeds with DisallowConflicts above existing": { + reqTS: 8, + toReqTS: 1, noConflict: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expect: []sstutil.KV{{"a", 8, "sst"}, {"a", 5, "a5"}, {"b", 8, "sst"}, {"b", 7, "b7"}}, }, - "WriteAtRequestTimestamp errors with DisallowShadowing below existing": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp errors with DisallowShadowing below existing": { + reqTS: 5, + toReqTS: 10, noShadow: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 10, "sst"}, {"b", 10, "sst"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp errors with DisallowShadowing above existing": { - atReqTS: 8, + "SSTTimestampToRequestTimestamp errors with DisallowShadowing above existing": { + reqTS: 8, + toReqTS: 1, noShadow: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - 
"WriteAtRequestTimestamp succeeds with DisallowShadowing above tombstones": { - atReqTS: 8, + "SSTTimestampToRequestTimestamp succeeds with DisallowShadowing above tombstones": { + reqTS: 8, + toReqTS: 1, noShadow: true, data: []sstutil.KV{{"a", 5, ""}, {"b", 7, ""}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expect: []sstutil.KV{{"a", 8, "sst"}, {"a", 5, ""}, {"b", 8, "sst"}, {"b", 7, ""}}, }, - "WriteAtRequestTimestamp succeeds with DisallowShadowing and idempotent writes": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp succeeds with DisallowShadowing and idempotent writes": { + reqTS: 5, + toReqTS: 1, noShadow: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 5, "b5"}}, sst: []sstutil.KV{{"a", 1, "a5"}, {"b", 1, "b5"}}, expect: []sstutil.KV{{"a", 5, "a5"}, {"b", 5, "b5"}}, }, - "WriteAtRequestTimestamp errors with DisallowShadowingBelow equal value above existing below limit": { - atReqTS: 7, + "SSTTimestampToRequestTimestamp errors with DisallowShadowingBelow equal value above existing below limit": { + reqTS: 7, + toReqTS: 10, noShadowBelow: 5, data: []sstutil.KV{{"a", 3, "a3"}}, sst: []sstutil.KV{{"a", 10, "a3"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp errors with DisallowShadowingBelow errors above existing above limit": { - atReqTS: 7, + "SSTTimestampToRequestTimestamp errors with DisallowShadowingBelow errors above existing above limit": { + reqTS: 7, + toReqTS: 10, noShadowBelow: 5, data: []sstutil.KV{{"a", 6, "a6"}}, sst: []sstutil.KV{{"a", 10, "sst"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp allows DisallowShadowingBelow equal value above existing above limit": { - atReqTS: 7, + "SSTTimestampToRequestTimestamp allows DisallowShadowingBelow equal value above existing above limit": { + reqTS: 7, + toReqTS: 10, noShadowBelow: 5, data: []sstutil.KV{{"a", 6, "a6"}}, sst: []sstutil.KV{{"a", 10, "a6"}}, @@ -612,158 +644,148 @@ func 
TestEvalAddSSTable(t *testing.T) { sst: []sstutil.KV{{"a", 7, "a8"}}, expectErr: &roachpb.WriteTooOldError{}, }, - - // SSTTimestamp - "SSTTimestamp works with WriteAtRequestTimestamp": { - atReqTS: 7, - data: []sstutil.KV{{"a", 6, "a6"}}, - sst: []sstutil.KV{{"a", 7, "a7"}}, - sstTimestamp: 7, - expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}}, - expectStatsEst: true, - }, - /* Disabled due to nondeterminism under metamorphic tests. SSTTimestamp will - * shortly be removed anyway. - "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": { - atReqTS: 8, - data: []sstutil.KV{{"a", 6, "a6"}}, - sst: []sstutil.KV{{"a", 7, "a7"}}, - sstTimestamp: 8, - expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}}, - expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`, - expectStatsEst: true, - },*/ } testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) { - for name, tc := range testcases { - t.Run(name, func(t *testing.T) { - st := cluster.MakeTestingClusterSettings() - ctx := context.Background() - - dir := t.TempDir() - engine, err := storage.Open(ctx, storage.Filesystem(filepath.Join(dir, "db")), storage.Settings(st)) - require.NoError(t, err) - defer engine.Close() - - // Write initial data. 
- intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS}, 0, 1) - b := engine.NewBatch() - for i := len(tc.data) - 1; i >= 0; i-- { // reverse, older timestamps first - kv := tc.data[i] - var txn *roachpb.Transaction - if kv.WallTimestamp == intentTS { - txn = &intentTxn + testutils.RunValues(t, "RewriteConcurrency", []interface{}{0, 8}, func(t *testing.T, c interface{}) { + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + batcheval.AddSSTableRewriteConcurrency.Override(ctx, &st.SV, int64(c.(int))) + + dir := t.TempDir() + engine, err := storage.Open(ctx, storage.Filesystem(filepath.Join(dir, "db")), storage.Settings(st)) + require.NoError(t, err) + defer engine.Close() + + // Write initial data. + intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS}, 0, 1) + b := engine.NewBatch() + for i := len(tc.data) - 1; i >= 0; i-- { // reverse, older timestamps first + kv := tc.data[i] + var txn *roachpb.Transaction + if kv.WallTimestamp == intentTS { + txn = &intentTxn + } + require.NoError(t, storage.MVCCPut(ctx, b, nil, kv.Key(), kv.Timestamp(), kv.Value(), txn)) } - require.NoError(t, storage.MVCCPut(ctx, b, nil, kv.Key(), kv.Timestamp(), kv.Value(), txn)) - } - require.NoError(t, b.Commit(false)) - stats := engineStats(t, engine) - - // Build and add SST. 
- sst, start, end := sstutil.MakeSST(t, st, tc.sst) - reqTS := hlc.Timestamp{WallTime: defaultReqTS} - if tc.atReqTS != 0 { - reqTS.WallTime = tc.atReqTS - } - resp := &roachpb.AddSSTableResponse{} - result, err := batcheval.EvalAddSSTable(ctx, engine, batcheval.CommandArgs{ - EvalCtx: (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext(), - Stats: stats, - Header: roachpb.Header{ - Timestamp: reqTS, - }, - Args: &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - DisallowConflicts: tc.noConflict, - DisallowShadowing: tc.noShadow, - DisallowShadowingBelow: hlc.Timestamp{WallTime: tc.noShadowBelow}, - WriteAtRequestTimestamp: tc.atReqTS != 0, - SSTTimestamp: hlc.Timestamp{WallTime: tc.sstTimestamp}, - IngestAsWrites: ingestAsWrites, - }, - }, resp) - - expectErr := tc.expectErr - if tc.expectErrRace != nil && util.RaceEnabled { - expectErr = tc.expectErrRace - } - if expectErr != nil { - require.Error(t, err) - if b, ok := expectErr.(bool); ok && b { - // any error is fine - } else if expectMsg, ok := expectErr.(string); ok { - require.Contains(t, err.Error(), expectMsg) - } else if e, ok := expectErr.(error); ok { - require.True(t, errors.HasType(err, e), "expected %T, got %v", e, err) - } else { - require.Fail(t, "invalid expectErr", "expectErr=%v", expectErr) + require.NoError(t, b.Commit(false)) + stats := engineStats(t, engine, 0) + + // Build and add SST. 
+ if tc.toReqTS != 0 && tc.reqTS == 0 && tc.expectErr == nil { + t.Fatal("can't set toReqTS without reqTS") + } + sst, start, end := sstutil.MakeSST(t, st, tc.sst) + resp := &roachpb.AddSSTableResponse{} + result, err := batcheval.EvalAddSSTable(ctx, engine, batcheval.CommandArgs{ + EvalCtx: (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext(), + Stats: stats, + Header: roachpb.Header{ + Timestamp: hlc.Timestamp{WallTime: tc.reqTS}, + }, + Args: &roachpb.AddSSTableRequest{ + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + DisallowConflicts: tc.noConflict, + DisallowShadowing: tc.noShadow, + DisallowShadowingBelow: hlc.Timestamp{WallTime: tc.noShadowBelow}, + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: tc.toReqTS}, + IngestAsWrites: ingestAsWrites, + }, + }, resp) + + expectErr := tc.expectErr + if tc.expectErrRace != nil && util.RaceEnabled { + expectErr = tc.expectErrRace + } + if expectErr != nil { + require.Error(t, err) + if b, ok := expectErr.(bool); ok && b { + // any error is fine + } else if expectMsg, ok := expectErr.(string); ok { + require.Contains(t, err.Error(), expectMsg) + } else if expectMsgs, ok := expectErr.([]string); ok { + var found bool + for _, msg := range expectMsgs { + if strings.Contains(err.Error(), msg) { + found = true + break + } + } + if !found { + t.Fatalf("%q does not contain any of %q", err, expectMsgs) + } + } else if e, ok := expectErr.(error); ok { + require.True(t, errors.HasType(err, e), "expected %T, got %v", e, err) + } else { + require.Fail(t, "invalid expectErr", "expectErr=%v", expectErr) + } + return } - return - } - require.NoError(t, err) - - if ingestAsWrites { - require.Nil(t, result.Replicated.AddSSTable) - } else { - require.NotNil(t, result.Replicated.AddSSTable) - sstPath := filepath.Join(dir, "sst") - require.NoError(t, engine.WriteFile(sstPath, result.Replicated.AddSSTable.Data)) - require.NoError(t, 
engine.IngestExternalFiles(ctx, []string{sstPath})) - } - - // Scan resulting data from engine. - iter := storage.NewMVCCIncrementalIterator(engine, storage.MVCCIncrementalIterOptions{ - EndKey: keys.MaxKey, - StartTime: hlc.MinTimestamp, - EndTime: hlc.MaxTimestamp, - IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit, - InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit, - }) - defer iter.Close() - iter.SeekGE(storage.MVCCKey{Key: keys.SystemPrefix}) - scan := []sstutil.KV{} - for { - ok, err := iter.Valid() require.NoError(t, err) - if !ok { - break + + if ingestAsWrites { + require.Nil(t, result.Replicated.AddSSTable) + } else { + require.NotNil(t, result.Replicated.AddSSTable) + sstPath := filepath.Join(dir, "sst") + require.NoError(t, engine.WriteFile(sstPath, result.Replicated.AddSSTable.Data)) + require.NoError(t, engine.IngestExternalFiles(ctx, []string{sstPath})) } - key := string(iter.Key().Key) - ts := iter.Key().Timestamp.WallTime - var value []byte - if iter.Key().IsValue() { - if len(iter.Value()) > 0 { - value, err = roachpb.Value{RawBytes: iter.Value()}.GetBytes() + + // Scan resulting data from engine. 
+ iter := storage.NewMVCCIncrementalIterator(engine, storage.MVCCIncrementalIterOptions{ + EndKey: keys.MaxKey, + StartTime: hlc.MinTimestamp, + EndTime: hlc.MaxTimestamp, + IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit, + InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit, + }) + defer iter.Close() + iter.SeekGE(storage.MVCCKey{Key: keys.SystemPrefix}) + scan := []sstutil.KV{} + for { + ok, err := iter.Valid() + require.NoError(t, err) + if !ok { + break + } + key := string(iter.Key().Key) + ts := iter.Key().Timestamp.WallTime + var value []byte + if iter.Key().IsValue() { + if len(iter.Value()) > 0 { + value, err = roachpb.Value{RawBytes: iter.Value()}.GetBytes() + require.NoError(t, err) + } + } else { + var meta enginepb.MVCCMetadata + require.NoError(t, protoutil.Unmarshal(iter.UnsafeValue(), &meta)) + if meta.RawBytes == nil { + // Skip intent metadata records (value emitted separately). + iter.Next() + continue + } + value, err = roachpb.Value{RawBytes: meta.RawBytes}.GetBytes() require.NoError(t, err) } + scan = append(scan, sstutil.KV{key, ts, string(value)}) + iter.Next() + } + require.Equal(t, tc.expect, scan) + + // Check that stats were updated correctly. + if tc.expectStatsEst { + require.True(t, stats.ContainsEstimates > 0, "expected stats to be estimated") } else { - var meta enginepb.MVCCMetadata - require.NoError(t, protoutil.Unmarshal(iter.UnsafeValue(), &meta)) - if meta.RawBytes == nil { - // Skip intent metadata records (value emitted separately). - iter.Next() - continue - } - value, err = roachpb.Value{RawBytes: meta.RawBytes}.GetBytes() - require.NoError(t, err) + require.False(t, stats.ContainsEstimates > 0, "found estimated stats") + require.Equal(t, stats, engineStats(t, engine, stats.LastUpdateNanos)) } - scan = append(scan, sstutil.KV{key, ts, string(value)}) - iter.Next() - } - require.Equal(t, tc.expect, scan) - - // Check that stats were updated correctly. 
- if tc.expectStatsEst { - require.True(t, stats.ContainsEstimates > 0, "expected stats to be estimated") - } else { - require.False(t, stats.ContainsEstimates > 0, "found estimated stats") - stats.LastUpdateNanos = 0 // avoid spurious diffs - require.Equal(t, stats, engineStats(t, engine)) - } - }) - } + }) + } + }) }) } @@ -777,8 +799,8 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { testcases := map[string]struct { sst []sstutil.KV - atReqTS bool // WriteAtRequestTimestamp - asWrites bool // IngestAsWrites + toReqTS int64 // SSTTimestampToRequestTimestamp + asWrites bool // IngestAsWrites expectHistoryMutation bool expectLogicalOps []enginepb.MVCCLogicalOp }{ @@ -787,9 +809,9 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { expectHistoryMutation: true, expectLogicalOps: nil, }, - "WriteAtRequestTimestamp alone": { + "SSTTimestampToRequestTimestamp alone": { sst: []sstutil.KV{{"a", 1, "a1"}}, - atReqTS: true, + toReqTS: 1, expectHistoryMutation: false, expectLogicalOps: nil, }, @@ -799,10 +821,10 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { expectHistoryMutation: true, expectLogicalOps: nil, }, - "IngestAsWrites and WriteAtRequestTimestamp": { - sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 2, "b2"}}, + "IngestAsWrites and SSTTimestampToRequestTimestamp": { + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}}, asWrites: true, - atReqTS: true, + toReqTS: 1, expectHistoryMutation: false, expectLogicalOps: []enginepb.MVCCLogicalOp{ // NOTE: Value is populated by the rangefeed processor, not MVCC, so it @@ -831,11 +853,11 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { }, Stats: &enginepb.MVCCStats{}, Args: &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - WriteAtRequestTimestamp: tc.atReqTS, - IngestAsWrites: tc.asWrites, + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + 
SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: tc.toReqTS}, + IngestAsWrites: tc.asWrites, }, }, &roachpb.AddSSTableResponse{}) require.NoError(t, err) @@ -845,7 +867,7 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { } else { require.NotNil(t, result.Replicated.AddSSTable) require.Equal(t, roachpb.Span{Key: start, EndKey: end}, result.Replicated.AddSSTable.Span) - require.Equal(t, tc.atReqTS, result.Replicated.AddSSTable.AtWriteTimestamp) + require.Equal(t, tc.toReqTS != 0, result.Replicated.AddSSTable.AtWriteTimestamp) } if tc.expectHistoryMutation { require.Equal(t, &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ @@ -1097,7 +1119,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { // After EvalAddSSTable, cArgs.Stats contains a diff to the existing // stats. Make sure recomputing from scratch gets the same answer as // applying the diff to the stats - statsBefore := engineStats(t, engine) + statsBefore := engineStats(t, engine, 0) ts := hlc.Timestamp{WallTime: 7} cArgs := batcheval.CommandArgs{ EvalCtx: evalCtx, @@ -1121,8 +1143,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { statsEvaled.Add(*cArgs.Stats) statsEvaled.Add(statsDelta) statsEvaled.ContainsEstimates = 0 - statsEvaled.LastUpdateNanos = 0 - require.Equal(t, engineStats(t, engine), statsEvaled) + require.Equal(t, engineStats(t, engine, statsEvaled.LastUpdateNanos), statsEvaled) // Check stats for a single KV. sst, start, end = sstutil.MakeSST(t, st, []sstutil.KV{{"zzzzzzz", ts.WallTime, "zzz"}}) @@ -1311,9 +1332,9 @@ func TestAddSSTableIntentResolution(t *testing.T) { require.Contains(t, err.Error(), "TransactionRetryWithProtoRefreshError: TransactionAbortedError") } -// TestAddSSTableWriteAtRequestTimestampRespectsTSCache checks that AddSSTable -// with WriteAtRequestTimestamp respects the timestamp cache. 
-func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { +// TestAddSSTableSSTTimestampToRequestTimestampRespectsTSCache checks that AddSSTable +// with SSTTimestampToRequestTimestamp respects the timestamp cache. +func TestAddSSTableSSTTimestampToRequestTimestampRespectsTSCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1332,10 +1353,10 @@ func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { // Add an SST writing below the previous write. sst, start, end := sstutil.MakeSST(t, s.ClusterSettings(), []sstutil.KV{{"key", 1, "sst"}}) sstReq := &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - WriteAtRequestTimestamp: true, + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: 1}, } ba := roachpb.BatchRequest{ Header: roachpb.Header{Timestamp: txnTS.Prev()}, @@ -1364,9 +1385,9 @@ func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { require.Equal(t, "sst", string(kv.ValueBytes())) } -// TestAddSSTableWriteAtRequestTimestampRespectsClosedTS checks that AddSSTable -// with WriteAtRequestTimestamp respects the closed timestamp. -func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { +// TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS checks that AddSSTable +// with SSTTimestampToRequestTimestamp respects the closed timestamp. 
+func TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1392,10 +1413,10 @@ func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { reqTS := closedTS.Prev() sst, start, end := sstutil.MakeSST(t, store.ClusterSettings(), []sstutil.KV{{"key", 1, "sst"}}) sstReq := &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - WriteAtRequestTimestamp: true, + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: 1}, } ba := roachpb.BatchRequest{ Header: roachpb.Header{Timestamp: reqTS}, @@ -1418,7 +1439,7 @@ func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { } // engineStats computes the MVCC stats for the given engine. -func engineStats(t *testing.T, engine storage.Engine) *enginepb.MVCCStats { +func engineStats(t *testing.T, engine storage.Engine, nowNanos int64) *enginepb.MVCCStats { t.Helper() iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ @@ -1428,7 +1449,7 @@ func engineStats(t *testing.T, engine storage.Engine) *enginepb.MVCCStats { defer iter.Close() // We don't care about nowNanos, because the SST can't contain intents or // tombstones and all existing intents will be resolved. 
- stats, err := storage.ComputeStatsForRange(iter, keys.LocalMax, keys.MaxKey, 0) + stats, err := storage.ComputeStatsForRange(iter, keys.LocalMax, keys.MaxKey, nowNanos) require.NoError(t, err) return &stats } diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index 241a00982512..2e19c02270b9 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -166,8 +166,8 @@ message ReplicatedEvalResult { uint32 crc32 = 2 [(gogoproto.customname) = "CRC32"]; roachpb.Span span = 3 [(gogoproto.nullable) = false]; // If true, all SST MVCC timestamps equal the WriteTimestamp. This is given - // by the WriteAtRequestTimestamp request parameter, which was introduced in - // 22.1 and only used with the MVCCAddSSTable cluster version. + // by the SSTTimestampToRequestTimestamp request parameter, which was + // introduced in 22.1 and only used with the MVCCAddSSTable cluster version. // // TODO(erikgrinaker): This field currently controls whether to emit an // AddSSTable event across the rangefeed. We could equivalently check diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e84bdc4020f3..25f18ed294c3 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -638,8 +638,8 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked( // handleSSTableRaftMuLocked emits an ingested SSTable from AddSSTable via the // rangefeed. These can be expected to have timestamps at the write timestamp -// (i.e. submitted with WriteAtRequestTimestamp) since we assert elsewhere that -// MVCCHistoryMutation commands disconnect rangefeeds. +// (i.e. submitted with SSTTimestampToRequestTimestamp) since we assert +// elsewhere that MVCCHistoryMutation commands disconnect rangefeeds. // // NB: We currently don't have memory budgeting for rangefeeds, instead using a // large buffered channel, so this can easily OOM the node. 
This is "fine" for diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index f4d0b4c8aa47..67d6c649e334 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1335,7 +1335,7 @@ func (*AdminScatterRequest) flags() flag { return isAdmin | isR func (*AdminVerifyProtectedTimestampRequest) flags() flag { return isAdmin | isRange | isAlone } func (r *AddSSTableRequest) flags() flag { flags := isWrite | isRange | isAlone | isUnsplittable | canBackpressure - if r.WriteAtRequestTimestamp { + if r.SSTTimestampToRequestTimestamp.IsSet() { flags |= appliesTSCache } return flags diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 2ce1436c216e..378d9c33b0c6 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -1600,9 +1600,8 @@ message AdminVerifyProtectedTimestampResponse { // AddSSTableRequest contains arguments to the AddSSTable method, which links an // SST file into the Pebble log-structured merge-tree. The SST must only contain // committed versioned values with non-zero MVCC timestamps (no intents or -// inline values) and no tombstones, but this is only fully enforced when -// WriteAtRequestTimestamp is enabled, for performance. It cannot be used in a -// transaction, cannot be split across ranges, and must be alone in a batch. +// inline values) and no tombstones. It cannot be used in a transaction, cannot +// be split across ranges, and must be alone in a batch. // // By default, AddSSTable will blindly write the SST contents into Pebble, with // fixed MVCC timestamps unaffected by pushes. This can violate many CRDB @@ -1615,8 +1614,8 @@ message AdminVerifyProtectedTimestampResponse { // The following parameters can be used to make AddSSTable enforce these // guarantees, at a performance cost: // -// * WriteAtRequestTimestamp: ensures compliance with the timestamp cache and -// closed timestamp, by rewriting SST timestamps to the request timestamp. 
+// * SSTTimestampToRequestTimestamp: ensures compliance with the timestamp cache +// and closed timestamp, by rewriting SST timestamps to the request timestamp. // Also emits the SST via the range feed. // // * DisallowConflicts, DisallowShadowing, or DisallowShadowingBelow: ensures @@ -1643,11 +1642,13 @@ message AddSSTableRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; bytes data = 2; - // WriteAtRequestTimestamp updates all MVCC timestamps in the SST to the - // request timestamp, even if the request gets pushed. This ensures the writes - // comply with the timestamp cache and closed timestamp. It also causes the - // AddSSTable to be emitted via the range feed, since it respects the closed - // timestamp. + // SSTTimestampToRequestTimestamp gives the timestamp used for all MVCC keys + // in the provided SST. If this timestamp differs from the request timestamp + // (e.g. if the request gets pushed) then all MVCC keys in the SST will be + // rewritten to the request timestamp during request evaluation. This ensures + // the writes comply with the timestamp cache and closed timestamp. It also + // causes the AddSSTable to be emitted via the range feed, since it respects + // the closed timestamp. // // Callers should always set this, except in very special circumstances when // the timestamp cache and closed timestamp can safely be ignored (e.g. @@ -1660,17 +1661,8 @@ message AddSSTableRequest { // serializability. // // Added in 22.1, so check the MVCCAddSSTable version gate before using. - bool write_at_request_timestamp = 6; - - // SSTTimestamp is a promise from the client that all MVCC timestamps in the - // SST equal the provided timestamp, and that there are no inline values, - // intents, or tombstones. When used together with WriteAtRequestTimestamp, - // this can avoid an SST rewrite (and the associated overhead) if the SST - // timestamp equals the request timestamp (i.e. 
if it was provided by the - // client and the request was not pushed due to e.g. the closed timestamp or - // contention). - util.hlc.Timestamp sst_timestamp = 9 - [(gogoproto.customname) = "SSTTimestamp", (gogoproto.nullable) = false]; + util.hlc.Timestamp sst_timestamp_to_request_timestamp = 6 [ + (gogoproto.customname) = "SSTTimestampToRequestTimestamp", (gogoproto.nullable) = false]; // DisallowConflicts will check for MVCC conflicts with existing keys, i.e. // scan for existing keys with a timestamp at or above the SST key and @@ -1680,7 +1672,7 @@ message AddSSTableRequest { // Note that this alone is not sufficient to guarantee serializability or // single-key linearizability, since it can write to a timestamp that another // reader has already observed, changing the value at that timestamp and above - // it. Use WriteAtRequestTimestamp in addition to guarantee serializability. + // it. Use with SSTTimestampToRequestTimestamp to guarantee serializability. // // Added in 22.1, so check the MVCCAddSSTable version gate before using. // @@ -2626,10 +2618,10 @@ message RangeFeedError { // RangeFeedSSTable is a variant of RangeFeedEvent that represents an AddSSTable // operation, containing the entire ingested SST. It is only emitted for -// SSTables written with WriteAtRequestTimestamp enabled, so it is guaranteed to -// comply with the closed timestamp. The Span and WriteTS fields are advisory, -// and contain the client-provided SST key span (may be wider than the SST data) -// and the MVCC timestamp used for all contained entries. +// SSTables written with SSTTimestampToRequestTimestamp enabled, so it is +// guaranteed to comply with the closed timestamp. The Span and WriteTS fields +// are advisory, and contain the client-provided SST key span (may be wider than +// the SST data) and the MVCC timestamp used for all contained entries. 
// // The entire SST is emitted even for registrations that have a narrower span, // it is up to the caller to prune the SST as appropriate. Catchup scans emit diff --git a/pkg/roachpb/api_test.go b/pkg/roachpb/api_test.go index 45d0ce748b8f..5b45ea34d975 100644 --- a/pkg/roachpb/api_test.go +++ b/pkg/roachpb/api_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/redact" gogoproto "github.com/gogo/protobuf/proto" @@ -315,7 +316,7 @@ func TestTenantConsumptionAddSub(t *testing.T) { func TestFlagCombinations(t *testing.T) { // Any non-zero-valued request variants that conditionally affect flags. reqVariants := []Request{ - &AddSSTableRequest{WriteAtRequestTimestamp: true}, + &AddSSTableRequest{SSTTimestampToRequestTimestamp: hlc.Timestamp{Logical: 1}}, &DeleteRangeRequest{Inline: true}, &GetRequest{KeyLocking: lock.Exclusive}, &ReverseScanRequest{KeyLocking: lock.Exclusive}, diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go index 187b701729d0..cc87935da39b 100644 --- a/pkg/storage/sst.go +++ b/pkg/storage/sst.go @@ -183,8 +183,8 @@ func CheckSSTConflicts( // If the existing key has a timestamp at or above the SST key, return a // WriteTooOldError. Normally this could cause a transactional request to be // automatically retried after a read refresh, which we would only want to - // do if AddSSTable had WriteAtRequestTimestamp set, but AddSSTable cannot - // be used in transactions so we don't need to check. + // do if AddSSTable had SSTTimestampToRequestTimestamp set, but AddSSTable + // cannot be used in transactions so we don't need to check. 
if sstKey.Timestamp.LessEq(extKey.Timestamp) { return enginepb.MVCCStats{}, roachpb.NewWriteTooOldError( sstKey.Timestamp, extKey.Timestamp.Next(), sstKey.Key) @@ -221,17 +221,23 @@ func CheckSSTConflicts( return statsDiff, nil } -// UpdateSSTTimestamps replaces all MVCC timestamp in the provided SST with the -// given timestamp. All keys must have a non-zero timestamp, otherwise an error -// is returned to protect against accidental inclusion of intents or inline -// values. Tombstones are also rejected opportunistically, since we're iterating -// over the entire SST anyway. +// UpdateSSTTimestamps replaces all MVCC timestamps in the provided SST with the +// given timestamp. All keys must already have the given "from" timestamp. func UpdateSSTTimestamps( ctx context.Context, st *cluster.Settings, sst []byte, from, to hlc.Timestamp, concurrency int, ) ([]byte, error) { + if from.IsEmpty() { + return nil, errors.Errorf("from timestamp not given") + } + if to.IsEmpty() { + return nil, errors.Errorf("to timestamp not given") + } + sstOut := &MemFile{} sstOut.Buffer.Grow(len(sst)) - if concurrency > 0 && !from.IsEmpty() { + + // Fancy optimized Pebble SST rewriter. + if concurrency > 0 { defaults := DefaultPebbleOptions() opts := defaults.MakeReaderOptions() if fp := defaults.Levels[0].FilterPolicy; fp != nil && len(opts.Filters) == 0 { @@ -241,13 +247,16 @@ func UpdateSSTTimestamps( opts, sstOut, MakeIngestionWriterOptions(ctx, st), - encodeMVCCTimestampSuffix(from), encodeMVCCTimestampSuffix(to), + encodeMVCCTimestampSuffix(from), + encodeMVCCTimestampSuffix(to), concurrency, ); err != nil { return nil, err } return sstOut.Bytes(), nil } + + // Naïve read/write loop.
writer := MakeIngestionSSTWriter(ctx, st, sstOut) defer writer.Close() @@ -263,11 +272,10 @@ func UpdateSSTTimestamps( } else if !ok { break } - if iter.UnsafeKey().Timestamp.IsEmpty() { - return nil, errors.New("inline values or intents are not supported") - } - if len(iter.UnsafeValue()) == 0 { - return nil, errors.New("SST values cannot be tombstones") + key := iter.UnsafeKey() + if key.Timestamp != from { + return nil, errors.Errorf("unexpected timestamp %s (expected %s) for key %s", + key.Timestamp, from, key.Key) } err = writer.PutMVCC(MVCCKey{Key: iter.UnsafeKey().Key, Timestamp: to}, iter.UnsafeValue()) if err != nil { diff --git a/pkg/storage/sst_test.go b/pkg/storage/sst_test.go index ece6fd874edf..3cf0dedd1bab 100644 --- a/pkg/storage/sst_test.go +++ b/pkg/storage/sst_test.go @@ -106,11 +106,12 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { modeCounter // uint64 counter in first 8 bytes modeRandom // random values - sstSize = 0 - keyCount = 500000 - valueSize = 8 - valueMode = modeRandom - profile = false // cpuprofile.pprof + concurrency = 0 // 0 uses naïve replacement + sstSize = 0 + keyCount = 500000 + valueSize = 8 + valueMode = modeRandom + profile = false // cpuprofile.pprof ) if sstSize > 0 && keyCount > 0 { @@ -129,7 +130,7 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { key := make([]byte, 8) value := make([]byte, valueSize) - ts := hlc.Timestamp{WallTime: 1} + sstTimestamp := hlc.Timestamp{WallTime: 1} var i uint64 for i = 0; (keyCount > 0 && i < keyCount) || (sstSize > 0 && sstFile.Len() < sstSize); i++ { binary.BigEndian.PutUint64(key, i) @@ -138,11 +139,8 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { case modeZero: case modeCounter: binary.BigEndian.PutUint64(value, i) - ts.WallTime++ case modeRandom: r.Read(value) - ts.WallTime = r.Int63() - ts.Logical = r.Int31() default: b.Fatalf("unknown value mode %d", valueMode) } @@ -151,7 +149,7 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { v.SetBytes(value) 
v.InitChecksum(key) - require.NoError(b, writer.PutMVCC(MVCCKey{Key: key, Timestamp: ts}, v.RawBytes)) + require.NoError(b, writer.PutMVCC(MVCCKey{Key: key, Timestamp: sstTimestamp}, v.RawBytes)) } writer.Close() b.Logf("%vMB %v keys", sstFile.Len()/1e6, i) @@ -165,10 +163,12 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { defer pprof.StopCPUProfile() } + requestTimestamp := hlc.Timestamp{WallTime: 1634899098417970999, Logical: 9} + b.StartTimer() for i := 0; i < b.N; i++ { - ts := hlc.Timestamp{WallTime: 1634899098417970999, Logical: 9} - _, err := UpdateSSTTimestamps(ctx, st, sstFile.Bytes(), hlc.Timestamp{}, ts, 0) + _, err := UpdateSSTTimestamps( + ctx, st, sstFile.Bytes(), sstTimestamp, requestTimestamp, concurrency) require.NoError(b, err) } }