diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 77c95b0454f9..d79ab5197ac3 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -186,7 +186,7 @@ const ( // bundles when the query latency exceeds the user provided threshold. AlterSystemStmtDiagReqs // MVCCAddSSTable supports MVCC-compliant AddSSTable requests via the new - // WriteAtRequestTimestamp and DisallowConflicts parameters. + // SSTTimestampToRequestTimestamp and DisallowConflicts parameters. MVCCAddSSTable // InsertPublicSchemaNamespaceEntryOnRestore ensures all public schemas // have an entry in system.namespace upon being restored. diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 320dc2483a23..3c248d9ab3a8 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -776,8 +776,7 @@ func (b *Batch) addSSTable( disallowShadowingBelow hlc.Timestamp, stats *enginepb.MVCCStats, ingestAsWrites bool, - writeAtRequestTimestamp bool, - sstTimestamp hlc.Timestamp, + sstTimestampToRequestTimestamp hlc.Timestamp, ) { begin, err := marshalKey(s) if err != nil { @@ -794,14 +793,13 @@ func (b *Batch) addSSTable( Key: begin, EndKey: end, }, - Data: data, - DisallowConflicts: disallowConflicts, - DisallowShadowing: disallowShadowing, - DisallowShadowingBelow: disallowShadowingBelow, - MVCCStats: stats, - IngestAsWrites: ingestAsWrites, - WriteAtRequestTimestamp: writeAtRequestTimestamp, - SSTTimestamp: sstTimestamp, + Data: data, + DisallowConflicts: disallowConflicts, + DisallowShadowing: disallowShadowing, + DisallowShadowingBelow: disallowShadowingBelow, + MVCCStats: stats, + IngestAsWrites: ingestAsWrites, + SSTTimestampToRequestTimestamp: sstTimestampToRequestTimestamp, } b.appendReqs(req) b.initResult(1, 0, notRaw, nil) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 88fd9b841329..05e834a1a220 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -688,7 +688,7 @@ func (db *DB) AddSSTable( ) error { b := &Batch{Header: roachpb.Header{Timestamp: batchTs}} b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, - stats, ingestAsWrites, false /* writeAtBatchTS */, hlc.Timestamp{} /* sstTimestamp */) + stats, ingestAsWrites, hlc.Timestamp{} /* sstTimestampToRequestTimestamp */) return getOneErr(db.Run(ctx, b), b) } @@ -711,7 +711,7 @@ func (db *DB) AddSSTableAtBatchTimestamp( ) (hlc.Timestamp, error) { b := &Batch{Header: roachpb.Header{Timestamp: batchTs}} b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, - stats, ingestAsWrites, true /* writeAtBatchTS */, batchTs) + stats, ingestAsWrites, batchTs) err := getOneErr(db.Run(ctx, b), b) if err != nil { return hlc.Timestamp{}, err diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 77747b3ef3bf..56a0e235730a 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -163,8 +163,8 @@ func WithOnCheckpoint(f OnCheckpoint) Option { // requested by WithDiff, and callers must obtain these themselves if needed. 
// // Also note that AddSSTable requests that do not set the -// WriteAtRequestTimestamp flag, possibly writing below the closed timestamp, -// will cause affected rangefeeds to be disconnected with a terminal +// SSTTimestampToRequestTimestamp param, possibly writing below the closed +// timestamp, will cause affected rangefeeds to be disconnected with a terminal // MVCCHistoryMutationError and thus will not be emitted here -- there should be // no such requests into spans with rangefeeds across them, but it is up to // callers to ensure this. diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 5cd040b68bc3..4682506b5013 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -511,7 +511,7 @@ func TestWithOnSSTable(t *testing.T) { now.Logical = 0 ts := now.WallTime sstKVs := []sstutil.KV{{"a", ts, "1"}, {"b", ts, "2"}, {"c", ts, "3"}, {"d", ts, "4"}, {"e", ts, "5"}} - sst, sstStart, sstEnd := sstutil.MakeSST(ctx, srv.ClusterSettings(), t, sstKVs) + sst, sstStart, sstEnd := sstutil.MakeSST(t, srv.ClusterSettings(), sstKVs) _, pErr := db.AddSSTableAtBatchTimestamp(ctx, sstStart, sstEnd, sst, false /* disallowConflicts */, false /* disallowShadowing */, hlc.Timestamp{}, nil, /* stats */ false /* ingestAsWrites */, now) @@ -587,7 +587,7 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) { ts := now.WallTime sstKVs := []sstutil.KV{{"a", ts, "1"}, {"b", ts, "2"}, {"c", ts, "3"}, {"d", ts, "4"}, {"e", ts, "5"}} expectKVs := []sstutil.KV{{"c", ts, "3"}, {"d", ts, "4"}} - sst, sstStart, sstEnd := sstutil.MakeSST(ctx, srv.ClusterSettings(), t, sstKVs) + sst, sstStart, sstEnd := sstutil.MakeSST(t, srv.ClusterSettings(), sstKVs) _, pErr := db.AddSSTableAtBatchTimestamp(ctx, sstStart, sstEnd, sst, false /* disallowConflicts */, false /* disallowShadowing */, hlc.Timestamp{}, nil, /* stats */ false /* ingestAsWrites */, now) diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index bb8faf9ac2b8..d123dc0b0bcc 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -38,10 +38,11 @@ func init() { RegisterReadWriteCommand(roachpb.AddSSTable, DefaultDeclareIsolatedKeys, EvalAddSSTable) } -var addSSTableRewriteConcurrency = settings.RegisterIntSetting( +// AddSSTableRewriteConcurrency sets the concurrency of a single SST rewrite. +var AddSSTableRewriteConcurrency = settings.RegisterIntSetting( settings.SystemOnly, "kv.bulk_io_write.sst_rewrite_concurrency.per_call", - "concurrency to use when rewriting a sstable timestamps by block, or 0 to use a loop", + "concurrency to use when rewriting sstable timestamps by block, or 0 to use a loop", int64(util.ConstantWithMetamorphicTestRange("addsst-rewrite-concurrency", 0, 0, 16)), settings.NonNegativeInt, ) @@ -56,6 +57,7 @@ func EvalAddSSTable( ms := cArgs.Stats start, end := storage.MVCCKey{Key: args.Key}, storage.MVCCKey{Key: args.EndKey} sst := args.Data + sstToReqTS := args.SSTTimestampToRequestTimestamp var span *tracing.Span var err error @@ -63,28 +65,24 @@ func EvalAddSSTable( defer span.Finish() log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", start.Key, end.Key) - // Under the race detector, scan the SST contents and make sure it satisfies - // the AddSSTable requirements. We do not always perform these checks - // otherwise, due to the cost. 
+ // Under the race detector, check that the SST contents satisfy AddSSTable + // requirements. We don't always do this otherwise, due to the cost. if util.RaceEnabled { - if err := assertSSTContents(sst, args.SSTTimestamp, args.MVCCStats); err != nil { + if err := assertSSTContents(sst, sstToReqTS, args.MVCCStats); err != nil { return result.Result{}, err } } - // If requested, rewrite the SST's MVCC timestamps to the request timestamp. - // This ensures the writes comply with the timestamp cache and closed - // timestamp, i.e. by not writing to timestamps that have already been - // observed or closed. - if args.WriteAtRequestTimestamp && - (args.SSTTimestamp.IsEmpty() || h.Timestamp != args.SSTTimestamp || - util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)) { + // If requested and necessary, rewrite the SST's MVCC timestamps to the + // request timestamp. This ensures the writes comply with the timestamp cache + // and closed timestamp, i.e. by not writing to timestamps that have already + // been observed or closed. + if sstToReqTS.IsSet() && (h.Timestamp != sstToReqTS || + util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)) { st := cArgs.EvalCtx.ClusterSettings() // TODO(dt): use a quotapool. - concurrency := int(addSSTableRewriteConcurrency.Get(&st.SV)) - sst, err = storage.UpdateSSTTimestamps( - ctx, st, sst, args.SSTTimestamp, h.Timestamp, concurrency, - ) + conc := int(AddSSTableRewriteConcurrency.Get(&cArgs.EvalCtx.ClusterSettings().SV)) + sst, err = storage.UpdateSSTTimestamps(ctx, st, sst, sstToReqTS, h.Timestamp, conc) if err != nil { return result.Result{}, errors.Wrap(err, "updating SST timestamps") } @@ -226,7 +224,7 @@ func EvalAddSSTable( ms.Add(stats) var mvccHistoryMutation *kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation - if !args.WriteAtRequestTimestamp { + if sstToReqTS.IsEmpty() { mvccHistoryMutation = &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ Spans: []roachpb.Span{{Key: start.Key, EndKey: end.Key}}, } @@ -261,7 +259,7 @@ func EvalAddSSTable( // write by not allowing writing below existing keys, and we want to // retain parity with regular SST ingestion which does allow this. We // therefore record these operations ourselves. 
- if args.WriteAtRequestTimestamp { + if sstToReqTS.IsSet() { readWriter.LogLogicalOp(storage.MVCCWriteValueOpType, storage.MVCCLogicalOpDetails{ Key: k.Key, Timestamp: k.Timestamp, @@ -287,7 +285,7 @@ func EvalAddSSTable( Data: sst, CRC32: util.CRC32(sst), Span: roachpb.Span{Key: start.Key, EndKey: end.Key}, - AtWriteTimestamp: args.WriteAtRequestTimestamp, + AtWriteTimestamp: sstToReqTS.IsSet(), }, MVCCHistoryMutation: mvccHistoryMutation, }, @@ -325,9 +323,9 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M if len(value) == 0 { return errors.AssertionFailedf("SST contains tombstone for key %s", key) } - if !sstTimestamp.IsEmpty() && key.Timestamp != sstTimestamp { - return errors.AssertionFailedf("SST has incorrect timestamp %s for SST key %s (expected %s)", - key.Timestamp, key.Key, sstTimestamp) + if sstTimestamp.IsSet() && key.Timestamp != sstTimestamp { + return errors.AssertionFailedf("SST has unexpected timestamp %s (expected %s) for key %s", + key.Timestamp, sstTimestamp, key.Key) } iter.Next() } diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index dc5ac13cdc79..7633119980ea 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -48,21 +48,20 @@ func TestEvalAddSSTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - const defaultReqTS = 10 // request sent with this timestamp by default - const intentTS = 100 // values with this timestamp are written as intents + const intentTS = 100 // values with this timestamp are written as intents - // These are run with IngestAsWrites both disabled and enabled. - // nolint:composites + // These are run with IngestAsWrites both disabled and enabled, and + // kv.bulk_io_write.sst_rewrite_concurrency.per_call of 0 and 8. 
testcases := map[string]struct { data []sstutil.KV sst []sstutil.KV - sstTimestamp int64 // SSTTimestamp set to given timestamp - atReqTS int64 // WriteAtRequestTimestamp with given timestamp + reqTS int64 + toReqTS int64 // SSTTimestampToRequestTimestamp with given SST timestamp noConflict bool // DisallowConflicts noShadow bool // DisallowShadowing noShadowBelow int64 // DisallowShadowingBelow expect []sstutil.KV - expectErr interface{} // error type, substring, or true (any error) + expectErr interface{} // error type, substring, substring slice, or true (any) expectErrRace interface{} expectStatsEst bool // expect MVCCStats.ContainsEstimates, don't check stats }{ @@ -89,13 +88,19 @@ func TestEvalAddSSTable(t *testing.T) { sst: []sstutil.KV{{"a", 1, "sst"}, {"c", 1, "sst"}}, expectErr: &roachpb.WriteIntentError{}, }, - "blind writes tombstones": { // unfortunately, for performance + "blind ignores intent outside span": { + data: []sstutil.KV{{"b", intentTS, "b0"}}, + sst: []sstutil.KV{{"c", 1, "sst"}, {"d", 1, "sst"}}, + expect: []sstutil.KV{{"b", intentTS, "b0"}, {"c", 1, "sst"}, {"d", 1, "sst"}}, + expectStatsEst: true, + }, + "blind writes tombstones unless race": { // unfortunately, for performance sst: []sstutil.KV{{"a", 1, ""}}, expect: []sstutil.KV{{"a", 1, ""}}, expectStatsEst: true, expectErrRace: `SST contains tombstone for key "a"/0.000000001,0`, }, - "blind writes SST inline values": { // unfortunately, for performance + "blind writes SST inline values unless race": { // unfortunately, for performance sst: []sstutil.KV{{"a", 0, "inline"}}, expect: []sstutil.KV{{"a", 0, "inline"}}, expectStatsEst: true, @@ -108,96 +113,123 @@ func TestEvalAddSSTable(t *testing.T) { expectStatsEst: true, }, - // WriteAtRequestTimestamp - "WriteAtRequestTimestamp sets timestamp": { - atReqTS: 10, - sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 3, "b3"}}, - expect: []sstutil.KV{{"a", 10, "a1"}, {"b", 10, "b3"}}, + // SSTTimestampToRequestTimestamp + "SSTTimestampToRequestTimestamp rewrites timestamp": { + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}}, + expect: []sstutil.KV{{"a", 10, "a1"}, {"b", 10, "b1"}}, expectStatsEst: true, }, - "WriteAtRequestTimestamp rejects tombstones": { - atReqTS: 10, - sst: []sstutil.KV{{"a", 1, ""}}, - expectErr: "SST values cannot be tombstones", - expectErrRace: `SST contains tombstone for key "a"/0.000000001,0`, + "SSTTimestampToRequestTimestamp writes tombstones unless race": { // unfortunately, for performance + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, ""}}, + expect: []sstutil.KV{{"a", 10, ""}}, + expectErrRace: `SST contains tombstone for key "a"/0.000000001,0`, + expectStatsEst: true, }, - "WriteAtRequestTimestamp rejects inline values": { - atReqTS: 10, - sst: []sstutil.KV{{"a", 0, "inline"}}, - expectErr: "inline values or intents are not supported", - expectErrRace: `SST contains inline value or intent for key "a"/0,0`, + "SSTTimestampToRequestTimestamp rejects incorrect SST timestamp": { + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}, {"c", 2, "c2"}}, + expectErr: []string{ + `unexpected timestamp 0.000000002,0 (expected 0.000000001,0) for key "c"`, + `key has suffix "\x00\x00\x00\x00\x00\x00\x00\x02\t", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`, + }, }, - "WriteAtRequestTimestamp writes below and replaces": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp rejects incorrect 0 SST timestamp": { + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}, {"c", 0, 
"c0"}}, + expectErr: []string{ + `unexpected timestamp 0,0 (expected 0.000000001,0) for key "c"`, + `key has suffix "", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`, + }, + expectErrRace: `SST contains inline value or intent for key "c"/0,0`, + }, + "SSTTimestampToRequestTimestamp writes below and replaces": { + reqTS: 5, + toReqTS: 1, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expect: []sstutil.KV{{"a", 5, "sst"}, {"b", 7, "b7"}, {"b", 5, "sst"}}, expectStatsEst: true, }, - "WriteAtRequestTimestamp returns WriteIntentError for intents": { - atReqTS: 10, + "SSTTimestampToRequestTimestamp returns WriteIntentError for intents": { + reqTS: 10, + toReqTS: 1, data: []sstutil.KV{{"a", intentTS, "intent"}}, sst: []sstutil.KV{{"a", 1, "a@1"}}, expectErr: &roachpb.WriteIntentError{}, }, - "WriteAtRequestTimestamp errors with DisallowConflicts below existing": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp errors with DisallowConflicts below existing": { + reqTS: 5, + toReqTS: 10, noConflict: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 10, "sst"}, {"b", 10, "sst"}}, expectErr: &roachpb.WriteTooOldError{}, }, - "WriteAtRequestTimestamp succeeds with DisallowConflicts above existing": { - atReqTS: 8, + "SSTTimestampToRequestTimestamp succeeds with DisallowConflicts above existing": { + reqTS: 8, + toReqTS: 1, noConflict: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expect: []sstutil.KV{{"a", 8, "sst"}, {"a", 5, "a5"}, {"b", 8, "sst"}, {"b", 7, "b7"}}, }, - "WriteAtRequestTimestamp errors with DisallowShadowing below existing": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp errors with DisallowShadowing below existing": { + reqTS: 5, + toReqTS: 10, noShadow: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 10, "sst"}, {"b", 10, "sst"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp errors with DisallowShadowing above existing": { - atReqTS: 8, + "SSTTimestampToRequestTimestamp errors with DisallowShadowing above existing": { + reqTS: 8, + toReqTS: 1, noShadow: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp succeeds with DisallowShadowing above tombstones": { - atReqTS: 8, + "SSTTimestampToRequestTimestamp succeeds with DisallowShadowing above tombstones": { + reqTS: 8, + toReqTS: 1, noShadow: true, data: []sstutil.KV{{"a", 5, ""}, {"b", 7, ""}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expect: []sstutil.KV{{"a", 8, "sst"}, {"a", 5, ""}, {"b", 8, "sst"}, {"b", 7, ""}}, }, - "WriteAtRequestTimestamp succeeds with DisallowShadowing and idempotent writes": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp succeeds with DisallowShadowing and idempotent writes": { + reqTS: 5, + toReqTS: 1, noShadow: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 5, "b5"}}, sst: []sstutil.KV{{"a", 1, "a5"}, {"b", 1, "b5"}}, expect: []sstutil.KV{{"a", 5, "a5"}, {"b", 5, "b5"}}, }, - "WriteAtRequestTimestamp errors with DisallowShadowingBelow equal value above existing below limit": { - atReqTS: 7, + "SSTTimestampToRequestTimestamp errors with DisallowShadowingBelow equal value above existing below limit": { + reqTS: 7, + toReqTS: 10, noShadowBelow: 5, data: []sstutil.KV{{"a", 3, "a3"}}, sst: []sstutil.KV{{"a", 10, 
"a3"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp errors with DisallowShadowingBelow errors above existing above limit": { - atReqTS: 7, + "SSTTimestampToRequestTimestamp errors with DisallowShadowingBelow errors above existing above limit": { + reqTS: 7, + toReqTS: 10, noShadowBelow: 5, data: []sstutil.KV{{"a", 6, "a6"}}, sst: []sstutil.KV{{"a", 10, "sst"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp allows DisallowShadowingBelow equal value above existing above limit": { - atReqTS: 7, + "SSTTimestampToRequestTimestamp allows DisallowShadowingBelow equal value above existing above limit": { + reqTS: 7, + toReqTS: 10, noShadowBelow: 5, data: []sstutil.KV{{"a", 6, "a6"}}, sst: []sstutil.KV{{"a", 10, "a6"}}, @@ -612,158 +644,148 @@ func TestEvalAddSSTable(t *testing.T) { sst: []sstutil.KV{{"a", 7, "a8"}}, expectErr: &roachpb.WriteTooOldError{}, }, - - // SSTTimestamp - "SSTTimestamp works with WriteAtRequestTimestamp": { - atReqTS: 7, - data: []sstutil.KV{{"a", 6, "a6"}}, - sst: []sstutil.KV{{"a", 7, "a7"}}, - sstTimestamp: 7, - expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}}, - expectStatsEst: true, - }, - /* Disabled due to nondeterminism under metamorphic tests. SSTTimestamp will - * shortly be removed anyway. - "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": { - atReqTS: 8, - data: []sstutil.KV{{"a", 6, "a6"}}, - sst: []sstutil.KV{{"a", 7, "a7"}}, - sstTimestamp: 8, - expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}}, - expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`, - expectStatsEst: true, - },*/ } testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) { - for name, tc := range testcases { - t.Run(name, func(t *testing.T) { - st := cluster.MakeTestingClusterSettings() - ctx := context.Background() - - dir := t.TempDir() - engine, err := storage.Open(ctx, storage.Filesystem(filepath.Join(dir, "db")), storage.Settings(st)) - require.NoError(t, err) - defer engine.Close() - - // Write initial data. - intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS}, 0, 1) - b := engine.NewBatch() - for i := len(tc.data) - 1; i >= 0; i-- { // reverse, older timestamps first - kv := tc.data[i] - var txn *roachpb.Transaction - if kv.WallTimestamp == intentTS { - txn = &intentTxn + testutils.RunValues(t, "RewriteConcurrency", []interface{}{0, 8}, func(t *testing.T, c interface{}) { + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + batcheval.AddSSTableRewriteConcurrency.Override(ctx, &st.SV, int64(c.(int))) + + dir := t.TempDir() + engine, err := storage.Open(ctx, storage.Filesystem(filepath.Join(dir, "db")), storage.Settings(st)) + require.NoError(t, err) + defer engine.Close() + + // Write initial data. 
+ intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS}, 0, 1) + b := engine.NewBatch() + for i := len(tc.data) - 1; i >= 0; i-- { // reverse, older timestamps first + kv := tc.data[i] + var txn *roachpb.Transaction + if kv.WallTimestamp == intentTS { + txn = &intentTxn + } + require.NoError(t, storage.MVCCPut(ctx, b, nil, kv.Key(), kv.Timestamp(), kv.Value(), txn)) } - require.NoError(t, storage.MVCCPut(ctx, b, nil, kv.Key(), kv.Timestamp(), kv.Value(), txn)) - } - require.NoError(t, b.Commit(false)) - stats := engineStats(t, engine) - - // Build and add SST. - sst, start, end := sstutil.MakeSST(ctx, st, t, tc.sst) - reqTS := hlc.Timestamp{WallTime: defaultReqTS} - if tc.atReqTS != 0 { - reqTS.WallTime = tc.atReqTS - } - resp := &roachpb.AddSSTableResponse{} - result, err := batcheval.EvalAddSSTable(ctx, engine, batcheval.CommandArgs{ - EvalCtx: (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext(), - Stats: stats, - Header: roachpb.Header{ - Timestamp: reqTS, - }, - Args: &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - DisallowConflicts: tc.noConflict, - DisallowShadowing: tc.noShadow, - DisallowShadowingBelow: hlc.Timestamp{WallTime: tc.noShadowBelow}, - WriteAtRequestTimestamp: tc.atReqTS != 0, - SSTTimestamp: hlc.Timestamp{WallTime: tc.sstTimestamp}, - IngestAsWrites: ingestAsWrites, - }, - }, resp) - - expectErr := tc.expectErr - if tc.expectErrRace != nil && util.RaceEnabled { - expectErr = tc.expectErrRace - } - if expectErr != nil { - require.Error(t, err) - if b, ok := expectErr.(bool); ok && b { - // any error is fine - } else if expectMsg, ok := expectErr.(string); ok { - require.Contains(t, err.Error(), expectMsg) - } else if e, ok := expectErr.(error); ok { - require.True(t, errors.HasType(err, e), "expected %T, got %v", e, err) - } else { - require.Fail(t, "invalid expectErr", "expectErr=%v", expectErr) + require.NoError(t, b.Commit(false)) + stats := engineStats(t, engine, 0) + + // Build and add SST. 
+ if tc.toReqTS != 0 && tc.reqTS == 0 && tc.expectErr == nil { + t.Fatal("can't set toReqTS without reqTS") + } + sst, start, end := sstutil.MakeSST(t, st, tc.sst) + resp := &roachpb.AddSSTableResponse{} + result, err := batcheval.EvalAddSSTable(ctx, engine, batcheval.CommandArgs{ + EvalCtx: (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext(), + Stats: stats, + Header: roachpb.Header{ + Timestamp: hlc.Timestamp{WallTime: tc.reqTS}, + }, + Args: &roachpb.AddSSTableRequest{ + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + DisallowConflicts: tc.noConflict, + DisallowShadowing: tc.noShadow, + DisallowShadowingBelow: hlc.Timestamp{WallTime: tc.noShadowBelow}, + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: tc.toReqTS}, + IngestAsWrites: ingestAsWrites, + }, + }, resp) + + expectErr := tc.expectErr + if tc.expectErrRace != nil && util.RaceEnabled { + expectErr = tc.expectErrRace + } + if expectErr != nil { + require.Error(t, err) + if b, ok := expectErr.(bool); ok && b { + // any error is fine + } else if expectMsg, ok := expectErr.(string); ok { + require.Contains(t, err.Error(), expectMsg) + } else if expectMsgs, ok := expectErr.([]string); ok { + var found bool + for _, msg := range expectMsgs { + if strings.Contains(err.Error(), msg) { + found = true + break + } + } + if !found { + t.Fatalf("%q does not contain any of %q", err, expectMsgs) + } + } else if e, ok := expectErr.(error); ok { + require.True(t, errors.HasType(err, e), "expected %T, got %v", e, err) + } else { + require.Fail(t, "invalid expectErr", "expectErr=%v", expectErr) + } + return } - return - } - require.NoError(t, err) - - if ingestAsWrites { - require.Nil(t, result.Replicated.AddSSTable) - } else { - require.NotNil(t, result.Replicated.AddSSTable) - sstPath := filepath.Join(dir, "sst") - require.NoError(t, engine.WriteFile(sstPath, result.Replicated.AddSSTable.Data)) - require.NoError(t, engine.IngestExternalFiles(ctx, []string{sstPath})) - } - - // Scan resulting data from engine. - iter := storage.NewMVCCIncrementalIterator(engine, storage.MVCCIncrementalIterOptions{ - EndKey: keys.MaxKey, - StartTime: hlc.MinTimestamp, - EndTime: hlc.MaxTimestamp, - IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit, - InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit, - }) - defer iter.Close() - iter.SeekGE(storage.MVCCKey{Key: keys.SystemPrefix}) - scan := []sstutil.KV{} - for { - ok, err := iter.Valid() require.NoError(t, err) - if !ok { - break + + if ingestAsWrites { + require.Nil(t, result.Replicated.AddSSTable) + } else { + require.NotNil(t, result.Replicated.AddSSTable) + sstPath := filepath.Join(dir, "sst") + require.NoError(t, engine.WriteFile(sstPath, result.Replicated.AddSSTable.Data)) + require.NoError(t, engine.IngestExternalFiles(ctx, []string{sstPath})) } - key := string(iter.Key().Key) - ts := iter.Key().Timestamp.WallTime - var value []byte - if iter.Key().IsValue() { - if len(iter.Value()) > 0 { - value, err = roachpb.Value{RawBytes: iter.Value()}.GetBytes() + + // Scan resulting data from engine. 
+ iter := storage.NewMVCCIncrementalIterator(engine, storage.MVCCIncrementalIterOptions{ + EndKey: keys.MaxKey, + StartTime: hlc.MinTimestamp, + EndTime: hlc.MaxTimestamp, + IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit, + InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit, + }) + defer iter.Close() + iter.SeekGE(storage.MVCCKey{Key: keys.SystemPrefix}) + scan := []sstutil.KV{} + for { + ok, err := iter.Valid() + require.NoError(t, err) + if !ok { + break + } + key := string(iter.Key().Key) + ts := iter.Key().Timestamp.WallTime + var value []byte + if iter.Key().IsValue() { + if len(iter.Value()) > 0 { + value, err = roachpb.Value{RawBytes: iter.Value()}.GetBytes() + require.NoError(t, err) + } + } else { + var meta enginepb.MVCCMetadata + require.NoError(t, protoutil.Unmarshal(iter.UnsafeValue(), &meta)) + if meta.RawBytes == nil { + // Skip intent metadata records (value emitted separately). + iter.Next() + continue + } + value, err = roachpb.Value{RawBytes: meta.RawBytes}.GetBytes() require.NoError(t, err) } + scan = append(scan, sstutil.KV{key, ts, string(value)}) + iter.Next() + } + require.Equal(t, tc.expect, scan) + + // Check that stats were updated correctly. + if tc.expectStatsEst { + require.True(t, stats.ContainsEstimates > 0, "expected stats to be estimated") } else { - var meta enginepb.MVCCMetadata - require.NoError(t, protoutil.Unmarshal(iter.UnsafeValue(), &meta)) - if meta.RawBytes == nil { - // Skip intent metadata records (value emitted separately). - iter.Next() - continue - } - value, err = roachpb.Value{RawBytes: meta.RawBytes}.GetBytes() - require.NoError(t, err) + require.False(t, stats.ContainsEstimates > 0, "found estimated stats") + require.Equal(t, stats, engineStats(t, engine, stats.LastUpdateNanos)) } - scan = append(scan, sstutil.KV{key, ts, string(value)}) - iter.Next() - } - require.Equal(t, tc.expect, scan) - - // Check that stats were updated correctly. 
- if tc.expectStatsEst { - require.True(t, stats.ContainsEstimates > 0, "expected stats to be estimated") - } else { - require.False(t, stats.ContainsEstimates > 0, "found estimated stats") - stats.LastUpdateNanos = 0 // avoid spurious diffs - require.Equal(t, stats, engineStats(t, engine)) - } - }) - } + }) + } + }) }) } @@ -777,8 +799,8 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { testcases := map[string]struct { sst []sstutil.KV - atReqTS bool // WriteAtRequestTimestamp - asWrites bool // IngestAsWrites + toReqTS int64 // SSTTimestampToRequestTimestamp + asWrites bool // IngestAsWrites expectHistoryMutation bool expectLogicalOps []enginepb.MVCCLogicalOp }{ @@ -787,9 +809,9 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { expectHistoryMutation: true, expectLogicalOps: nil, }, - "WriteAtRequestTimestamp alone": { + "SSTTimestampToRequestTimestamp alone": { sst: []sstutil.KV{{"a", 1, "a1"}}, - atReqTS: true, + toReqTS: 1, expectHistoryMutation: false, expectLogicalOps: nil, }, @@ -799,10 +821,10 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { expectHistoryMutation: true, expectLogicalOps: nil, }, - "IngestAsWrites and WriteAtRequestTimestamp": { - sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 2, "b2"}}, + "IngestAsWrites and SSTTimestampToRequestTimestamp": { + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}}, asWrites: true, - atReqTS: true, + toReqTS: 1, expectHistoryMutation: false, expectLogicalOps: []enginepb.MVCCLogicalOp{ // NOTE: Value is populated by the rangefeed processor, not MVCC, so it @@ -823,7 +845,7 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { opLogger := storage.NewOpLoggerBatch(engine.NewBatch()) // Build and add SST. - sst, start, end := sstutil.MakeSST(ctx, st, t, tc.sst) + sst, start, end := sstutil.MakeSST(t, st, tc.sst) result, err := batcheval.EvalAddSSTable(ctx, opLogger, batcheval.CommandArgs{ EvalCtx: (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext(), Header: roachpb.Header{ @@ -831,11 +853,11 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { }, Stats: &enginepb.MVCCStats{}, Args: &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - WriteAtRequestTimestamp: tc.atReqTS, - IngestAsWrites: tc.asWrites, + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: tc.toReqTS}, + IngestAsWrites: tc.asWrites, }, }, &roachpb.AddSSTableResponse{}) require.NoError(t, err) @@ -845,7 +867,7 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { } else { require.NotNil(t, result.Replicated.AddSSTable) require.Equal(t, roachpb.Span{Key: start, EndKey: end}, result.Replicated.AddSSTable.Span) - require.Equal(t, tc.atReqTS, result.Replicated.AddSSTable.AtWriteTimestamp) + require.Equal(t, tc.toReqTS != 0, result.Replicated.AddSSTable.AtWriteTimestamp) } if tc.expectHistoryMutation { require.Equal(t, &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ @@ -905,7 +927,7 @@ func runTestDBAddSSTable( cs := cluster.MakeTestingClusterSettings() { - sst, start, end := sstutil.MakeSST(ctx, cs, t, []sstutil.KV{{"bb", 2, "1"}}) + sst, start, end := sstutil.MakeSST(t, cs, []sstutil.KV{{"bb", 2, "1"}}) // Key is before the range in the request span. err := db.AddSSTable( @@ -947,7 +969,7 @@ func runTestDBAddSSTable( // Check that ingesting a key with an earlier mvcc timestamp doesn't affect // the value returned by Get. 
{ - sst, start, end := sstutil.MakeSST(ctx, cs, t, []sstutil.KV{{"bb", 1, "2"}}) + sst, start, end := sstutil.MakeSST(t, cs, []sstutil.KV{{"bb", 1, "2"}}) require.NoError(t, db.AddSSTable( ctx, start, end, sst, allowConflicts, allowShadowing, allowShadowingBelow, nilStats, ingestAsSST, noTS)) r, err := db.Get(ctx, "bb") @@ -961,7 +983,7 @@ func runTestDBAddSSTable( // Key range in request span is not empty. First time through a different // key is present. Second time through checks the idempotency. { - sst, start, end := sstutil.MakeSST(ctx, cs, t, []sstutil.KV{{"bc", 1, "3"}}) + sst, start, end := sstutil.MakeSST(t, cs, []sstutil.KV{{"bc", 1, "3"}}) var before int64 if store != nil { @@ -997,7 +1019,7 @@ func runTestDBAddSSTable( // ... and doing the same thing but via write-batch works the same. { - sst, start, end := sstutil.MakeSST(ctx, cs, t, []sstutil.KV{{"bd", 1, "3"}}) + sst, start, end := sstutil.MakeSST(t, cs, []sstutil.KV{{"bd", 1, "3"}}) var before int64 if store != nil { @@ -1073,7 +1095,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { require.NoError(t, engine.PutMVCC(kv.MVCCKey(), kv.ValueBytes())) } - sst, start, end := sstutil.MakeSST(ctx, st, t, []sstutil.KV{ + sst, start, end := sstutil.MakeSST(t, st, []sstutil.KV{ {"a", 4, "aaaaaa"}, // mvcc-shadowed by existing delete. {"a", 2, "aa"}, // mvcc-shadowed within SST. {"c", 6, "ccc"}, // same TS as existing, LSM-shadows existing. @@ -1097,7 +1119,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { // After EvalAddSSTable, cArgs.Stats contains a diff to the existing // stats. Make sure recomputing from scratch gets the same answer as // applying the diff to the stats - statsBefore := engineStats(t, engine) + statsBefore := engineStats(t, engine, 0) ts := hlc.Timestamp{WallTime: 7} cArgs := batcheval.CommandArgs{ EvalCtx: evalCtx, @@ -1121,11 +1143,10 @@ func TestAddSSTableMVCCStats(t *testing.T) { statsEvaled.Add(*cArgs.Stats) statsEvaled.Add(statsDelta) statsEvaled.ContainsEstimates = 0 - statsEvaled.LastUpdateNanos = 0 - require.Equal(t, engineStats(t, engine), statsEvaled) + require.Equal(t, engineStats(t, engine, statsEvaled.LastUpdateNanos), statsEvaled) // Check stats for a single KV. - sst, start, end = sstutil.MakeSST(ctx, st, t, []sstutil.KV{{"zzzzzzz", ts.WallTime, "zzz"}}) + sst, start, end = sstutil.MakeSST(t, st, []sstutil.KV{{"zzzzzzz", ts.WallTime, "zzz"}}) cArgs = batcheval.CommandArgs{ EvalCtx: evalCtx, Header: roachpb.Header{Timestamp: ts}, @@ -1189,7 +1210,7 @@ func TestAddSSTableMVCCStatsDisallowShadowing(t *testing.T) { {"c", 2, "bb"}, {"h", 6, "hh"}, } - sst, start, end := sstutil.MakeSST(ctx, st, t, kvs) + sst, start, end := sstutil.MakeSST(t, st, kvs) // Accumulate stats across SST ingestion. commandStats := enginepb.MVCCStats{} @@ -1220,7 +1241,7 @@ func TestAddSSTableMVCCStatsDisallowShadowing(t *testing.T) { // Evaluate the second SST. Both the KVs are perfectly shadowing and should // not contribute to the stats. - sst, start, end = sstutil.MakeSST(ctx, st, t, []sstutil.KV{ + sst, start, end = sstutil.MakeSST(t, st, []sstutil.KV{ {"c", 2, "bb"}, // key has the same timestamp and value as the one present in the existing data. {"h", 6, "hh"}, // key has the same timestamp and value as the one present in the existing data. }) @@ -1239,7 +1260,7 @@ func TestAddSSTableMVCCStatsDisallowShadowing(t *testing.T) { // Evaluate the third SST. Two of the three KVs are perfectly shadowing, but // there is one valid KV which should contribute to the stats. 
- sst, start, end = sstutil.MakeSST(ctx, st, t, []sstutil.KV{ + sst, start, end = sstutil.MakeSST(t, st, []sstutil.KV{ {"c", 2, "bb"}, // key has the same timestamp and value as the one present in the existing data. {"e", 2, "ee"}, {"h", 6, "hh"}, // key has the same timestamp and value as the one present in the existing data. @@ -1288,7 +1309,7 @@ func TestAddSSTableIntentResolution(t *testing.T) { // Generate an SSTable that covers keys a, b, and c, and submit it with high // priority. This is going to abort the transaction above, encounter its // intent, and resolve it. - sst, start, end := sstutil.MakeSST(ctx, s.ClusterSettings(), t, []sstutil.KV{ + sst, start, end := sstutil.MakeSST(t, s.ClusterSettings(), []sstutil.KV{ {"a", 1, "1"}, {"b", 1, "2"}, {"c", 1, "3"}, @@ -1311,9 +1332,9 @@ func TestAddSSTableIntentResolution(t *testing.T) { require.Contains(t, err.Error(), "TransactionRetryWithProtoRefreshError: TransactionAbortedError") } -// TestAddSSTableWriteAtRequestTimestampRespectsTSCache checks that AddSSTable -// with WriteAtRequestTimestamp respects the timestamp cache. -func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { +// TestAddSSTableSSTTimestampToRequestTimestampRespectsTSCache checks that AddSSTable +// with SSTTimestampToRequestTimestamp respects the timestamp cache. +func TestAddSSTableSSTTimestampToRequestTimestampRespectsTSCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1330,12 +1351,12 @@ func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { txnTS := txn.CommitTimestamp() // Add an SST writing below the previous write. - sst, start, end := sstutil.MakeSST(ctx, s.ClusterSettings(), t, []sstutil.KV{{"key", 1, "sst"}}) + sst, start, end := sstutil.MakeSST(t, s.ClusterSettings(), []sstutil.KV{{"key", 1, "sst"}}) sstReq := &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - WriteAtRequestTimestamp: true, + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: 1}, } ba := roachpb.BatchRequest{ Header: roachpb.Header{Timestamp: txnTS.Prev()}, @@ -1364,9 +1385,9 @@ func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { require.Equal(t, "sst", string(kv.ValueBytes())) } -// TestAddSSTableWriteAtRequestTimestampRespectsClosedTS checks that AddSSTable -// with WriteAtRequestTimestamp respects the closed timestamp. -func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { +// TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS checks that AddSSTable +// with SSTTimestampToRequestTimestamp respects the closed timestamp. +func TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1390,12 +1411,12 @@ func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { // Add an SST writing below the closed timestamp. It should get pushed above it. 
reqTS := closedTS.Prev() - sst, start, end := sstutil.MakeSST(ctx, store.ClusterSettings(), t, []sstutil.KV{{"key", 1, "sst"}}) + sst, start, end := sstutil.MakeSST(t, store.ClusterSettings(), []sstutil.KV{{"key", 1, "sst"}}) sstReq := &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - WriteAtRequestTimestamp: true, + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: 1}, } ba := roachpb.BatchRequest{ Header: roachpb.Header{Timestamp: reqTS}, @@ -1418,7 +1439,7 @@ func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { } // engineStats computes the MVCC stats for the given engine. -func engineStats(t *testing.T, engine storage.Engine) *enginepb.MVCCStats { +func engineStats(t *testing.T, engine storage.Engine, nowNanos int64) *enginepb.MVCCStats { t.Helper() iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ @@ -1428,7 +1449,7 @@ func engineStats(t *testing.T, engine storage.Engine) *enginepb.MVCCStats { defer iter.Close() // We don't care about nowNanos, because the SST can't contain intents or // tombstones and all existing intents will be resolved. - stats, err := storage.ComputeStatsForRange(iter, keys.LocalMax, keys.MaxKey, 0) + stats, err := storage.ComputeStatsForRange(iter, keys.LocalMax, keys.MaxKey, nowNanos) require.NoError(t, err) return &stats } diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index 241a00982512..2e19c02270b9 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -166,8 +166,8 @@ message ReplicatedEvalResult { uint32 crc32 = 2 [(gogoproto.customname) = "CRC32"]; roachpb.Span span = 3 [(gogoproto.nullable) = false]; // If true, all SST MVCC timestamps equal the WriteTimestamp. This is given - // by the WriteAtRequestTimestamp request parameter, which was introduced in - // 22.1 and only used with the MVCCAddSSTable cluster version. + // by the SSTTimestampToRequestTimestamp request parameter, which was + // introduced in 22.1 and only used with the MVCCAddSSTable cluster version. // // TODO(erikgrinaker): This field currently controls whether to emit an // AddSSTable event across the rangefeed. We could equivalently check diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e84bdc4020f3..25f18ed294c3 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -638,8 +638,8 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked( // handleSSTableRaftMuLocked emits an ingested SSTable from AddSSTable via the // rangefeed. These can be expected to have timestamps at the write timestamp -// (i.e. submitted with WriteAtRequestTimestamp) since we assert elsewhere that -// MVCCHistoryMutation commands disconnect rangefeeds. +// (i.e. submitted with SSTTimestampToRequestTimestamp) since we assert +// elsewhere that MVCCHistoryMutation commands disconnect rangefeeds. // // NB: We currently don't have memory budgeting for rangefeeds, instead using a // large buffered channel, so this can easily OOM the node. 
This is "fine" for diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go index 02a9c4315013..5bd0fe48b298 100644 --- a/pkg/kv/kvserver/replica_rankings_test.go +++ b/pkg/kv/kvserver/replica_rankings_test.go @@ -111,7 +111,7 @@ func TestAddSSTQPSStat(t *testing.T) { } nextKey = nextKey.Next() } - sst, start, end := sstutil.MakeSST(ctx, ts.ClusterSettings(), t, sstKeys) + sst, start, end := sstutil.MakeSST(t, ts.ClusterSettings(), sstKeys) requestSize := float64(len(sst)) sstReq := &roachpb.AddSSTableRequest{ diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index f4d0b4c8aa47..67d6c649e334 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1335,7 +1335,7 @@ func (*AdminScatterRequest) flags() flag { return isAdmin | isR func (*AdminVerifyProtectedTimestampRequest) flags() flag { return isAdmin | isRange | isAlone } func (r *AddSSTableRequest) flags() flag { flags := isWrite | isRange | isAlone | isUnsplittable | canBackpressure - if r.WriteAtRequestTimestamp { + if r.SSTTimestampToRequestTimestamp.IsSet() { flags |= appliesTSCache } return flags diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 2ce1436c216e..378d9c33b0c6 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -1600,9 +1600,8 @@ message AdminVerifyProtectedTimestampResponse { // AddSSTableRequest contains arguments to the AddSSTable method, which links an // SST file into the Pebble log-structured merge-tree. The SST must only contain // committed versioned values with non-zero MVCC timestamps (no intents or -// inline values) and no tombstones, but this is only fully enforced when -// WriteAtRequestTimestamp is enabled, for performance. It cannot be used in a -// transaction, cannot be split across ranges, and must be alone in a batch. +// inline values) and no tombstones. It cannot be used in a transaction, cannot +// be split across ranges, and must be alone in a batch. // // By default, AddSSTable will blindly write the SST contents into Pebble, with // fixed MVCC timestamps unaffected by pushes. This can violate many CRDB @@ -1615,8 +1614,8 @@ message AdminVerifyProtectedTimestampResponse { // The following parameters can be used to make AddSSTable enforce these // guarantees, at a performance cost: // -// * WriteAtRequestTimestamp: ensures compliance with the timestamp cache and -// closed timestamp, by rewriting SST timestamps to the request timestamp. +// * SSTTimestampToRequestTimestamp: ensures compliance with the timestamp cache +// and closed timestamp, by rewriting SST timestamps to the request timestamp. // Also emits the SST via the range feed. // // * DisallowConflicts, DisallowShadowing, or DisallowShadowingBelow: ensures @@ -1643,11 +1642,13 @@ message AddSSTableRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; bytes data = 2; - // WriteAtRequestTimestamp updates all MVCC timestamps in the SST to the - // request timestamp, even if the request gets pushed. This ensures the writes - // comply with the timestamp cache and closed timestamp. It also causes the - // AddSSTable to be emitted via the range feed, since it respects the closed - // timestamp. + // SSTTimestampToRequestTimestamp gives the timestamp used for all MVCC keys + // in the provided SST. If this timestamp differs from the request timestamp + // (e.g. if the request gets pushed) then all MVCC keys in the SST will be + // rewritten to the request timestamp during request evaluation. 
This ensures + // the writes comply with the timestamp cache and closed timestamp. It also + // causes the AddSSTable to be emitted via the range feed, since it respects + // the closed timestamp. // // Callers should always set this, except in very special circumstances when // the timestamp cache and closed timestamp can safely be ignored (e.g. @@ -1660,17 +1661,8 @@ message AddSSTableRequest { // serializability. // // Added in 22.1, so check the MVCCAddSSTable version gate before using. - bool write_at_request_timestamp = 6; - - // SSTTimestamp is a promise from the client that all MVCC timestamps in the - // SST equal the provided timestamp, and that there are no inline values, - // intents, or tombstones. When used together with WriteAtRequestTimestamp, - // this can avoid an SST rewrite (and the associated overhead) if the SST - // timestamp equals the request timestamp (i.e. if it was provided by the - // client and the request was not pushed due to e.g. the closed timestamp or - // contention). - util.hlc.Timestamp sst_timestamp = 9 - [(gogoproto.customname) = "SSTTimestamp", (gogoproto.nullable) = false]; + util.hlc.Timestamp sst_timestamp_to_request_timestamp = 6 [ + (gogoproto.customname) = "SSTTimestampToRequestTimestamp", (gogoproto.nullable) = false]; // DisallowConflicts will check for MVCC conflicts with existing keys, i.e. // scan for existing keys with a timestamp at or above the SST key and @@ -1680,7 +1672,7 @@ message AddSSTableRequest { // Note that this alone is not sufficient to guarantee serializability or // single-key linearizability, since it can write to a timestamp that another // reader has already observed, changing the value at that timestamp and above - // it. Use WriteAtRequestTimestamp in addition to guarantee serializability. + // it. Use with SSTTimestampToRequestTimestamp to guarantee serializability. // // Added in 22.1, so check the MVCCAddSSTable version gate before using. // @@ -2626,10 +2618,10 @@ message RangeFeedError { // RangeFeedSSTable is a variant of RangeFeedEvent that represents an AddSSTable // operation, containing the entire ingested SST. It is only emitted for -// SSTables written with WriteAtRequestTimestamp enabled, so it is guaranteed to -// comply with the closed timestamp. The Span and WriteTS fields are advisory, -// and contain the client-provided SST key span (may be wider than the SST data) -// and the MVCC timestamp used for all contained entries. +// SSTables written with SSTTimestampToRequestTimestamp enabled, so it is +// guaranteed to comply with the closed timestamp. The Span and WriteTS fields +// are advisory, and contain the client-provided SST key span (may be wider than +// the SST data) and the MVCC timestamp used for all contained entries. // // The entire SST is emitted even for registrations that have a narrower span, // it is up to the caller to prune the SST as appropriate. 
Catchup scans emit
diff --git a/pkg/roachpb/api_test.go b/pkg/roachpb/api_test.go
index 45d0ce748b8f..5b45ea34d975 100644
--- a/pkg/roachpb/api_test.go
+++ b/pkg/roachpb/api_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/redact"
 	gogoproto "github.com/gogo/protobuf/proto"
@@ -315,7 +316,7 @@ func TestTenantConsumptionAddSub(t *testing.T) {
 func TestFlagCombinations(t *testing.T) {
 	// Any non-zero-valued request variants that conditionally affect flags.
 	reqVariants := []Request{
-		&AddSSTableRequest{WriteAtRequestTimestamp: true},
+		&AddSSTableRequest{SSTTimestampToRequestTimestamp: hlc.Timestamp{Logical: 1}},
 		&DeleteRangeRequest{Inline: true},
 		&GetRequest{KeyLocking: lock.Exclusive},
 		&ReverseScanRequest{KeyLocking: lock.Exclusive},
diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go
index 187b701729d0..cc87935da39b 100644
--- a/pkg/storage/sst.go
+++ b/pkg/storage/sst.go
@@ -183,8 +183,8 @@ func CheckSSTConflicts(
 		// If the existing key has a timestamp at or above the SST key, return a
 		// WriteTooOldError. Normally this could cause a transactional request to be
 		// automatically retried after a read refresh, which we would only want to
-		// do if AddSSTable had WriteAtRequestTimestamp set, but AddSSTable cannot
-		// be used in transactions so we don't need to check.
+		// do if AddSSTable had SSTTimestampToRequestTimestamp set, but AddSSTable
+		// cannot be used in transactions so we don't need to check.
 		if sstKey.Timestamp.LessEq(extKey.Timestamp) {
 			return enginepb.MVCCStats{}, roachpb.NewWriteTooOldError(
 				sstKey.Timestamp, extKey.Timestamp.Next(), sstKey.Key)
@@ -221,17 +221,23 @@ func CheckSSTConflicts(
 	return statsDiff, nil
 }
 
-// UpdateSSTTimestamps replaces all MVCC timestamp in the provided SST with the
-// given timestamp. All keys must have a non-zero timestamp, otherwise an error
-// is returned to protect against accidental inclusion of intents or inline
-// values. Tombstones are also rejected opportunistically, since we're iterating
-// over the entire SST anyway.
+// UpdateSSTTimestamps replaces all MVCC timestamps in the provided SST with the
+// given timestamp. All keys must already have the given "from" timestamp.
 func UpdateSSTTimestamps(
 	ctx context.Context, st *cluster.Settings, sst []byte, from, to hlc.Timestamp, concurrency int,
 ) ([]byte, error) {
+	if from.IsEmpty() {
+		return nil, errors.Errorf("from timestamp not given")
+	}
+	if to.IsEmpty() {
+		return nil, errors.Errorf("to timestamp not given")
+	}
+
 	sstOut := &MemFile{}
 	sstOut.Buffer.Grow(len(sst))
-	if concurrency > 0 && !from.IsEmpty() {
+
+	// Fancy optimized Pebble SST rewriter.
+	if concurrency > 0 {
 		defaults := DefaultPebbleOptions()
 		opts := defaults.MakeReaderOptions()
 		if fp := defaults.Levels[0].FilterPolicy; fp != nil && len(opts.Filters) == 0 {
@@ -241,13 +247,16 @@ func UpdateSSTTimestamps(
 			opts,
 			sstOut,
 			MakeIngestionWriterOptions(ctx, st),
-			encodeMVCCTimestampSuffix(from), encodeMVCCTimestampSuffix(to),
+			encodeMVCCTimestampSuffix(from),
+			encodeMVCCTimestampSuffix(to),
 			concurrency,
 		); err != nil {
 			return nil, err
 		}
 		return sstOut.Bytes(), nil
 	}
+
+	// Naïve read/write loop.
writer := MakeIngestionSSTWriter(ctx, st, sstOut) defer writer.Close() @@ -263,11 +272,10 @@ func UpdateSSTTimestamps( } else if !ok { break } - if iter.UnsafeKey().Timestamp.IsEmpty() { - return nil, errors.New("inline values or intents are not supported") - } - if len(iter.UnsafeValue()) == 0 { - return nil, errors.New("SST values cannot be tombstones") + key := iter.UnsafeKey() + if key.Timestamp != from { + return nil, errors.Errorf("unexpected timestamp %s (expected %s) for key %s", + key.Timestamp, from, key.Key) } err = writer.PutMVCC(MVCCKey{Key: iter.UnsafeKey().Key, Timestamp: to}, iter.UnsafeValue()) if err != nil { diff --git a/pkg/storage/sst_test.go b/pkg/storage/sst_test.go index ece6fd874edf..3cf0dedd1bab 100644 --- a/pkg/storage/sst_test.go +++ b/pkg/storage/sst_test.go @@ -106,11 +106,12 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { modeCounter // uint64 counter in first 8 bytes modeRandom // random values - sstSize = 0 - keyCount = 500000 - valueSize = 8 - valueMode = modeRandom - profile = false // cpuprofile.pprof + concurrency = 0 // 0 uses naïve replacement + sstSize = 0 + keyCount = 500000 + valueSize = 8 + valueMode = modeRandom + profile = false // cpuprofile.pprof ) if sstSize > 0 && keyCount > 0 { @@ -129,7 +130,7 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { key := make([]byte, 8) value := make([]byte, valueSize) - ts := hlc.Timestamp{WallTime: 1} + sstTimestamp := hlc.Timestamp{WallTime: 1} var i uint64 for i = 0; (keyCount > 0 && i < keyCount) || (sstSize > 0 && sstFile.Len() < sstSize); i++ { binary.BigEndian.PutUint64(key, i) @@ -138,11 +139,8 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { case modeZero: case modeCounter: binary.BigEndian.PutUint64(value, i) - ts.WallTime++ case modeRandom: r.Read(value) - ts.WallTime = r.Int63() - ts.Logical = r.Int31() default: b.Fatalf("unknown value mode %d", valueMode) } @@ -151,7 +149,7 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { v.SetBytes(value) v.InitChecksum(key) - require.NoError(b, writer.PutMVCC(MVCCKey{Key: key, Timestamp: ts}, v.RawBytes)) + require.NoError(b, writer.PutMVCC(MVCCKey{Key: key, Timestamp: sstTimestamp}, v.RawBytes)) } writer.Close() b.Logf("%vMB %v keys", sstFile.Len()/1e6, i) @@ -165,10 +163,12 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { defer pprof.StopCPUProfile() } + requestTimestamp := hlc.Timestamp{WallTime: 1634899098417970999, Logical: 9} + b.StartTimer() for i := 0; i < b.N; i++ { - ts := hlc.Timestamp{WallTime: 1634899098417970999, Logical: 9} - _, err := UpdateSSTTimestamps(ctx, st, sstFile.Bytes(), hlc.Timestamp{}, ts, 0) + _, err := UpdateSSTTimestamps( + ctx, st, sstFile.Bytes(), sstTimestamp, requestTimestamp, concurrency) require.NoError(b, err) } } diff --git a/pkg/testutils/sstutil/sstutil.go b/pkg/testutils/sstutil/sstutil.go index 4655f2864ada..8ab97c0e270f 100644 --- a/pkg/testutils/sstutil/sstutil.go +++ b/pkg/testutils/sstutil/sstutil.go @@ -25,13 +25,11 @@ import ( // MakeSST builds a binary in-memory SST from the given tests data. It returns // the binary SST data as well as the start and end (exclusive) keys of the SST. 
-func MakeSST( - ctx context.Context, cs *cluster.Settings, t *testing.T, kvs []KV, -) ([]byte, roachpb.Key, roachpb.Key) { +func MakeSST(t *testing.T, st *cluster.Settings, kvs []KV) ([]byte, roachpb.Key, roachpb.Key) { t.Helper() sstFile := &storage.MemFile{} - writer := storage.MakeIngestionSSTWriter(ctx, cs, sstFile) + writer := storage.MakeIngestionSSTWriter(context.Background(), st, sstFile) defer writer.Close() start, end := keys.MaxKey, keys.MinKey diff --git a/pkg/util/hlc/timestamp.go b/pkg/util/hlc/timestamp.go index 0263ac1d3946..b66fdbbb377e 100644 --- a/pkg/util/hlc/timestamp.go +++ b/pkg/util/hlc/timestamp.go @@ -180,6 +180,11 @@ func (t Timestamp) IsEmpty() bool { return t == Timestamp{} } +// IsSet returns true if t is not an empty Timestamp. +func (t Timestamp) IsSet() bool { + return !t.IsEmpty() +} + // Add returns a timestamp with the WallTime and Logical components increased. // wallTime is expressed in nanos. //
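
For readers of this patch, the sketch below pulls the renamed parameter together end to end. It is illustrative only and not part of the diff: the test scaffolding, key/value contents, and timestamps are assumptions, and it uses only helpers that the changed tests above already exercise (sstutil.MakeSST, sstutil.ComputeStats, storage.UpdateSSTTimestamps, and the new SSTTimestampToRequestTimestamp field).

```go
// Illustrative sketch only (not part of this patch): shows the two ways a
// caller can line up SST timestamps with the request timestamp now that the
// WriteAtRequestTimestamp/SSTTimestamp parameters are merged into
// SSTTimestampToRequestTimestamp. Assumes a test context; nothing is sent.
package example_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/testutils/sstutil"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/stretchr/testify/require"
)

func TestSSTTimestampToRequestTimestampSketch(t *testing.T) {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()

	sstTS := hlc.Timestamp{WallTime: 1}  // timestamp carried by every SST key
	reqTS := hlc.Timestamp{WallTime: 10} // the batch header (request) timestamp

	// Build an SST whose keys all carry sstTS, the timestamp promised to the
	// server via SSTTimestampToRequestTimestamp.
	sst, start, end := sstutil.MakeSST(t, st, []sstutil.KV{{"a", sstTS.WallTime, "a1"}})

	// Option 1: let EvalAddSSTable rewrite the SST. The rewrite only happens
	// when sstTS differs from the final request timestamp (e.g. after a push).
	req := &roachpb.AddSSTableRequest{
		RequestHeader:                  roachpb.RequestHeader{Key: start, EndKey: end},
		Data:                           sst,
		MVCCStats:                      sstutil.ComputeStats(t, sst),
		SSTTimestampToRequestTimestamp: sstTS,
	}
	_ = req // would be sent in a BatchRequest with Header.Timestamp = reqTS

	// Option 2: rewrite explicitly. UpdateSSTTimestamps now requires both
	// timestamps and errors if any key's timestamp differs from sstTS.
	rewritten, err := storage.UpdateSSTTimestamps(ctx, st, sst, sstTS, reqTS, 0 /* concurrency */)
	require.NoError(t, err)
	require.NotEmpty(t, rewritten)
}
```

As the cmd_add_sstable.go change above shows, the server-side rewrite is skipped when the promised SST timestamp already equals the request timestamp, which is what lets the common, un-pushed case avoid the rewrite cost entirely.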