From 97c2fb2abcac84dbce4d8a12f05f3cbff851bdfc Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 5 Feb 2022 11:25:23 +0000 Subject: [PATCH 1/3] hlc: add `Timestamp.IsSet()` This patch adds `Timestamp.IsSet()`, which is the inverse of `IsEmpty()`. In complex boolean expressions, `IsSet()` is significantly easier to parse than `!IsEmpty()`, which can avoid mistakes. Release note: None --- pkg/util/hlc/timestamp.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/util/hlc/timestamp.go b/pkg/util/hlc/timestamp.go index 0263ac1d3946..b66fdbbb377e 100644 --- a/pkg/util/hlc/timestamp.go +++ b/pkg/util/hlc/timestamp.go @@ -180,6 +180,11 @@ func (t Timestamp) IsEmpty() bool { return t == Timestamp{} } +// IsSet returns true if t is not an empty Timestamp. +func (t Timestamp) IsSet() bool { + return !t.IsEmpty() +} + // Add returns a timestamp with the WallTime and Logical components increased. // wallTime is expressed in nanos. // From 7746828bd2e0e9462eb9bc1278a91e856ce13a98 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 18 Feb 2022 11:43:28 +0000 Subject: [PATCH 2/3] testutils/sstutil: tweak `MakeSST()` parameters This makes `testing.T` the first parameter, as is customary, and replaces the unnecessary `Context` parameter with a background context as this is only used for logging. Release note: None --- .../rangefeed/rangefeed_external_test.go | 4 +-- .../batcheval/cmd_add_sstable_test.go | 28 +++++++++---------- pkg/kv/kvserver/replica_rankings_test.go | 2 +- pkg/testutils/sstutil/sstutil.go | 6 ++-- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 5cd040b68bc3..4682506b5013 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -511,7 +511,7 @@ func TestWithOnSSTable(t *testing.T) { now.Logical = 0 ts := now.WallTime sstKVs := []sstutil.KV{{"a", ts, "1"}, {"b", ts, "2"}, {"c", ts, "3"}, {"d", ts, "4"}, {"e", ts, "5"}} - sst, sstStart, sstEnd := sstutil.MakeSST(ctx, srv.ClusterSettings(), t, sstKVs) + sst, sstStart, sstEnd := sstutil.MakeSST(t, srv.ClusterSettings(), sstKVs) _, pErr := db.AddSSTableAtBatchTimestamp(ctx, sstStart, sstEnd, sst, false /* disallowConflicts */, false /* disallowShadowing */, hlc.Timestamp{}, nil, /* stats */ false /* ingestAsWrites */, now) @@ -587,7 +587,7 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) { ts := now.WallTime sstKVs := []sstutil.KV{{"a", ts, "1"}, {"b", ts, "2"}, {"c", ts, "3"}, {"d", ts, "4"}, {"e", ts, "5"}} expectKVs := []sstutil.KV{{"c", ts, "3"}, {"d", ts, "4"}} - sst, sstStart, sstEnd := sstutil.MakeSST(ctx, srv.ClusterSettings(), t, sstKVs) + sst, sstStart, sstEnd := sstutil.MakeSST(t, srv.ClusterSettings(), sstKVs) _, pErr := db.AddSSTableAtBatchTimestamp(ctx, sstStart, sstEnd, sst, false /* disallowConflicts */, false /* disallowShadowing */, hlc.Timestamp{}, nil, /* stats */ false /* ingestAsWrites */, now) diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index dc5ac13cdc79..ce05f70b1f63 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -660,7 +660,7 @@ func TestEvalAddSSTable(t *testing.T) { stats := engineStats(t, engine) // Build and add SST. - sst, start, end := sstutil.MakeSST(ctx, st, t, tc.sst) + sst, start, end := sstutil.MakeSST(t, st, tc.sst) reqTS := hlc.Timestamp{WallTime: defaultReqTS} if tc.atReqTS != 0 { reqTS.WallTime = tc.atReqTS @@ -823,7 +823,7 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { opLogger := storage.NewOpLoggerBatch(engine.NewBatch()) // Build and add SST. - sst, start, end := sstutil.MakeSST(ctx, st, t, tc.sst) + sst, start, end := sstutil.MakeSST(t, st, tc.sst) result, err := batcheval.EvalAddSSTable(ctx, opLogger, batcheval.CommandArgs{ EvalCtx: (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext(), Header: roachpb.Header{ @@ -905,7 +905,7 @@ func runTestDBAddSSTable( cs := cluster.MakeTestingClusterSettings() { - sst, start, end := sstutil.MakeSST(ctx, cs, t, []sstutil.KV{{"bb", 2, "1"}}) + sst, start, end := sstutil.MakeSST(t, cs, []sstutil.KV{{"bb", 2, "1"}}) // Key is before the range in the request span. err := db.AddSSTable( @@ -947,7 +947,7 @@ func runTestDBAddSSTable( // Check that ingesting a key with an earlier mvcc timestamp doesn't affect // the value returned by Get. { - sst, start, end := sstutil.MakeSST(ctx, cs, t, []sstutil.KV{{"bb", 1, "2"}}) + sst, start, end := sstutil.MakeSST(t, cs, []sstutil.KV{{"bb", 1, "2"}}) require.NoError(t, db.AddSSTable( ctx, start, end, sst, allowConflicts, allowShadowing, allowShadowingBelow, nilStats, ingestAsSST, noTS)) r, err := db.Get(ctx, "bb") @@ -961,7 +961,7 @@ func runTestDBAddSSTable( // Key range in request span is not empty. First time through a different // key is present. Second time through checks the idempotency. { - sst, start, end := sstutil.MakeSST(ctx, cs, t, []sstutil.KV{{"bc", 1, "3"}}) + sst, start, end := sstutil.MakeSST(t, cs, []sstutil.KV{{"bc", 1, "3"}}) var before int64 if store != nil { @@ -997,7 +997,7 @@ func runTestDBAddSSTable( // ... and doing the same thing but via write-batch works the same. { - sst, start, end := sstutil.MakeSST(ctx, cs, t, []sstutil.KV{{"bd", 1, "3"}}) + sst, start, end := sstutil.MakeSST(t, cs, []sstutil.KV{{"bd", 1, "3"}}) var before int64 if store != nil { @@ -1073,7 +1073,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { require.NoError(t, engine.PutMVCC(kv.MVCCKey(), kv.ValueBytes())) } - sst, start, end := sstutil.MakeSST(ctx, st, t, []sstutil.KV{ + sst, start, end := sstutil.MakeSST(t, st, []sstutil.KV{ {"a", 4, "aaaaaa"}, // mvcc-shadowed by existing delete. {"a", 2, "aa"}, // mvcc-shadowed within SST. {"c", 6, "ccc"}, // same TS as existing, LSM-shadows existing. @@ -1125,7 +1125,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { require.Equal(t, engineStats(t, engine), statsEvaled) // Check stats for a single KV. - sst, start, end = sstutil.MakeSST(ctx, st, t, []sstutil.KV{{"zzzzzzz", ts.WallTime, "zzz"}}) + sst, start, end = sstutil.MakeSST(t, st, []sstutil.KV{{"zzzzzzz", ts.WallTime, "zzz"}}) cArgs = batcheval.CommandArgs{ EvalCtx: evalCtx, Header: roachpb.Header{Timestamp: ts}, @@ -1189,7 +1189,7 @@ func TestAddSSTableMVCCStatsDisallowShadowing(t *testing.T) { {"c", 2, "bb"}, {"h", 6, "hh"}, } - sst, start, end := sstutil.MakeSST(ctx, st, t, kvs) + sst, start, end := sstutil.MakeSST(t, st, kvs) // Accumulate stats across SST ingestion. commandStats := enginepb.MVCCStats{} @@ -1220,7 +1220,7 @@ func TestAddSSTableMVCCStatsDisallowShadowing(t *testing.T) { // Evaluate the second SST. Both the KVs are perfectly shadowing and should // not contribute to the stats. - sst, start, end = sstutil.MakeSST(ctx, st, t, []sstutil.KV{ + sst, start, end = sstutil.MakeSST(t, st, []sstutil.KV{ {"c", 2, "bb"}, // key has the same timestamp and value as the one present in the existing data. {"h", 6, "hh"}, // key has the same timestamp and value as the one present in the existing data. }) @@ -1239,7 +1239,7 @@ func TestAddSSTableMVCCStatsDisallowShadowing(t *testing.T) { // Evaluate the third SST. Two of the three KVs are perfectly shadowing, but // there is one valid KV which should contribute to the stats. - sst, start, end = sstutil.MakeSST(ctx, st, t, []sstutil.KV{ + sst, start, end = sstutil.MakeSST(t, st, []sstutil.KV{ {"c", 2, "bb"}, // key has the same timestamp and value as the one present in the existing data. {"e", 2, "ee"}, {"h", 6, "hh"}, // key has the same timestamp and value as the one present in the existing data. @@ -1288,7 +1288,7 @@ func TestAddSSTableIntentResolution(t *testing.T) { // Generate an SSTable that covers keys a, b, and c, and submit it with high // priority. This is going to abort the transaction above, encounter its // intent, and resolve it. - sst, start, end := sstutil.MakeSST(ctx, s.ClusterSettings(), t, []sstutil.KV{ + sst, start, end := sstutil.MakeSST(t, s.ClusterSettings(), []sstutil.KV{ {"a", 1, "1"}, {"b", 1, "2"}, {"c", 1, "3"}, @@ -1330,7 +1330,7 @@ func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { txnTS := txn.CommitTimestamp() // Add an SST writing below the previous write. - sst, start, end := sstutil.MakeSST(ctx, s.ClusterSettings(), t, []sstutil.KV{{"key", 1, "sst"}}) + sst, start, end := sstutil.MakeSST(t, s.ClusterSettings(), []sstutil.KV{{"key", 1, "sst"}}) sstReq := &roachpb.AddSSTableRequest{ RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, Data: sst, @@ -1390,7 +1390,7 @@ func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { // Add an SST writing below the closed timestamp. It should get pushed above it. reqTS := closedTS.Prev() - sst, start, end := sstutil.MakeSST(ctx, store.ClusterSettings(), t, []sstutil.KV{{"key", 1, "sst"}}) + sst, start, end := sstutil.MakeSST(t, store.ClusterSettings(), []sstutil.KV{{"key", 1, "sst"}}) sstReq := &roachpb.AddSSTableRequest{ RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, Data: sst, diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go index 02a9c4315013..5bd0fe48b298 100644 --- a/pkg/kv/kvserver/replica_rankings_test.go +++ b/pkg/kv/kvserver/replica_rankings_test.go @@ -111,7 +111,7 @@ func TestAddSSTQPSStat(t *testing.T) { } nextKey = nextKey.Next() } - sst, start, end := sstutil.MakeSST(ctx, ts.ClusterSettings(), t, sstKeys) + sst, start, end := sstutil.MakeSST(t, ts.ClusterSettings(), sstKeys) requestSize := float64(len(sst)) sstReq := &roachpb.AddSSTableRequest{ diff --git a/pkg/testutils/sstutil/sstutil.go b/pkg/testutils/sstutil/sstutil.go index 4655f2864ada..8ab97c0e270f 100644 --- a/pkg/testutils/sstutil/sstutil.go +++ b/pkg/testutils/sstutil/sstutil.go @@ -25,13 +25,11 @@ import ( // MakeSST builds a binary in-memory SST from the given tests data. It returns // the binary SST data as well as the start and end (exclusive) keys of the SST. -func MakeSST( - ctx context.Context, cs *cluster.Settings, t *testing.T, kvs []KV, -) ([]byte, roachpb.Key, roachpb.Key) { +func MakeSST(t *testing.T, st *cluster.Settings, kvs []KV) ([]byte, roachpb.Key, roachpb.Key) { t.Helper() sstFile := &storage.MemFile{} - writer := storage.MakeIngestionSSTWriter(ctx, cs, sstFile) + writer := storage.MakeIngestionSSTWriter(context.Background(), st, sstFile) defer writer.Close() start, end := keys.MaxKey, keys.MinKey From ef6a82bcc3083a3e336c4c6814003dbbafcdf52c Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 5 Feb 2022 13:34:55 +0000 Subject: [PATCH 3/3] kvserver: add merged `SSTTimestampToRequestTimestamp` param for `AddSSTable` This merges the `AddSSTable` parameters `WriteAtRequestTimestamp` and `SSTTimestamp` into a new `SSTTimestampToRequestTimestamp` parameter. This parameter specifies the MVCC timestamp of all existing timestamps in the SST, which will be rewritten to the request timestamp if they differ (e.g. if the request gets pushed). Both of the replaced parameters are new in 22.1, so this does not require a version gate and allows changing the type of one of the existing Protobuf fields. Release note: None --- pkg/clusterversion/cockroach_versions.go | 2 +- pkg/kv/batch.go | 18 +- pkg/kv/db.go | 4 +- pkg/kv/kvclient/rangefeed/config.go | 4 +- pkg/kv/kvserver/batcheval/cmd_add_sstable.go | 44 +- .../batcheval/cmd_add_sstable_test.go | 461 +++++++++--------- pkg/kv/kvserver/kvserverpb/proposer_kv.proto | 4 +- pkg/kv/kvserver/replica_rangefeed.go | 4 +- pkg/roachpb/api.go | 2 +- pkg/roachpb/api.proto | 44 +- pkg/roachpb/api_test.go | 3 +- pkg/storage/sst.go | 36 +- pkg/storage/sst_test.go | 24 +- 13 files changed, 334 insertions(+), 316 deletions(-) diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 77c95b0454f9..d79ab5197ac3 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -186,7 +186,7 @@ const ( // bundles when the query latency exceeds the user provided threshold. AlterSystemStmtDiagReqs // MVCCAddSSTable supports MVCC-compliant AddSSTable requests via the new - // WriteAtRequestTimestamp and DisallowConflicts parameters. + // SSTTimestampToRequestTimestamp and DisallowConflicts parameters. MVCCAddSSTable // InsertPublicSchemaNamespaceEntryOnRestore ensures all public schemas // have an entry in system.namespace upon being restored. diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 320dc2483a23..3c248d9ab3a8 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -776,8 +776,7 @@ func (b *Batch) addSSTable( disallowShadowingBelow hlc.Timestamp, stats *enginepb.MVCCStats, ingestAsWrites bool, - writeAtRequestTimestamp bool, - sstTimestamp hlc.Timestamp, + sstTimestampToRequestTimestamp hlc.Timestamp, ) { begin, err := marshalKey(s) if err != nil { @@ -794,14 +793,13 @@ func (b *Batch) addSSTable( Key: begin, EndKey: end, }, - Data: data, - DisallowConflicts: disallowConflicts, - DisallowShadowing: disallowShadowing, - DisallowShadowingBelow: disallowShadowingBelow, - MVCCStats: stats, - IngestAsWrites: ingestAsWrites, - WriteAtRequestTimestamp: writeAtRequestTimestamp, - SSTTimestamp: sstTimestamp, + Data: data, + DisallowConflicts: disallowConflicts, + DisallowShadowing: disallowShadowing, + DisallowShadowingBelow: disallowShadowingBelow, + MVCCStats: stats, + IngestAsWrites: ingestAsWrites, + SSTTimestampToRequestTimestamp: sstTimestampToRequestTimestamp, } b.appendReqs(req) b.initResult(1, 0, notRaw, nil) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 88fd9b841329..05e834a1a220 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -688,7 +688,7 @@ func (db *DB) AddSSTable( ) error { b := &Batch{Header: roachpb.Header{Timestamp: batchTs}} b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, - stats, ingestAsWrites, false /* writeAtBatchTS */, hlc.Timestamp{} /* sstTimestamp */) + stats, ingestAsWrites, hlc.Timestamp{} /* sstTimestampToRequestTimestamp */) return getOneErr(db.Run(ctx, b), b) } @@ -711,7 +711,7 @@ func (db *DB) AddSSTableAtBatchTimestamp( ) (hlc.Timestamp, error) { b := &Batch{Header: roachpb.Header{Timestamp: batchTs}} b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, - stats, ingestAsWrites, true /* writeAtBatchTS */, batchTs) + stats, ingestAsWrites, batchTs) err := getOneErr(db.Run(ctx, b), b) if err != nil { return hlc.Timestamp{}, err diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 77747b3ef3bf..56a0e235730a 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -163,8 +163,8 @@ func WithOnCheckpoint(f OnCheckpoint) Option { // requested by WithDiff, and callers must obtain these themselves if needed. // // Also note that AddSSTable requests that do not set the -// WriteAtRequestTimestamp flag, possibly writing below the closed timestamp, -// will cause affected rangefeeds to be disconnected with a terminal +// SSTTimestampToRequestTimestamp param, possibly writing below the closed +// timestamp, will cause affected rangefeeds to be disconnected with a terminal // MVCCHistoryMutationError and thus will not be emitted here -- there should be // no such requests into spans with rangefeeds across them, but it is up to // callers to ensure this. diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index bb8faf9ac2b8..d123dc0b0bcc 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -38,10 +38,11 @@ func init() { RegisterReadWriteCommand(roachpb.AddSSTable, DefaultDeclareIsolatedKeys, EvalAddSSTable) } -var addSSTableRewriteConcurrency = settings.RegisterIntSetting( +// AddSSTableRewriteConcurrency sets the concurrency of a single SST rewrite. +var AddSSTableRewriteConcurrency = settings.RegisterIntSetting( settings.SystemOnly, "kv.bulk_io_write.sst_rewrite_concurrency.per_call", - "concurrency to use when rewriting a sstable timestamps by block, or 0 to use a loop", + "concurrency to use when rewriting sstable timestamps by block, or 0 to use a loop", int64(util.ConstantWithMetamorphicTestRange("addsst-rewrite-concurrency", 0, 0, 16)), settings.NonNegativeInt, ) @@ -56,6 +57,7 @@ func EvalAddSSTable( ms := cArgs.Stats start, end := storage.MVCCKey{Key: args.Key}, storage.MVCCKey{Key: args.EndKey} sst := args.Data + sstToReqTS := args.SSTTimestampToRequestTimestamp var span *tracing.Span var err error @@ -63,28 +65,24 @@ func EvalAddSSTable( defer span.Finish() log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", start.Key, end.Key) - // Under the race detector, scan the SST contents and make sure it satisfies - // the AddSSTable requirements. We do not always perform these checks - // otherwise, due to the cost. + // Under the race detector, check that the SST contents satisfy AddSSTable + // requirements. We don't always do this otherwise, due to the cost. if util.RaceEnabled { - if err := assertSSTContents(sst, args.SSTTimestamp, args.MVCCStats); err != nil { + if err := assertSSTContents(sst, sstToReqTS, args.MVCCStats); err != nil { return result.Result{}, err } } - // If requested, rewrite the SST's MVCC timestamps to the request timestamp. - // This ensures the writes comply with the timestamp cache and closed - // timestamp, i.e. by not writing to timestamps that have already been - // observed or closed. - if args.WriteAtRequestTimestamp && - (args.SSTTimestamp.IsEmpty() || h.Timestamp != args.SSTTimestamp || - util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)) { + // If requested and necessary, rewrite the SST's MVCC timestamps to the + // request timestamp. This ensures the writes comply with the timestamp cache + // and closed timestamp, i.e. by not writing to timestamps that have already + // been observed or closed. + if sstToReqTS.IsSet() && (h.Timestamp != sstToReqTS || + util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)) { st := cArgs.EvalCtx.ClusterSettings() // TODO(dt): use a quotapool. - concurrency := int(addSSTableRewriteConcurrency.Get(&st.SV)) - sst, err = storage.UpdateSSTTimestamps( - ctx, st, sst, args.SSTTimestamp, h.Timestamp, concurrency, - ) + conc := int(AddSSTableRewriteConcurrency.Get(&cArgs.EvalCtx.ClusterSettings().SV)) + sst, err = storage.UpdateSSTTimestamps(ctx, st, sst, sstToReqTS, h.Timestamp, conc) if err != nil { return result.Result{}, errors.Wrap(err, "updating SST timestamps") } @@ -226,7 +224,7 @@ func EvalAddSSTable( ms.Add(stats) var mvccHistoryMutation *kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation - if !args.WriteAtRequestTimestamp { + if sstToReqTS.IsEmpty() { mvccHistoryMutation = &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ Spans: []roachpb.Span{{Key: start.Key, EndKey: end.Key}}, } @@ -261,7 +259,7 @@ func EvalAddSSTable( // write by not allowing writing below existing keys, and we want to // retain parity with regular SST ingestion which does allow this. We // therefore record these operations ourselves. - if args.WriteAtRequestTimestamp { + if sstToReqTS.IsSet() { readWriter.LogLogicalOp(storage.MVCCWriteValueOpType, storage.MVCCLogicalOpDetails{ Key: k.Key, Timestamp: k.Timestamp, @@ -287,7 +285,7 @@ func EvalAddSSTable( Data: sst, CRC32: util.CRC32(sst), Span: roachpb.Span{Key: start.Key, EndKey: end.Key}, - AtWriteTimestamp: args.WriteAtRequestTimestamp, + AtWriteTimestamp: sstToReqTS.IsSet(), }, MVCCHistoryMutation: mvccHistoryMutation, }, @@ -325,9 +323,9 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M if len(value) == 0 { return errors.AssertionFailedf("SST contains tombstone for key %s", key) } - if !sstTimestamp.IsEmpty() && key.Timestamp != sstTimestamp { - return errors.AssertionFailedf("SST has incorrect timestamp %s for SST key %s (expected %s)", - key.Timestamp, key.Key, sstTimestamp) + if sstTimestamp.IsSet() && key.Timestamp != sstTimestamp { + return errors.AssertionFailedf("SST has unexpected timestamp %s (expected %s) for key %s", + key.Timestamp, sstTimestamp, key.Key) } iter.Next() } diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index ce05f70b1f63..7633119980ea 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -48,21 +48,20 @@ func TestEvalAddSSTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - const defaultReqTS = 10 // request sent with this timestamp by default - const intentTS = 100 // values with this timestamp are written as intents + const intentTS = 100 // values with this timestamp are written as intents - // These are run with IngestAsWrites both disabled and enabled. - // nolint:composites + // These are run with IngestAsWrites both disabled and enabled, and + // kv.bulk_io_write.sst_rewrite_concurrency.per_call of 0 and 8. testcases := map[string]struct { data []sstutil.KV sst []sstutil.KV - sstTimestamp int64 // SSTTimestamp set to given timestamp - atReqTS int64 // WriteAtRequestTimestamp with given timestamp + reqTS int64 + toReqTS int64 // SSTTimestampToRequestTimestamp with given SST timestamp noConflict bool // DisallowConflicts noShadow bool // DisallowShadowing noShadowBelow int64 // DisallowShadowingBelow expect []sstutil.KV - expectErr interface{} // error type, substring, or true (any error) + expectErr interface{} // error type, substring, substring slice, or true (any) expectErrRace interface{} expectStatsEst bool // expect MVCCStats.ContainsEstimates, don't check stats }{ @@ -89,13 +88,19 @@ func TestEvalAddSSTable(t *testing.T) { sst: []sstutil.KV{{"a", 1, "sst"}, {"c", 1, "sst"}}, expectErr: &roachpb.WriteIntentError{}, }, - "blind writes tombstones": { // unfortunately, for performance + "blind ignores intent outside span": { + data: []sstutil.KV{{"b", intentTS, "b0"}}, + sst: []sstutil.KV{{"c", 1, "sst"}, {"d", 1, "sst"}}, + expect: []sstutil.KV{{"b", intentTS, "b0"}, {"c", 1, "sst"}, {"d", 1, "sst"}}, + expectStatsEst: true, + }, + "blind writes tombstones unless race": { // unfortunately, for performance sst: []sstutil.KV{{"a", 1, ""}}, expect: []sstutil.KV{{"a", 1, ""}}, expectStatsEst: true, expectErrRace: `SST contains tombstone for key "a"/0.000000001,0`, }, - "blind writes SST inline values": { // unfortunately, for performance + "blind writes SST inline values unless race": { // unfortunately, for performance sst: []sstutil.KV{{"a", 0, "inline"}}, expect: []sstutil.KV{{"a", 0, "inline"}}, expectStatsEst: true, @@ -108,96 +113,123 @@ func TestEvalAddSSTable(t *testing.T) { expectStatsEst: true, }, - // WriteAtRequestTimestamp - "WriteAtRequestTimestamp sets timestamp": { - atReqTS: 10, - sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 3, "b3"}}, - expect: []sstutil.KV{{"a", 10, "a1"}, {"b", 10, "b3"}}, + // SSTTimestampToRequestTimestamp + "SSTTimestampToRequestTimestamp rewrites timestamp": { + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}}, + expect: []sstutil.KV{{"a", 10, "a1"}, {"b", 10, "b1"}}, expectStatsEst: true, }, - "WriteAtRequestTimestamp rejects tombstones": { - atReqTS: 10, - sst: []sstutil.KV{{"a", 1, ""}}, - expectErr: "SST values cannot be tombstones", - expectErrRace: `SST contains tombstone for key "a"/0.000000001,0`, + "SSTTimestampToRequestTimestamp writes tombstones unless race": { // unfortunately, for performance + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, ""}}, + expect: []sstutil.KV{{"a", 10, ""}}, + expectErrRace: `SST contains tombstone for key "a"/0.000000001,0`, + expectStatsEst: true, }, - "WriteAtRequestTimestamp rejects inline values": { - atReqTS: 10, - sst: []sstutil.KV{{"a", 0, "inline"}}, - expectErr: "inline values or intents are not supported", - expectErrRace: `SST contains inline value or intent for key "a"/0,0`, + "SSTTimestampToRequestTimestamp rejects incorrect SST timestamp": { + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}, {"c", 2, "c2"}}, + expectErr: []string{ + `unexpected timestamp 0.000000002,0 (expected 0.000000001,0) for key "c"`, + `key has suffix "\x00\x00\x00\x00\x00\x00\x00\x02\t", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`, + }, }, - "WriteAtRequestTimestamp writes below and replaces": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp rejects incorrect 0 SST timestamp": { + reqTS: 10, + toReqTS: 1, + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}, {"c", 0, "c0"}}, + expectErr: []string{ + `unexpected timestamp 0,0 (expected 0.000000001,0) for key "c"`, + `key has suffix "", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`, + }, + expectErrRace: `SST contains inline value or intent for key "c"/0,0`, + }, + "SSTTimestampToRequestTimestamp writes below and replaces": { + reqTS: 5, + toReqTS: 1, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expect: []sstutil.KV{{"a", 5, "sst"}, {"b", 7, "b7"}, {"b", 5, "sst"}}, expectStatsEst: true, }, - "WriteAtRequestTimestamp returns WriteIntentError for intents": { - atReqTS: 10, + "SSTTimestampToRequestTimestamp returns WriteIntentError for intents": { + reqTS: 10, + toReqTS: 1, data: []sstutil.KV{{"a", intentTS, "intent"}}, sst: []sstutil.KV{{"a", 1, "a@1"}}, expectErr: &roachpb.WriteIntentError{}, }, - "WriteAtRequestTimestamp errors with DisallowConflicts below existing": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp errors with DisallowConflicts below existing": { + reqTS: 5, + toReqTS: 10, noConflict: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 10, "sst"}, {"b", 10, "sst"}}, expectErr: &roachpb.WriteTooOldError{}, }, - "WriteAtRequestTimestamp succeeds with DisallowConflicts above existing": { - atReqTS: 8, + "SSTTimestampToRequestTimestamp succeeds with DisallowConflicts above existing": { + reqTS: 8, + toReqTS: 1, noConflict: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expect: []sstutil.KV{{"a", 8, "sst"}, {"a", 5, "a5"}, {"b", 8, "sst"}, {"b", 7, "b7"}}, }, - "WriteAtRequestTimestamp errors with DisallowShadowing below existing": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp errors with DisallowShadowing below existing": { + reqTS: 5, + toReqTS: 10, noShadow: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 10, "sst"}, {"b", 10, "sst"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp errors with DisallowShadowing above existing": { - atReqTS: 8, + "SSTTimestampToRequestTimestamp errors with DisallowShadowing above existing": { + reqTS: 8, + toReqTS: 1, noShadow: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 7, "b7"}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp succeeds with DisallowShadowing above tombstones": { - atReqTS: 8, + "SSTTimestampToRequestTimestamp succeeds with DisallowShadowing above tombstones": { + reqTS: 8, + toReqTS: 1, noShadow: true, data: []sstutil.KV{{"a", 5, ""}, {"b", 7, ""}}, sst: []sstutil.KV{{"a", 1, "sst"}, {"b", 1, "sst"}}, expect: []sstutil.KV{{"a", 8, "sst"}, {"a", 5, ""}, {"b", 8, "sst"}, {"b", 7, ""}}, }, - "WriteAtRequestTimestamp succeeds with DisallowShadowing and idempotent writes": { - atReqTS: 5, + "SSTTimestampToRequestTimestamp succeeds with DisallowShadowing and idempotent writes": { + reqTS: 5, + toReqTS: 1, noShadow: true, data: []sstutil.KV{{"a", 5, "a5"}, {"b", 5, "b5"}}, sst: []sstutil.KV{{"a", 1, "a5"}, {"b", 1, "b5"}}, expect: []sstutil.KV{{"a", 5, "a5"}, {"b", 5, "b5"}}, }, - "WriteAtRequestTimestamp errors with DisallowShadowingBelow equal value above existing below limit": { - atReqTS: 7, + "SSTTimestampToRequestTimestamp errors with DisallowShadowingBelow equal value above existing below limit": { + reqTS: 7, + toReqTS: 10, noShadowBelow: 5, data: []sstutil.KV{{"a", 3, "a3"}}, sst: []sstutil.KV{{"a", 10, "a3"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp errors with DisallowShadowingBelow errors above existing above limit": { - atReqTS: 7, + "SSTTimestampToRequestTimestamp errors with DisallowShadowingBelow errors above existing above limit": { + reqTS: 7, + toReqTS: 10, noShadowBelow: 5, data: []sstutil.KV{{"a", 6, "a6"}}, sst: []sstutil.KV{{"a", 10, "sst"}}, expectErr: `ingested key collides with an existing one: "a"`, }, - "WriteAtRequestTimestamp allows DisallowShadowingBelow equal value above existing above limit": { - atReqTS: 7, + "SSTTimestampToRequestTimestamp allows DisallowShadowingBelow equal value above existing above limit": { + reqTS: 7, + toReqTS: 10, noShadowBelow: 5, data: []sstutil.KV{{"a", 6, "a6"}}, sst: []sstutil.KV{{"a", 10, "a6"}}, @@ -612,158 +644,148 @@ func TestEvalAddSSTable(t *testing.T) { sst: []sstutil.KV{{"a", 7, "a8"}}, expectErr: &roachpb.WriteTooOldError{}, }, - - // SSTTimestamp - "SSTTimestamp works with WriteAtRequestTimestamp": { - atReqTS: 7, - data: []sstutil.KV{{"a", 6, "a6"}}, - sst: []sstutil.KV{{"a", 7, "a7"}}, - sstTimestamp: 7, - expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}}, - expectStatsEst: true, - }, - /* Disabled due to nondeterminism under metamorphic tests. SSTTimestamp will - * shortly be removed anyway. - "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": { - atReqTS: 8, - data: []sstutil.KV{{"a", 6, "a6"}}, - sst: []sstutil.KV{{"a", 7, "a7"}}, - sstTimestamp: 8, - expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}}, - expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`, - expectStatsEst: true, - },*/ } testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) { - for name, tc := range testcases { - t.Run(name, func(t *testing.T) { - st := cluster.MakeTestingClusterSettings() - ctx := context.Background() - - dir := t.TempDir() - engine, err := storage.Open(ctx, storage.Filesystem(filepath.Join(dir, "db")), storage.Settings(st)) - require.NoError(t, err) - defer engine.Close() - - // Write initial data. - intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS}, 0, 1) - b := engine.NewBatch() - for i := len(tc.data) - 1; i >= 0; i-- { // reverse, older timestamps first - kv := tc.data[i] - var txn *roachpb.Transaction - if kv.WallTimestamp == intentTS { - txn = &intentTxn + testutils.RunValues(t, "RewriteConcurrency", []interface{}{0, 8}, func(t *testing.T, c interface{}) { + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + batcheval.AddSSTableRewriteConcurrency.Override(ctx, &st.SV, int64(c.(int))) + + dir := t.TempDir() + engine, err := storage.Open(ctx, storage.Filesystem(filepath.Join(dir, "db")), storage.Settings(st)) + require.NoError(t, err) + defer engine.Close() + + // Write initial data. + intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS}, 0, 1) + b := engine.NewBatch() + for i := len(tc.data) - 1; i >= 0; i-- { // reverse, older timestamps first + kv := tc.data[i] + var txn *roachpb.Transaction + if kv.WallTimestamp == intentTS { + txn = &intentTxn + } + require.NoError(t, storage.MVCCPut(ctx, b, nil, kv.Key(), kv.Timestamp(), kv.Value(), txn)) } - require.NoError(t, storage.MVCCPut(ctx, b, nil, kv.Key(), kv.Timestamp(), kv.Value(), txn)) - } - require.NoError(t, b.Commit(false)) - stats := engineStats(t, engine) - - // Build and add SST. - sst, start, end := sstutil.MakeSST(t, st, tc.sst) - reqTS := hlc.Timestamp{WallTime: defaultReqTS} - if tc.atReqTS != 0 { - reqTS.WallTime = tc.atReqTS - } - resp := &roachpb.AddSSTableResponse{} - result, err := batcheval.EvalAddSSTable(ctx, engine, batcheval.CommandArgs{ - EvalCtx: (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext(), - Stats: stats, - Header: roachpb.Header{ - Timestamp: reqTS, - }, - Args: &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - DisallowConflicts: tc.noConflict, - DisallowShadowing: tc.noShadow, - DisallowShadowingBelow: hlc.Timestamp{WallTime: tc.noShadowBelow}, - WriteAtRequestTimestamp: tc.atReqTS != 0, - SSTTimestamp: hlc.Timestamp{WallTime: tc.sstTimestamp}, - IngestAsWrites: ingestAsWrites, - }, - }, resp) - - expectErr := tc.expectErr - if tc.expectErrRace != nil && util.RaceEnabled { - expectErr = tc.expectErrRace - } - if expectErr != nil { - require.Error(t, err) - if b, ok := expectErr.(bool); ok && b { - // any error is fine - } else if expectMsg, ok := expectErr.(string); ok { - require.Contains(t, err.Error(), expectMsg) - } else if e, ok := expectErr.(error); ok { - require.True(t, errors.HasType(err, e), "expected %T, got %v", e, err) - } else { - require.Fail(t, "invalid expectErr", "expectErr=%v", expectErr) + require.NoError(t, b.Commit(false)) + stats := engineStats(t, engine, 0) + + // Build and add SST. + if tc.toReqTS != 0 && tc.reqTS == 0 && tc.expectErr == nil { + t.Fatal("can't set toReqTS without reqTS") + } + sst, start, end := sstutil.MakeSST(t, st, tc.sst) + resp := &roachpb.AddSSTableResponse{} + result, err := batcheval.EvalAddSSTable(ctx, engine, batcheval.CommandArgs{ + EvalCtx: (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext(), + Stats: stats, + Header: roachpb.Header{ + Timestamp: hlc.Timestamp{WallTime: tc.reqTS}, + }, + Args: &roachpb.AddSSTableRequest{ + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + DisallowConflicts: tc.noConflict, + DisallowShadowing: tc.noShadow, + DisallowShadowingBelow: hlc.Timestamp{WallTime: tc.noShadowBelow}, + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: tc.toReqTS}, + IngestAsWrites: ingestAsWrites, + }, + }, resp) + + expectErr := tc.expectErr + if tc.expectErrRace != nil && util.RaceEnabled { + expectErr = tc.expectErrRace + } + if expectErr != nil { + require.Error(t, err) + if b, ok := expectErr.(bool); ok && b { + // any error is fine + } else if expectMsg, ok := expectErr.(string); ok { + require.Contains(t, err.Error(), expectMsg) + } else if expectMsgs, ok := expectErr.([]string); ok { + var found bool + for _, msg := range expectMsgs { + if strings.Contains(err.Error(), msg) { + found = true + break + } + } + if !found { + t.Fatalf("%q does not contain any of %q", err, expectMsgs) + } + } else if e, ok := expectErr.(error); ok { + require.True(t, errors.HasType(err, e), "expected %T, got %v", e, err) + } else { + require.Fail(t, "invalid expectErr", "expectErr=%v", expectErr) + } + return } - return - } - require.NoError(t, err) - - if ingestAsWrites { - require.Nil(t, result.Replicated.AddSSTable) - } else { - require.NotNil(t, result.Replicated.AddSSTable) - sstPath := filepath.Join(dir, "sst") - require.NoError(t, engine.WriteFile(sstPath, result.Replicated.AddSSTable.Data)) - require.NoError(t, engine.IngestExternalFiles(ctx, []string{sstPath})) - } - - // Scan resulting data from engine. - iter := storage.NewMVCCIncrementalIterator(engine, storage.MVCCIncrementalIterOptions{ - EndKey: keys.MaxKey, - StartTime: hlc.MinTimestamp, - EndTime: hlc.MaxTimestamp, - IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit, - InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit, - }) - defer iter.Close() - iter.SeekGE(storage.MVCCKey{Key: keys.SystemPrefix}) - scan := []sstutil.KV{} - for { - ok, err := iter.Valid() require.NoError(t, err) - if !ok { - break + + if ingestAsWrites { + require.Nil(t, result.Replicated.AddSSTable) + } else { + require.NotNil(t, result.Replicated.AddSSTable) + sstPath := filepath.Join(dir, "sst") + require.NoError(t, engine.WriteFile(sstPath, result.Replicated.AddSSTable.Data)) + require.NoError(t, engine.IngestExternalFiles(ctx, []string{sstPath})) } - key := string(iter.Key().Key) - ts := iter.Key().Timestamp.WallTime - var value []byte - if iter.Key().IsValue() { - if len(iter.Value()) > 0 { - value, err = roachpb.Value{RawBytes: iter.Value()}.GetBytes() + + // Scan resulting data from engine. + iter := storage.NewMVCCIncrementalIterator(engine, storage.MVCCIncrementalIterOptions{ + EndKey: keys.MaxKey, + StartTime: hlc.MinTimestamp, + EndTime: hlc.MaxTimestamp, + IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit, + InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit, + }) + defer iter.Close() + iter.SeekGE(storage.MVCCKey{Key: keys.SystemPrefix}) + scan := []sstutil.KV{} + for { + ok, err := iter.Valid() + require.NoError(t, err) + if !ok { + break + } + key := string(iter.Key().Key) + ts := iter.Key().Timestamp.WallTime + var value []byte + if iter.Key().IsValue() { + if len(iter.Value()) > 0 { + value, err = roachpb.Value{RawBytes: iter.Value()}.GetBytes() + require.NoError(t, err) + } + } else { + var meta enginepb.MVCCMetadata + require.NoError(t, protoutil.Unmarshal(iter.UnsafeValue(), &meta)) + if meta.RawBytes == nil { + // Skip intent metadata records (value emitted separately). + iter.Next() + continue + } + value, err = roachpb.Value{RawBytes: meta.RawBytes}.GetBytes() require.NoError(t, err) } + scan = append(scan, sstutil.KV{key, ts, string(value)}) + iter.Next() + } + require.Equal(t, tc.expect, scan) + + // Check that stats were updated correctly. + if tc.expectStatsEst { + require.True(t, stats.ContainsEstimates > 0, "expected stats to be estimated") } else { - var meta enginepb.MVCCMetadata - require.NoError(t, protoutil.Unmarshal(iter.UnsafeValue(), &meta)) - if meta.RawBytes == nil { - // Skip intent metadata records (value emitted separately). - iter.Next() - continue - } - value, err = roachpb.Value{RawBytes: meta.RawBytes}.GetBytes() - require.NoError(t, err) + require.False(t, stats.ContainsEstimates > 0, "found estimated stats") + require.Equal(t, stats, engineStats(t, engine, stats.LastUpdateNanos)) } - scan = append(scan, sstutil.KV{key, ts, string(value)}) - iter.Next() - } - require.Equal(t, tc.expect, scan) - - // Check that stats were updated correctly. - if tc.expectStatsEst { - require.True(t, stats.ContainsEstimates > 0, "expected stats to be estimated") - } else { - require.False(t, stats.ContainsEstimates > 0, "found estimated stats") - stats.LastUpdateNanos = 0 // avoid spurious diffs - require.Equal(t, stats, engineStats(t, engine)) - } - }) - } + }) + } + }) }) } @@ -777,8 +799,8 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { testcases := map[string]struct { sst []sstutil.KV - atReqTS bool // WriteAtRequestTimestamp - asWrites bool // IngestAsWrites + toReqTS int64 // SSTTimestampToRequestTimestamp + asWrites bool // IngestAsWrites expectHistoryMutation bool expectLogicalOps []enginepb.MVCCLogicalOp }{ @@ -787,9 +809,9 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { expectHistoryMutation: true, expectLogicalOps: nil, }, - "WriteAtRequestTimestamp alone": { + "SSTTimestampToRequestTimestamp alone": { sst: []sstutil.KV{{"a", 1, "a1"}}, - atReqTS: true, + toReqTS: 1, expectHistoryMutation: false, expectLogicalOps: nil, }, @@ -799,10 +821,10 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { expectHistoryMutation: true, expectLogicalOps: nil, }, - "IngestAsWrites and WriteAtRequestTimestamp": { - sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 2, "b2"}}, + "IngestAsWrites and SSTTimestampToRequestTimestamp": { + sst: []sstutil.KV{{"a", 1, "a1"}, {"b", 1, "b1"}}, asWrites: true, - atReqTS: true, + toReqTS: 1, expectHistoryMutation: false, expectLogicalOps: []enginepb.MVCCLogicalOp{ // NOTE: Value is populated by the rangefeed processor, not MVCC, so it @@ -831,11 +853,11 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { }, Stats: &enginepb.MVCCStats{}, Args: &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - WriteAtRequestTimestamp: tc.atReqTS, - IngestAsWrites: tc.asWrites, + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: tc.toReqTS}, + IngestAsWrites: tc.asWrites, }, }, &roachpb.AddSSTableResponse{}) require.NoError(t, err) @@ -845,7 +867,7 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { } else { require.NotNil(t, result.Replicated.AddSSTable) require.Equal(t, roachpb.Span{Key: start, EndKey: end}, result.Replicated.AddSSTable.Span) - require.Equal(t, tc.atReqTS, result.Replicated.AddSSTable.AtWriteTimestamp) + require.Equal(t, tc.toReqTS != 0, result.Replicated.AddSSTable.AtWriteTimestamp) } if tc.expectHistoryMutation { require.Equal(t, &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ @@ -1097,7 +1119,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { // After EvalAddSSTable, cArgs.Stats contains a diff to the existing // stats. Make sure recomputing from scratch gets the same answer as // applying the diff to the stats - statsBefore := engineStats(t, engine) + statsBefore := engineStats(t, engine, 0) ts := hlc.Timestamp{WallTime: 7} cArgs := batcheval.CommandArgs{ EvalCtx: evalCtx, @@ -1121,8 +1143,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { statsEvaled.Add(*cArgs.Stats) statsEvaled.Add(statsDelta) statsEvaled.ContainsEstimates = 0 - statsEvaled.LastUpdateNanos = 0 - require.Equal(t, engineStats(t, engine), statsEvaled) + require.Equal(t, engineStats(t, engine, statsEvaled.LastUpdateNanos), statsEvaled) // Check stats for a single KV. sst, start, end = sstutil.MakeSST(t, st, []sstutil.KV{{"zzzzzzz", ts.WallTime, "zzz"}}) @@ -1311,9 +1332,9 @@ func TestAddSSTableIntentResolution(t *testing.T) { require.Contains(t, err.Error(), "TransactionRetryWithProtoRefreshError: TransactionAbortedError") } -// TestAddSSTableWriteAtRequestTimestampRespectsTSCache checks that AddSSTable -// with WriteAtRequestTimestamp respects the timestamp cache. -func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { +// TestAddSSTableSSTTimestampToRequestTimestampRespectsTSCache checks that AddSSTable +// with SSTTimestampToRequestTimestamp respects the timestamp cache. +func TestAddSSTableSSTTimestampToRequestTimestampRespectsTSCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1332,10 +1353,10 @@ func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { // Add an SST writing below the previous write. sst, start, end := sstutil.MakeSST(t, s.ClusterSettings(), []sstutil.KV{{"key", 1, "sst"}}) sstReq := &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - WriteAtRequestTimestamp: true, + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: 1}, } ba := roachpb.BatchRequest{ Header: roachpb.Header{Timestamp: txnTS.Prev()}, @@ -1364,9 +1385,9 @@ func TestAddSSTableWriteAtRequestTimestampRespectsTSCache(t *testing.T) { require.Equal(t, "sst", string(kv.ValueBytes())) } -// TestAddSSTableWriteAtRequestTimestampRespectsClosedTS checks that AddSSTable -// with WriteAtRequestTimestamp respects the closed timestamp. -func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { +// TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS checks that AddSSTable +// with SSTTimestampToRequestTimestamp respects the closed timestamp. +func TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1392,10 +1413,10 @@ func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { reqTS := closedTS.Prev() sst, start, end := sstutil.MakeSST(t, store.ClusterSettings(), []sstutil.KV{{"key", 1, "sst"}}) sstReq := &roachpb.AddSSTableRequest{ - RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, - Data: sst, - MVCCStats: sstutil.ComputeStats(t, sst), - WriteAtRequestTimestamp: true, + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + SSTTimestampToRequestTimestamp: hlc.Timestamp{WallTime: 1}, } ba := roachpb.BatchRequest{ Header: roachpb.Header{Timestamp: reqTS}, @@ -1418,7 +1439,7 @@ func TestAddSSTableWriteAtRequestTimestampRespectsClosedTS(t *testing.T) { } // engineStats computes the MVCC stats for the given engine. -func engineStats(t *testing.T, engine storage.Engine) *enginepb.MVCCStats { +func engineStats(t *testing.T, engine storage.Engine, nowNanos int64) *enginepb.MVCCStats { t.Helper() iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ @@ -1428,7 +1449,7 @@ func engineStats(t *testing.T, engine storage.Engine) *enginepb.MVCCStats { defer iter.Close() // We don't care about nowNanos, because the SST can't contain intents or // tombstones and all existing intents will be resolved. - stats, err := storage.ComputeStatsForRange(iter, keys.LocalMax, keys.MaxKey, 0) + stats, err := storage.ComputeStatsForRange(iter, keys.LocalMax, keys.MaxKey, nowNanos) require.NoError(t, err) return &stats } diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index 241a00982512..2e19c02270b9 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -166,8 +166,8 @@ message ReplicatedEvalResult { uint32 crc32 = 2 [(gogoproto.customname) = "CRC32"]; roachpb.Span span = 3 [(gogoproto.nullable) = false]; // If true, all SST MVCC timestamps equal the WriteTimestamp. This is given - // by the WriteAtRequestTimestamp request parameter, which was introduced in - // 22.1 and only used with the MVCCAddSSTable cluster version. + // by the SSTTimestampToRequestTimestamp request parameter, which was + // introduced in 22.1 and only used with the MVCCAddSSTable cluster version. // // TODO(erikgrinaker): This field currently controls whether to emit an // AddSSTable event across the rangefeed. We could equivalently check diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e84bdc4020f3..25f18ed294c3 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -638,8 +638,8 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked( // handleSSTableRaftMuLocked emits an ingested SSTable from AddSSTable via the // rangefeed. These can be expected to have timestamps at the write timestamp -// (i.e. submitted with WriteAtRequestTimestamp) since we assert elsewhere that -// MVCCHistoryMutation commands disconnect rangefeeds. +// (i.e. submitted with SSTTimestampToRequestTimestamp) since we assert +// elsewhere that MVCCHistoryMutation commands disconnect rangefeeds. // // NB: We currently don't have memory budgeting for rangefeeds, instead using a // large buffered channel, so this can easily OOM the node. This is "fine" for diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index f4d0b4c8aa47..67d6c649e334 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1335,7 +1335,7 @@ func (*AdminScatterRequest) flags() flag { return isAdmin | isR func (*AdminVerifyProtectedTimestampRequest) flags() flag { return isAdmin | isRange | isAlone } func (r *AddSSTableRequest) flags() flag { flags := isWrite | isRange | isAlone | isUnsplittable | canBackpressure - if r.WriteAtRequestTimestamp { + if r.SSTTimestampToRequestTimestamp.IsSet() { flags |= appliesTSCache } return flags diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 2ce1436c216e..378d9c33b0c6 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -1600,9 +1600,8 @@ message AdminVerifyProtectedTimestampResponse { // AddSSTableRequest contains arguments to the AddSSTable method, which links an // SST file into the Pebble log-structured merge-tree. The SST must only contain // committed versioned values with non-zero MVCC timestamps (no intents or -// inline values) and no tombstones, but this is only fully enforced when -// WriteAtRequestTimestamp is enabled, for performance. It cannot be used in a -// transaction, cannot be split across ranges, and must be alone in a batch. +// inline values) and no tombstones. It cannot be used in a transaction, cannot +// be split across ranges, and must be alone in a batch. // // By default, AddSSTable will blindly write the SST contents into Pebble, with // fixed MVCC timestamps unaffected by pushes. This can violate many CRDB @@ -1615,8 +1614,8 @@ message AdminVerifyProtectedTimestampResponse { // The following parameters can be used to make AddSSTable enforce these // guarantees, at a performance cost: // -// * WriteAtRequestTimestamp: ensures compliance with the timestamp cache and -// closed timestamp, by rewriting SST timestamps to the request timestamp. +// * SSTTimestampToRequestTimestamp: ensures compliance with the timestamp cache +// and closed timestamp, by rewriting SST timestamps to the request timestamp. // Also emits the SST via the range feed. // // * DisallowConflicts, DisallowShadowing, or DisallowShadowingBelow: ensures @@ -1643,11 +1642,13 @@ message AddSSTableRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; bytes data = 2; - // WriteAtRequestTimestamp updates all MVCC timestamps in the SST to the - // request timestamp, even if the request gets pushed. This ensures the writes - // comply with the timestamp cache and closed timestamp. It also causes the - // AddSSTable to be emitted via the range feed, since it respects the closed - // timestamp. + // SSTTimestampToRequestTimestamp gives the timestamp used for all MVCC keys + // in the provided SST. If this timestamp differs from the request timestamp + // (e.g. if the request gets pushed) then all MVCC keys in the SST will be + // rewritten to the request timestamp during request evaluation. This ensures + // the writes comply with the timestamp cache and closed timestamp. It also + // causes the AddSSTable to be emitted via the range feed, since it respects + // the closed timestamp. // // Callers should always set this, except in very special circumstances when // the timestamp cache and closed timestamp can safely be ignored (e.g. @@ -1660,17 +1661,8 @@ message AddSSTableRequest { // serializability. // // Added in 22.1, so check the MVCCAddSSTable version gate before using. - bool write_at_request_timestamp = 6; - - // SSTTimestamp is a promise from the client that all MVCC timestamps in the - // SST equal the provided timestamp, and that there are no inline values, - // intents, or tombstones. When used together with WriteAtRequestTimestamp, - // this can avoid an SST rewrite (and the associated overhead) if the SST - // timestamp equals the request timestamp (i.e. if it was provided by the - // client and the request was not pushed due to e.g. the closed timestamp or - // contention). - util.hlc.Timestamp sst_timestamp = 9 - [(gogoproto.customname) = "SSTTimestamp", (gogoproto.nullable) = false]; + util.hlc.Timestamp sst_timestamp_to_request_timestamp = 6 [ + (gogoproto.customname) = "SSTTimestampToRequestTimestamp", (gogoproto.nullable) = false]; // DisallowConflicts will check for MVCC conflicts with existing keys, i.e. // scan for existing keys with a timestamp at or above the SST key and @@ -1680,7 +1672,7 @@ message AddSSTableRequest { // Note that this alone is not sufficient to guarantee serializability or // single-key linearizability, since it can write to a timestamp that another // reader has already observed, changing the value at that timestamp and above - // it. Use WriteAtRequestTimestamp in addition to guarantee serializability. + // it. Use with SSTTimestampToRequestTimestamp to guarantee serializability. // // Added in 22.1, so check the MVCCAddSSTable version gate before using. // @@ -2626,10 +2618,10 @@ message RangeFeedError { // RangeFeedSSTable is a variant of RangeFeedEvent that represents an AddSSTable // operation, containing the entire ingested SST. It is only emitted for -// SSTables written with WriteAtRequestTimestamp enabled, so it is guaranteed to -// comply with the closed timestamp. The Span and WriteTS fields are advisory, -// and contain the client-provided SST key span (may be wider than the SST data) -// and the MVCC timestamp used for all contained entries. +// SSTables written with SSTTimestampToRequestTimestamp enabled, so it is +// guaranteed to comply with the closed timestamp. The Span and WriteTS fields +// are advisory, and contain the client-provided SST key span (may be wider than +// the SST data) and the MVCC timestamp used for all contained entries. // // The entire SST is emitted even for registrations that have a narrower span, // it is up to the caller to prune the SST as appropriate. Catchup scans emit diff --git a/pkg/roachpb/api_test.go b/pkg/roachpb/api_test.go index 45d0ce748b8f..5b45ea34d975 100644 --- a/pkg/roachpb/api_test.go +++ b/pkg/roachpb/api_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/redact" gogoproto "github.com/gogo/protobuf/proto" @@ -315,7 +316,7 @@ func TestTenantConsumptionAddSub(t *testing.T) { func TestFlagCombinations(t *testing.T) { // Any non-zero-valued request variants that conditionally affect flags. reqVariants := []Request{ - &AddSSTableRequest{WriteAtRequestTimestamp: true}, + &AddSSTableRequest{SSTTimestampToRequestTimestamp: hlc.Timestamp{Logical: 1}}, &DeleteRangeRequest{Inline: true}, &GetRequest{KeyLocking: lock.Exclusive}, &ReverseScanRequest{KeyLocking: lock.Exclusive}, diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go index 187b701729d0..cc87935da39b 100644 --- a/pkg/storage/sst.go +++ b/pkg/storage/sst.go @@ -183,8 +183,8 @@ func CheckSSTConflicts( // If the existing key has a timestamp at or above the SST key, return a // WriteTooOldError. Normally this could cause a transactional request to be // automatically retried after a read refresh, which we would only want to - // do if AddSSTable had WriteAtRequestTimestamp set, but AddSSTable cannot - // be used in transactions so we don't need to check. + // do if AddSSTable had SSTTimestampToRequestTimestamp set, but AddSSTable + // cannot be used in transactions so we don't need to check. if sstKey.Timestamp.LessEq(extKey.Timestamp) { return enginepb.MVCCStats{}, roachpb.NewWriteTooOldError( sstKey.Timestamp, extKey.Timestamp.Next(), sstKey.Key) @@ -221,17 +221,23 @@ func CheckSSTConflicts( return statsDiff, nil } -// UpdateSSTTimestamps replaces all MVCC timestamp in the provided SST with the -// given timestamp. All keys must have a non-zero timestamp, otherwise an error -// is returned to protect against accidental inclusion of intents or inline -// values. Tombstones are also rejected opportunistically, since we're iterating -// over the entire SST anyway. +// UpdateSSTTimestamps replaces all MVCC timestamp in the provided SST to the +// given timestamp. All keys must already have the given "from" timestamp. func UpdateSSTTimestamps( ctx context.Context, st *cluster.Settings, sst []byte, from, to hlc.Timestamp, concurrency int, ) ([]byte, error) { + if from.IsEmpty() { + return nil, errors.Errorf("from timestamp not given") + } + if to.IsEmpty() { + return nil, errors.Errorf("to timestamp not given") + } + sstOut := &MemFile{} sstOut.Buffer.Grow(len(sst)) - if concurrency > 0 && !from.IsEmpty() { + + // Fancy optimized Pebble SST rewriter. + if concurrency > 0 { defaults := DefaultPebbleOptions() opts := defaults.MakeReaderOptions() if fp := defaults.Levels[0].FilterPolicy; fp != nil && len(opts.Filters) == 0 { @@ -241,13 +247,16 @@ func UpdateSSTTimestamps( opts, sstOut, MakeIngestionWriterOptions(ctx, st), - encodeMVCCTimestampSuffix(from), encodeMVCCTimestampSuffix(to), + encodeMVCCTimestampSuffix(from), + encodeMVCCTimestampSuffix(to), concurrency, ); err != nil { return nil, err } return sstOut.Bytes(), nil } + + // Naïve read/write loop. writer := MakeIngestionSSTWriter(ctx, st, sstOut) defer writer.Close() @@ -263,11 +272,10 @@ func UpdateSSTTimestamps( } else if !ok { break } - if iter.UnsafeKey().Timestamp.IsEmpty() { - return nil, errors.New("inline values or intents are not supported") - } - if len(iter.UnsafeValue()) == 0 { - return nil, errors.New("SST values cannot be tombstones") + key := iter.UnsafeKey() + if key.Timestamp != from { + return nil, errors.Errorf("unexpected timestamp %s (expected %s) for key %s", + key.Timestamp, from, key.Key) } err = writer.PutMVCC(MVCCKey{Key: iter.UnsafeKey().Key, Timestamp: to}, iter.UnsafeValue()) if err != nil { diff --git a/pkg/storage/sst_test.go b/pkg/storage/sst_test.go index ece6fd874edf..3cf0dedd1bab 100644 --- a/pkg/storage/sst_test.go +++ b/pkg/storage/sst_test.go @@ -106,11 +106,12 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { modeCounter // uint64 counter in first 8 bytes modeRandom // random values - sstSize = 0 - keyCount = 500000 - valueSize = 8 - valueMode = modeRandom - profile = false // cpuprofile.pprof + concurrency = 0 // 0 uses naïve replacement + sstSize = 0 + keyCount = 500000 + valueSize = 8 + valueMode = modeRandom + profile = false // cpuprofile.pprof ) if sstSize > 0 && keyCount > 0 { @@ -129,7 +130,7 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { key := make([]byte, 8) value := make([]byte, valueSize) - ts := hlc.Timestamp{WallTime: 1} + sstTimestamp := hlc.Timestamp{WallTime: 1} var i uint64 for i = 0; (keyCount > 0 && i < keyCount) || (sstSize > 0 && sstFile.Len() < sstSize); i++ { binary.BigEndian.PutUint64(key, i) @@ -138,11 +139,8 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { case modeZero: case modeCounter: binary.BigEndian.PutUint64(value, i) - ts.WallTime++ case modeRandom: r.Read(value) - ts.WallTime = r.Int63() - ts.Logical = r.Int31() default: b.Fatalf("unknown value mode %d", valueMode) } @@ -151,7 +149,7 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { v.SetBytes(value) v.InitChecksum(key) - require.NoError(b, writer.PutMVCC(MVCCKey{Key: key, Timestamp: ts}, v.RawBytes)) + require.NoError(b, writer.PutMVCC(MVCCKey{Key: key, Timestamp: sstTimestamp}, v.RawBytes)) } writer.Close() b.Logf("%vMB %v keys", sstFile.Len()/1e6, i) @@ -165,10 +163,12 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { defer pprof.StopCPUProfile() } + requestTimestamp := hlc.Timestamp{WallTime: 1634899098417970999, Logical: 9} + b.StartTimer() for i := 0; i < b.N; i++ { - ts := hlc.Timestamp{WallTime: 1634899098417970999, Logical: 9} - _, err := UpdateSSTTimestamps(ctx, st, sstFile.Bytes(), hlc.Timestamp{}, ts, 0) + _, err := UpdateSSTTimestamps( + ctx, st, sstFile.Bytes(), sstTimestamp, requestTimestamp, concurrency) require.NoError(b, err) } }