diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 4f9f1a246cf1..8f5f4ee91738 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -173,11 +173,12 @@ func EvalAddSSTable( // request timestamp. This ensures the writes comply with the timestamp cache // and closed timestamp, i.e. by not writing to timestamps that have already // been observed or closed. + var sstReqStatsDelta enginepb.MVCCStats if sstToReqTS.IsSet() && (h.Timestamp != sstToReqTS || forceRewrite) { st := cArgs.EvalCtx.ClusterSettings() // TODO(dt): use a quotapool. conc := int(AddSSTableRewriteConcurrency.Get(&cArgs.EvalCtx.ClusterSettings().SV)) - sst, err = storage.UpdateSSTTimestamps(ctx, st, sst, sstToReqTS, h.Timestamp, conc) + sst, sstReqStatsDelta, err = storage.UpdateSSTTimestamps(ctx, st, sst, sstToReqTS, h.Timestamp, conc, args.MVCCStats) if err != nil { return result.Result{}, errors.Wrap(err, "updating SST timestamps") } @@ -219,6 +220,7 @@ func EvalAddSSTable( args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey()) statsDelta, err = storage.CheckSSTConflicts(ctx, sst, readWriter, start, end, leftPeekBound, rightPeekBound, args.DisallowShadowing, args.DisallowShadowingBelow, maxIntents, usePrefixSeek) + statsDelta.Add(sstReqStatsDelta) if err != nil { return result.Result{}, errors.Wrap(err, "checking for key collisions") } diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index fab204654c68..cd5782d5fee0 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -52,18 +52,19 @@ func TestEvalAddSSTable(t *testing.T) { // These are run with IngestAsWrites both disabled and enabled, and // kv.bulk_io_write.sst_rewrite_concurrency.per_call of 0 and 8. testcases := map[string]struct { - data kvs - sst kvs - reqTS int64 - toReqTS int64 // SSTTimestampToRequestTimestamp with given SST timestamp - noConflict bool // DisallowConflicts - noShadow bool // DisallowShadowing - noShadowBelow int64 // DisallowShadowingBelow - requireReqTS bool // AddSSTableRequireAtRequestTimestamp - expect kvs - expectErr interface{} // error type, substring, substring slice, or true (any) - expectErrRace interface{} - expectStatsEst bool // expect MVCCStats.ContainsEstimates, don't check stats + data kvs + sst kvs + reqTS int64 + toReqTS int64 // SSTTimestampToRequestTimestamp with given SST timestamp + noConflict bool // DisallowConflicts + noShadow bool // DisallowShadowing + noShadowBelow int64 // DisallowShadowingBelow + requireReqTS bool // AddSSTableRequireAtRequestTimestamp + expect kvs + expectErr interface{} // error type, substring, substring slice, or true (any) + expectErrRace interface{} + expectStatsEst bool // expect MVCCStats.ContainsEstimates, don't check stats + expectGCBytesEst bool // expect MVCCStats.GCBytesAge to be inaccurate. }{ // Blind writes. "blind writes below existing": { @@ -140,13 +141,13 @@ func TestEvalAddSSTable(t *testing.T) { sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")}, expect: kvs{pointKVWithLocalTS("a", 2, 1, "a2")}, expectStatsEst: true, - expectErrRace: `SST contains non-empty MVCC value header for key "a"/0.000000002,0`, + expectErrRace: `SST contains non-empty MVCC value header for key "a"/2.000000000,0`, }, "blind rejects local timestamp on range key under race only": { // unfortunately, for performance sst: kvs{rangeKVWithLocalTS("a", "d", 2, 1, "")}, expect: kvs{rangeKVWithLocalTS("a", "d", 2, 1, "")}, expectStatsEst: true, - expectErrRace: `SST contains non-empty MVCC value header for range key {a-d}/0.000000002,0`, + expectErrRace: `SST contains non-empty MVCC value header for range key {a-d}/2.000000000,0`, }, // SSTTimestampToRequestTimestamp @@ -177,8 +178,8 @@ func TestEvalAddSSTable(t *testing.T) { toReqTS: 1, sst: kvs{pointKV("a", 1, "a1"), pointKV("b", 1, "b1"), pointKV("c", 2, "c2")}, expectErr: []string{ - `unexpected timestamp 0.000000002,0 (expected 0.000000001,0) for key "c"`, - `key has suffix "\x00\x00\x00\x00\x00\x00\x00\x02\t", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`, + `unexpected timestamp 2.000000000,0 (expected 1.000000000,0) for key "c"`, + `key has suffix "\x00\x00\x00\x00w5\x94\x00\t", expected "\x00\x00\x00\x00;\x9a\xca\x00\t"`, }, }, "SSTTimestampToRequestTimestamp rejects incorrect SST timestamp for range keys": { @@ -186,8 +187,8 @@ func TestEvalAddSSTable(t *testing.T) { toReqTS: 1, sst: kvs{pointKV("a", 1, "a1"), rangeKV("c", "d", 2, "")}, expectErr: []string{ - `unexpected timestamp 0.000000002,0 (expected 0.000000001,0) for range key {c-d}`, - `key has suffix "\x00\x00\x00\x00\x00\x00\x00\x02\t", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`, + `unexpected timestamp 2.000000000,0 (expected 1.000000000,0) for range key {c-d}`, + `key has suffix "\x00\x00\x00\x00w5\x94\x00\t", expected "\x00\x00\x00\x00;\x9a\xca\x00\t"`, }, }, "SSTTimestampToRequestTimestamp rejects incorrect 0 SST timestamp": { @@ -195,8 +196,8 @@ func TestEvalAddSSTable(t *testing.T) { toReqTS: 1, sst: kvs{pointKV("a", 1, "a1"), pointKV("b", 1, "b1"), pointKV("c", 0, "c0")}, expectErr: []string{ - `unexpected timestamp 0,0 (expected 0.000000001,0) for key "c"`, - `key has suffix "", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`, + `unexpected timestamp 0,0 (expected 1.000000000,0) for key "c"`, + `key has suffix "", expected "\x00\x00\x00\x00;\x9a\xca\x00\t"`, }, expectErrRace: `SST contains inline value or intent for key "c"/0,0`, }, @@ -293,7 +294,7 @@ func TestEvalAddSSTable(t *testing.T) { sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")}, expect: kvs{pointKVWithLocalTS("a", 10, 1, "a2")}, expectStatsEst: true, - expectErrRace: `SST contains non-empty MVCC value header for key "a"/0.000000002,0`, + expectErrRace: `SST contains non-empty MVCC value header for key "a"/2.000000000,0`, }, // DisallowConflicts @@ -769,40 +770,46 @@ func TestEvalAddSSTable(t *testing.T) { expectErr: &roachpb.WriteTooOldError{}, }, "DisallowConflicts allows sst range keys above engine range keys": { - noConflict: true, - data: kvs{pointKV("a", 6, "d"), rangeKV("a", "b", 5, "")}, - sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "b", 8, "")}, - expect: kvs{rangeKV("a", "b", 8, ""), rangeKV("a", "b", 5, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d")}, + noConflict: true, + data: kvs{pointKV("a", 6, "d"), rangeKV("a", "b", 5, "")}, + sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "b", 8, "")}, + expect: kvs{rangeKV("a", "b", 8, ""), rangeKV("a", "b", 5, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d")}, + expectGCBytesEst: true, }, "DisallowConflicts allows fragmented sst range keys above engine range keys": { - noConflict: true, - data: kvs{pointKV("a", 6, "d"), rangeKV("a", "b", 5, "")}, - sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "b", 8, ""), rangeKV("c", "d", 8, "")}, - expect: kvs{rangeKV("a", "b", 8, ""), rangeKV("a", "b", 5, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("c", "d", 8, "")}, + noConflict: true, + data: kvs{pointKV("a", 6, "d"), rangeKV("a", "b", 5, "")}, + sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "b", 8, ""), rangeKV("c", "d", 8, "")}, + expect: kvs{rangeKV("a", "b", 8, ""), rangeKV("a", "b", 5, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("c", "d", 8, "")}, + expectGCBytesEst: true, }, "DisallowConflicts allows fragmented straddling sst range keys": { - noConflict: true, - data: kvs{pointKV("a", 6, "d"), rangeKV("b", "d", 5, "")}, - sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "c", 8, ""), rangeKV("c", "d", 7, "")}, - expect: kvs{rangeKV("a", "b", 8, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 5, ""), rangeKV("c", "d", 7, ""), rangeKV("c", "d", 5, "")}, + noConflict: true, + data: kvs{pointKV("a", 6, "d"), rangeKV("b", "d", 5, "")}, + sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "c", 8, ""), rangeKV("c", "d", 7, "")}, + expect: kvs{rangeKV("a", "b", 8, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 5, ""), rangeKV("c", "d", 7, ""), rangeKV("c", "d", 5, "")}, + expectGCBytesEst: true, }, "DisallowConflicts allows fragmented straddling sst range keys with no points": { - noConflict: true, - data: kvs{rangeKV("b", "d", 5, "")}, - sst: kvs{rangeKV("a", "c", 8, ""), rangeKV("c", "d", 7, "")}, - expect: kvs{rangeKV("a", "b", 8, ""), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 5, ""), rangeKV("c", "d", 7, ""), rangeKV("c", "d", 5, "")}, + noConflict: true, + data: kvs{rangeKV("b", "d", 5, "")}, + sst: kvs{rangeKV("a", "c", 8, ""), rangeKV("c", "d", 7, "")}, + expect: kvs{rangeKV("a", "b", 8, ""), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 5, ""), rangeKV("c", "d", 7, ""), rangeKV("c", "d", 5, "")}, + expectGCBytesEst: true, }, "DisallowConflicts allows engine range keys contained within sst range keys": { - noConflict: true, - data: kvs{pointKV("a", 6, "d"), rangeKV("b", "d", 5, "")}, - sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "e", 8, "")}, - expect: kvs{rangeKV("a", "b", 8, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "d", 8, ""), rangeKV("b", "d", 5, ""), rangeKV("d", "e", 8, "")}, + noConflict: true, + data: kvs{pointKV("a", 6, "d"), rangeKV("b", "d", 5, "")}, + sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "e", 8, "")}, + expect: kvs{rangeKV("a", "b", 8, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "d", 8, ""), rangeKV("b", "d", 5, ""), rangeKV("d", "e", 8, "")}, + expectGCBytesEst: true, }, "DisallowConflicts does not skip over engine range keys covering no sst points": { - noConflict: true, - data: kvs{pointKV("a", 6, "d"), rangeKV("b", "c", 6, ""), rangeKV("c", "d", 5, "")}, - sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "e", 8, "")}, - expect: kvs{rangeKV("a", "b", 8, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 6, ""), rangeKV("c", "d", 8, ""), rangeKV("c", "d", 5, ""), rangeKV("d", "e", 8, "")}, + noConflict: true, + data: kvs{pointKV("a", 6, "d"), rangeKV("b", "c", 6, ""), rangeKV("c", "d", 5, "")}, + sst: kvs{pointKV("a", 7, "a8"), rangeKV("a", "e", 8, "")}, + expect: kvs{rangeKV("a", "b", 8, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 6, ""), rangeKV("c", "d", 8, ""), rangeKV("c", "d", 5, ""), rangeKV("d", "e", 8, "")}, + expectGCBytesEst: true, }, "DisallowConflicts does not allow conflict with engine range key covering no sst points": { noConflict: true, @@ -811,22 +818,25 @@ func TestEvalAddSSTable(t *testing.T) { expectErr: &roachpb.WriteTooOldError{}, }, "DisallowConflicts allows sst range keys contained within engine range keys": { - noConflict: true, - data: kvs{pointKV("a", 6, "d"), rangeKV("a", "e", 5, "")}, - sst: kvs{pointKV("a", 7, "a8"), rangeKV("b", "d", 8, "")}, - expect: kvs{rangeKV("a", "b", 5, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "d", 8, ""), rangeKV("b", "d", 5, ""), rangeKV("d", "e", 5, "")}, + noConflict: true, + data: kvs{pointKV("a", 6, "d"), rangeKV("a", "e", 5, "")}, + sst: kvs{pointKV("a", 7, "a8"), rangeKV("b", "d", 8, "")}, + expect: kvs{rangeKV("a", "b", 5, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "d", 8, ""), rangeKV("b", "d", 5, ""), rangeKV("d", "e", 5, "")}, + expectGCBytesEst: true, }, "DisallowConflicts allows sst range key fragmenting engine range keys": { - noConflict: true, - data: kvs{pointKV("a", 6, "d"), rangeKV("a", "c", 5, ""), rangeKV("c", "e", 6, "")}, - sst: kvs{pointKV("a", 7, "a8"), rangeKV("b", "d", 8, "")}, - expect: kvs{rangeKV("a", "b", 5, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 5, ""), rangeKV("c", "d", 8, ""), rangeKV("c", "d", 6, ""), rangeKV("d", "e", 6, "")}, + noConflict: true, + data: kvs{pointKV("a", 6, "d"), rangeKV("a", "c", 5, ""), rangeKV("c", "e", 6, "")}, + sst: kvs{pointKV("a", 7, "a8"), rangeKV("b", "d", 8, "")}, + expect: kvs{rangeKV("a", "b", 5, ""), pointKV("a", 7, "a8"), pointKV("a", 6, "d"), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 5, ""), rangeKV("c", "d", 8, ""), rangeKV("c", "d", 6, ""), rangeKV("d", "e", 6, "")}, + expectGCBytesEst: true, }, "DisallowConflicts calculates stats correctly for merged range keys": { - noConflict: true, - data: kvs{rangeKV("a", "c", 8, ""), pointKV("a", 6, "d"), rangeKV("d", "e", 8, "")}, - sst: kvs{pointKV("a", 10, "de"), rangeKV("c", "d", 8, ""), pointKV("f", 10, "de")}, - expect: kvs{rangeKV("a", "e", 8, ""), pointKV("a", 10, "de"), pointKV("a", 6, "d"), pointKV("f", 10, "de")}, + noConflict: true, + data: kvs{rangeKV("a", "c", 8, ""), pointKV("a", 6, "d"), rangeKV("d", "e", 8, "")}, + sst: kvs{pointKV("a", 10, "de"), rangeKV("c", "d", 8, ""), pointKV("f", 10, "de")}, + expect: kvs{rangeKV("a", "e", 8, ""), pointKV("a", 10, "de"), pointKV("a", 6, "d"), pointKV("f", 10, "de")}, + expectGCBytesEst: true, }, "DisallowConflicts calculates stats correctly for merged range keys 2": { noConflict: true, @@ -847,22 +857,25 @@ func TestEvalAddSSTable(t *testing.T) { expectErr: "ingested range key collides with an existing one", }, "DisallowShadowing allows shadowing of keys deleted by engine range tombstones": { - noShadow: true, - data: kvs{rangeKV("a", "b", 7, ""), pointKV("a", 6, "d")}, - sst: kvs{pointKV("a", 8, "a8")}, - expect: kvs{rangeKV("a", "b", 7, ""), pointKV("a", 8, "a8"), pointKV("a", 6, "d")}, + noShadow: true, + data: kvs{rangeKV("a", "b", 7, ""), pointKV("a", 6, "d")}, + sst: kvs{pointKV("a", 8, "a8")}, + expect: kvs{rangeKV("a", "b", 7, ""), pointKV("a", 8, "a8"), pointKV("a", 6, "d")}, + expectGCBytesEst: true, }, "DisallowShadowing allows idempotent range tombstones": { - noShadow: true, - data: kvs{rangeKV("a", "b", 7, "")}, - sst: kvs{rangeKV("a", "b", 7, "")}, - expect: kvs{rangeKV("a", "b", 7, "")}, + noShadow: true, + data: kvs{rangeKV("a", "b", 7, "")}, + sst: kvs{rangeKV("a", "b", 7, "")}, + expect: kvs{rangeKV("a", "b", 7, "")}, + expectGCBytesEst: true, }, "DisallowShadowing calculates stats correctly for merged range keys with idempotence": { - noShadow: true, - data: kvs{rangeKV("b", "d", 8, ""), rangeKV("e", "f", 8, "")}, - sst: kvs{rangeKV("a", "c", 8, ""), rangeKV("d", "e", 8, "")}, - expect: kvs{rangeKV("a", "f", 8, "")}, + noShadow: true, + data: kvs{rangeKV("b", "d", 8, ""), rangeKV("e", "f", 8, "")}, + sst: kvs{rangeKV("a", "c", 8, ""), rangeKV("d", "e", 8, "")}, + expect: kvs{rangeKV("a", "f", 8, "")}, + expectGCBytesEst: true, }, "DisallowShadowingBelow disallows sst range keys shadowing live keys": { noShadowBelow: 3, @@ -871,16 +884,18 @@ func TestEvalAddSSTable(t *testing.T) { expectErr: "ingested range key collides with an existing one", }, "DisallowShadowingBelow allows shadowing of keys deleted by engine range tombstones": { - noShadowBelow: 3, - data: kvs{rangeKV("a", "b", 7, ""), pointKV("a", 6, "d")}, - sst: kvs{pointKV("a", 8, "a8")}, - expect: kvs{rangeKV("a", "b", 7, ""), pointKV("a", 8, "a8"), pointKV("a", 6, "d")}, + noShadowBelow: 3, + data: kvs{rangeKV("a", "b", 7, ""), pointKV("a", 6, "d")}, + sst: kvs{pointKV("a", 8, "a8")}, + expect: kvs{rangeKV("a", "b", 7, ""), pointKV("a", 8, "a8"), pointKV("a", 6, "d")}, + expectGCBytesEst: true, }, "DisallowShadowingBelow allows idempotent range tombstones": { - noShadowBelow: 3, - data: kvs{rangeKV("a", "b", 7, "")}, - sst: kvs{rangeKV("a", "b", 7, "")}, - expect: kvs{rangeKV("a", "b", 7, "")}, + noShadowBelow: 3, + data: kvs{rangeKV("a", "b", 7, "")}, + sst: kvs{rangeKV("a", "b", 7, "")}, + expect: kvs{rangeKV("a", "b", 7, "")}, + expectGCBytesEst: true, }, "DisallowConflict with allowed shadowing disallows idempotent range tombstones": { noConflict: true, @@ -904,7 +919,7 @@ func TestEvalAddSSTable(t *testing.T) { defer engine.Close() // Write initial data. - intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS}, 0, 1) + intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS * 1e9}, 0, 1) b := engine.NewBatch() for i := len(tc.data) - 1; i >= 0; i-- { // reverse, older timestamps first switch kv := tc.data[i].(type) { @@ -913,6 +928,7 @@ func TestEvalAddSSTable(t *testing.T) { if kv.Key.Timestamp.WallTime == intentTS { txn = &intentTxn } + kv.Key.Timestamp.WallTime *= 1e9 v, err := storage.DecodeMVCCValue(kv.Value) require.NoError(t, err) require.NoError(t, storage.MVCCPut(ctx, b, nil, kv.Key.Key, kv.Key.Timestamp, hlc.ClockTimestamp{}, v.Value, txn)) @@ -920,6 +936,8 @@ func TestEvalAddSSTable(t *testing.T) { v, err := storage.DecodeMVCCValue(kv.Value) require.NoError(t, err) require.True(t, v.IsTombstone(), "MVCC range keys must be tombstones") + kv.RangeKey.Timestamp.WallTime *= 1e9 + v.MVCCValueHeader.LocalTimestamp.WallTime *= 1e9 require.NoError(t, storage.MVCCDeleteRangeUsingTombstone( ctx, b, nil, kv.RangeKey.StartKey, kv.RangeKey.EndKey, kv.RangeKey.Timestamp, v.MVCCValueHeader.LocalTimestamp, nil, nil, false, 0, nil)) default: @@ -928,12 +946,35 @@ func TestEvalAddSSTable(t *testing.T) { } require.NoError(t, b.Commit(false)) stats := storageutils.EngineStats(t, engine, 0) + // All timestamps are experienced in increments of 1e9 nanoseconds, + // as 1e9 nanoseconds = 1 second. This is to accurately test for + // GCBytesAge in stats, which is only calculated in full-second + // increments. + tc.toReqTS *= 1e9 + tc.reqTS *= 1e9 + tc.noShadowBelow *= 1e9 // Build and add SST. if tc.toReqTS != 0 && tc.reqTS == 0 && tc.expectErr == nil { t.Fatal("can't set toReqTS without reqTS") } - sst, start, end := storageutils.MakeSST(t, st, tc.sst) + var sstKvs []interface{} + for i := range tc.sst { + switch kv := tc.sst[i].(type) { + case storage.MVCCKeyValue: + kv.Key.Timestamp.WallTime *= 1e9 + sstKvs = append(sstKvs, kv) + case storage.MVCCRangeKeyValue: + v, err := storage.DecodeMVCCValue(kv.Value) + require.NoError(t, err) + v.LocalTimestamp.WallTime *= 1e9 + kv.RangeKey.Timestamp.WallTime *= 1e9 + vBytes, err := storage.EncodeMVCCValue(v) + require.NoError(t, err) + sstKvs = append(sstKvs, storage.MVCCRangeKeyValue{RangeKey: kv.RangeKey, Value: vBytes}) + } + } + sst, start, end := storageutils.MakeSST(t, st, sstKvs) resp := &roachpb.AddSSTableResponse{} var mvccStats *enginepb.MVCCStats // In the no-overlap case i.e. approxDiskBytes == 0, force a regular @@ -1001,15 +1042,36 @@ func TestEvalAddSSTable(t *testing.T) { require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"})) } + var expect kvs + for i := range tc.expect { + switch kv := tc.expect[i].(type) { + case storage.MVCCKeyValue: + kv.Key.Timestamp.WallTime *= 1e9 + expect = append(expect, kv) + case storage.MVCCRangeKeyValue: + v, err := storage.DecodeMVCCValue(kv.Value) + require.NoError(t, err) + v.LocalTimestamp.WallTime *= 1e9 + kv.RangeKey.Timestamp.WallTime *= 1e9 + vBytes, err := storage.EncodeMVCCValue(v) + require.NoError(t, err) + expect = append(expect, storage.MVCCRangeKeyValue{RangeKey: kv.RangeKey, Value: vBytes}) + } + } + // Scan resulting data from engine. - require.Equal(t, tc.expect, storageutils.ScanEngine(t, engine)) + require.Equal(t, expect, storageutils.ScanEngine(t, engine)) // Check that stats were updated correctly. if tc.expectStatsEst { require.NotZero(t, stats.ContainsEstimates, "expected stats to be estimated") } else { require.Zero(t, stats.ContainsEstimates, "found estimated stats") - require.Equal(t, storageutils.EngineStats(t, engine, stats.LastUpdateNanos), stats) + expected := storageutils.EngineStats(t, engine, stats.LastUpdateNanos) + if tc.expectGCBytesEst && expected != nil { + expected.GCBytesAge = stats.GCBytesAge + } + require.Equal(t, expected, stats) } }) } @@ -1507,6 +1569,9 @@ func TestAddSSTableMVCCStatsDisallowShadowing(t *testing.T) { require.NoError(t, err) // Check that there has been no double counting of stats. All keys in second SST are shadowing. + if cArgs.Stats != nil { + cArgs.Stats.AgeTo(firstSSTStats.LastUpdateNanos) + } require.Equal(t, firstSSTStats, *cArgs.Stats) // Evaluate the third SST. Some of the KVs are perfectly shadowing, but there @@ -1542,6 +1607,9 @@ func TestAddSSTableMVCCStatsDisallowShadowing(t *testing.T) { // Check that there has been no double counting of stats. firstSSTStats.Add(delta) + if cArgs.Stats != nil { + cArgs.Stats.AgeTo(firstSSTStats.LastUpdateNanos) + } require.Equal(t, firstSSTStats, *cArgs.Stats) } diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go index 52e2ef99a5de..a3eb7c0fda3d 100644 --- a/pkg/storage/sst.go +++ b/pkg/storage/sst.go @@ -208,6 +208,9 @@ func CheckSSTConflicts( metaValSize := int64(0) totalBytes := metaKeySize + metaValSize + // Cancel the GCBytesAge contribution of the point tombstone (if any) + // that exists in the SST stats. + statsDiff.AgeTo(extKey.Timestamp.WallTime) // Update the skipped stats to account for the skipped meta key. if !sstValue.IsTombstone() { statsDiff.LiveBytes -= totalBytes @@ -255,6 +258,11 @@ func CheckSSTConflicts( // If we are shadowing an existing key, we must update the stats accordingly // to take into account the existing KV pair. + if extValue.IsTombstone() { + statsDiff.AgeTo(extKey.Timestamp.WallTime) + } else { + statsDiff.AgeTo(sstKey.Timestamp.WallTime) + } statsDiff.KeyCount-- statsDiff.KeyBytes -= int64(len(extKey.Key) + 1) if !extValue.IsTombstone() { @@ -855,18 +863,47 @@ func CheckSSTConflicts( // UpdateSSTTimestamps replaces all MVCC timestamp in the provided SST to the // given timestamp. All keys must already have the given "from" timestamp. func UpdateSSTTimestamps( - ctx context.Context, st *cluster.Settings, sst []byte, from, to hlc.Timestamp, concurrency int, -) ([]byte, error) { + ctx context.Context, + st *cluster.Settings, + sst []byte, + from, to hlc.Timestamp, + concurrency int, + stats *enginepb.MVCCStats, +) ([]byte, enginepb.MVCCStats, error) { if from.IsEmpty() { - return nil, errors.Errorf("from timestamp not given") + return nil, enginepb.MVCCStats{}, errors.Errorf("from timestamp not given") } if to.IsEmpty() { - return nil, errors.Errorf("to timestamp not given") + return nil, enginepb.MVCCStats{}, errors.Errorf("to timestamp not given") } sstOut := &MemFile{} sstOut.Buffer.Grow(len(sst)) + var statsDelta enginepb.MVCCStats + if stats != nil { + // There could be a GCBytesAge delta between the old and new timestamps. + // Calculate this delta by subtracting all the relevant stats at the + // old timestamp, and then aging the stats to the new timestamp before + // zeroing the stats again. + statsDelta.AgeTo(from.WallTime) + statsDelta.KeyBytes -= stats.KeyBytes + statsDelta.ValBytes -= stats.ValBytes + statsDelta.RangeKeyBytes -= stats.RangeKeyBytes + statsDelta.RangeValBytes -= stats.RangeValBytes + statsDelta.LiveBytes -= stats.LiveBytes + statsDelta.IntentBytes -= stats.IntentBytes + statsDelta.IntentCount -= stats.IntentCount + statsDelta.AgeTo(to.WallTime) + statsDelta.KeyBytes += stats.KeyBytes + statsDelta.ValBytes += stats.ValBytes + statsDelta.RangeKeyBytes += stats.RangeKeyBytes + statsDelta.RangeValBytes += stats.RangeValBytes + statsDelta.LiveBytes += stats.LiveBytes + statsDelta.IntentBytes += stats.IntentBytes + statsDelta.IntentCount += stats.IntentCount + } + // Fancy optimized Pebble SST rewriter. if concurrency > 0 { defaults := DefaultPebbleOptions() @@ -882,9 +919,9 @@ func UpdateSSTTimestamps( EncodeMVCCTimestampSuffix(to), concurrency, ); err != nil { - return nil, err + return nil, enginepb.MVCCStats{}, err } - return sstOut.Bytes(), nil + return sstOut.Bytes(), statsDelta, nil } // Naïve read/write loop. @@ -898,24 +935,24 @@ func UpdateSSTTimestamps( UpperBound: keys.MaxKey, }) if err != nil { - return nil, err + return nil, enginepb.MVCCStats{}, err } defer iter.Close() for iter.SeekGE(MVCCKey{Key: keys.MinKey}); ; iter.Next() { if ok, err := iter.Valid(); err != nil { - return nil, err + return nil, enginepb.MVCCStats{}, err } else if !ok { break } key := iter.UnsafeKey() if key.Timestamp != from { - return nil, errors.Errorf("unexpected timestamp %s (expected %s) for key %s", + return nil, enginepb.MVCCStats{}, errors.Errorf("unexpected timestamp %s (expected %s) for key %s", key.Timestamp, from, key.Key) } err = writer.PutRawMVCC(MVCCKey{Key: key.Key, Timestamp: to}, iter.UnsafeValue()) if err != nil { - return nil, err + return nil, enginepb.MVCCStats{}, err } } @@ -926,32 +963,32 @@ func UpdateSSTTimestamps( UpperBound: keys.MaxKey, }) if err != nil { - return nil, err + return nil, enginepb.MVCCStats{}, err } defer iter.Close() for iter.SeekGE(MVCCKey{Key: keys.MinKey}); ; iter.Next() { if ok, err := iter.Valid(); err != nil { - return nil, err + return nil, enginepb.MVCCStats{}, err } else if !ok { break } rangeKeys := iter.RangeKeys() for _, v := range rangeKeys.Versions { if v.Timestamp != from { - return nil, errors.Errorf("unexpected timestamp %s (expected %s) for range key %s", + return nil, enginepb.MVCCStats{}, errors.Errorf("unexpected timestamp %s (expected %s) for range key %s", v.Timestamp, from, rangeKeys.Bounds) } v.Timestamp = to if err = writer.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), v.Value); err != nil { - return nil, err + return nil, enginepb.MVCCStats{}, err } } } if err = writer.Finish(); err != nil { - return nil, err + return nil, enginepb.MVCCStats{}, err } - return sstOut.Bytes(), nil + return sstOut.Bytes(), statsDelta, nil } diff --git a/pkg/storage/sst_test.go b/pkg/storage/sst_test.go index 8be1dc0cfa32..b328e62066e0 100644 --- a/pkg/storage/sst_test.go +++ b/pkg/storage/sst_test.go @@ -174,8 +174,8 @@ func BenchmarkUpdateSSTTimestamps(b *testing.B) { b.StartTimer() for i := 0; i < b.N; i++ { - _, err := UpdateSSTTimestamps( - ctx, st, sstFile.Bytes(), sstTimestamp, requestTimestamp, concurrency) + _, _, err := UpdateSSTTimestamps( + ctx, st, sstFile.Bytes(), sstTimestamp, requestTimestamp, concurrency, nil /* stats */) require.NoError(b, err) } }