From a8a10a45831ee96565945edb6c7323893a5f8398 Mon Sep 17 00:00:00 2001
From: Pavel Kalinnikov
Date: Wed, 28 Sep 2022 12:48:49 +0100
Subject: [PATCH 1/4] kvserver: remove legacy snapshot and diff code

This commit removes the proto and code for RaftSnapshotData. This type was
used for reporting replica snapshots and computing diffs, but this
functionality has now been removed in favor of storage checkpoints and
offline tooling.

Release note: None
---
 pkg/kv/kvserver/BUILD.bazel                 |   2 -
 pkg/kv/kvserver/consistency_queue_test.go   |  28 +--
 pkg/kv/kvserver/helpers_test.go             |  13 ++
 pkg/kv/kvserver/replica_consistency.go      |  57 +----
 pkg/kv/kvserver/replica_consistency_diff.go | 208 ------------------
 pkg/kv/kvserver/replica_consistency_test.go |  19 +-
 pkg/kv/kvserver/replica_test.go             | 109 +--------
 pkg/kv/kvserver/store.go                    |   4 +-
 pkg/kv/kvserver/store_test.go               |   9 +-
 .../testdata/replica_consistency_sha512     |   7 -
 pkg/roachpb/internal_raft.proto             |  24 --
 11 files changed, 32 insertions(+), 448 deletions(-)
 delete mode 100644 pkg/kv/kvserver/replica_consistency_diff.go

diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index b3c2c5099899..ce4530c376e4 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -39,7 +39,6 @@ go_library(
         "replica_closedts.go",
         "replica_command.go",
         "replica_consistency.go",
-        "replica_consistency_diff.go",
         "replica_corruption.go",
         "replica_destroy.go",
         "replica_eval_context.go",
@@ -168,7 +167,6 @@ go_library(
         "//pkg/util",
         "//pkg/util/admission",
         "//pkg/util/admission/admissionpb",
-        "//pkg/util/bufalloc",
         "//pkg/util/buildutil",
         "//pkg/util/circuit",
         "//pkg/util/contextutil",
diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go
index 095ab2fcc0d6..7ade78a3b623 100644
--- a/pkg/kv/kvserver/consistency_queue_test.go
+++ b/pkg/kv/kvserver/consistency_queue_test.go
@@ -347,9 +347,8 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	assert.Contains(t, resp.Result[0].Detail, `[minority]`)
 	assert.Contains(t, resp.Result[0].Detail, `stats`)
 
-	// Checkpoints should have been created on all stores. Load replica snapshots
-	// from them.
-	snaps := make([]roachpb.RaftSnapshotData, numStores)
+	// Checkpoints should have been created on all stores.
+	hashes := make([][]byte, numStores)
 	for i := 0; i < numStores; i++ {
 		cps := onDiskCheckpointPaths(i)
 		require.Len(t, cps, 1)
 		}))
 		require.NotNil(t, desc)
 
-		// Load the content of the problematic range.
-		snap, err := kvserver.LoadRaftSnapshotDataForTesting(context.Background(), *desc, cpEng)
+		// Compute a checksum over the content of the problematic range.
+		hash, err := kvserver.ChecksumRange(context.Background(), *desc, cpEng)
 		require.NoError(t, err)
-		snaps[i] = snap
-	}
-
-	assert.Empty(t, kvserver.DiffRange(&snaps[0], &snaps[2])) // s1 and s3 agree
-	diff := kvserver.DiffRange(&snaps[0], &snaps[1])
-	diff[0].Timestamp = hlc.Timestamp{Logical: 987, WallTime: 123} // for determinism
-	wantDiff := `--- leaseholder
-+++ follower
-+0.000000123,987 "e"
-+ ts:1970-01-01 00:00:00.000000123 +0000 UTC
-+ value:"\x00\x00\x00\x00\x01T"
-+ raw mvcc_key/value: 6500000000000000007b000003db0d 000000000154
-`
-	assert.Equal(t, wantDiff, diff.String())
+		hashes[i] = hash
+	}
+
+	assert.Equal(t, hashes[0], hashes[2]) // s1 and s3 agree
+	assert.NotEqual(t, hashes[0], hashes[1]) // s2 diverged
 
 	// A death rattle should have been written on s2 (store index 1).
 	eng := store1.Engine()
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index 1d4032700165..9984c51c7871 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -550,3 +550,16 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
 		}
 	}
 }
+
+// ChecksumRange returns a checksum over the KV data of the given range.
+func ChecksumRange(
+	ctx context.Context, desc roachpb.RangeDescriptor, snap storage.Reader,
+) ([]byte, error) {
+	var r *Replica // TODO(pavelkalinnikov): make this less ugly.
+	lim := quotapool.NewRateLimiter("test", 1<<30, 1<<30)
+	res, err := r.sha512(ctx, desc, snap, roachpb.ChecksumMode_CHECK_FULL, lim)
+	if err != nil {
+		return nil, err
+	}
+	return res.SHA512[:], nil
+}
diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go
index 76793390cff3..9a527e8e30ee 100644
--- a/pkg/kv/kvserver/replica_consistency.go
+++ b/pkg/kv/kvserver/replica_consistency.go
@@ -19,7 +19,6 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
-	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
@@ -30,7 +29,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/fs"
-	"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -482,34 +480,18 @@ type replicaHash struct {
 	PersistedMS, RecomputedMS enginepb.MVCCStats
 }
 
-// LoadRaftSnapshotDataForTesting returns all the KV data of the given range.
-// Only for testing.
-func LoadRaftSnapshotDataForTesting(
-	ctx context.Context, rd roachpb.RangeDescriptor, store storage.Reader,
-) (roachpb.RaftSnapshotData, error) {
-	var r *Replica
-	var snap roachpb.RaftSnapshotData
-	lim := quotapool.NewRateLimiter("test", 1<<20, 1<<20)
-	if _, err := r.sha512(ctx, rd, store, &snap, roachpb.ChecksumMode_CHECK_FULL, lim); err != nil {
-		return roachpb.RaftSnapshotData{}, err
-	}
-	return snap, nil
-}
-
 // sha512 computes the SHA512 hash of all the replica data at the snapshot.
 // It will dump all the kv data into snapshot if it is provided.
 func (*Replica) sha512(
 	ctx context.Context,
 	desc roachpb.RangeDescriptor,
 	snap storage.Reader,
-	snapshot *roachpb.RaftSnapshotData,
 	mode roachpb.ChecksumMode,
 	limiter *quotapool.RateLimiter,
 ) (*replicaHash, error) {
 	statsOnly := mode == roachpb.ChecksumMode_CHECK_STATS
 
 	// Iterate over all the data in the range.
-	var alloc bufalloc.ByteAllocator
 	var intBuf [8]byte
 	var legacyTimestamp hlc.LegacyTimestamp
 	var timestampBuf []byte
 		if err := limiter.WaitN(ctx, int64(len(unsafeKey.Key)+len(unsafeValue))); err != nil {
 			return err
 		}
-
-		if snapshot != nil {
-			// Add (a copy of) the kv pair into the debug message.
-			kv := roachpb.RaftSnapshotData_KeyValue{
-				Timestamp: unsafeKey.Timestamp,
-			}
-			alloc, kv.Key = alloc.Copy(unsafeKey.Key, 0)
-			alloc, kv.Value = alloc.Copy(unsafeValue, 0)
-			snapshot.KV = append(snapshot.KV, kv)
-		}
-
 		// Encode the length of the key and value.
 		binary.LittleEndian.PutUint64(intBuf[:], uint64(len(unsafeKey.Key)))
 		if _, err := hasher.Write(intBuf[:]); err != nil {
@@ -566,18 +537,6 @@ func (*Replica) sha512(
 			if err != nil {
 				return err
 			}
-
-			if snapshot != nil {
-				// Add (a copy of) the range key into the debug message.
-				rkv := roachpb.RaftSnapshotData_RangeKeyValue{
-					Timestamp: rangeKV.RangeKey.Timestamp,
-				}
-				alloc, rkv.StartKey = alloc.Copy(rangeKV.RangeKey.StartKey, 0)
-				alloc, rkv.EndKey = alloc.Copy(rangeKV.RangeKey.EndKey, 0)
-				alloc, rkv.Value = alloc.Copy(rangeKV.Value, 0)
-				snapshot.RangeKV = append(snapshot.RangeKV, rkv)
-			}
-
 			// Encode the length of the start key and end key.
 			binary.LittleEndian.PutUint64(intBuf[:], uint64(len(rangeKV.RangeKey.StartKey)))
 			if _, err := hasher.Write(intBuf[:]); err != nil {
@@ -639,19 +598,6 @@ func (*Replica) sha512(
 	if err != nil {
 		return nil, err
 	}
-	if snapshot != nil {
-		// Add LeaseAppliedState to the snapshot.
-		kv := roachpb.RaftSnapshotData_KeyValue{
-			Timestamp: hlc.Timestamp{},
-		}
-		kv.Key = keys.RangeAppliedStateKey(desc.RangeID)
-		var v roachpb.Value
-		if err := v.SetProto(rangeAppliedState); err != nil {
-			return nil, err
-		}
-		kv.Value = v.RawBytes
-		snapshot.KV = append(snapshot.KV, kv)
-	}
 	if _, err := hasher.Write(b); err != nil {
 		return nil, err
 	}
@@ -725,6 +671,7 @@ func (r *Replica) computeChecksumPostApply(
 			defer taskCancel()
 			defer snap.Close()
 			defer cleanup()
+
 			// Wait until the CollectChecksum request handler joins in and learns about
 			// the starting computation, and then start it.
 			if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckSyncTimeout,
@@ -743,7 +690,7 @@ func (r *Replica) computeChecksumPostApply(
 			); err != nil {
 				log.Errorf(ctx, "checksum collection did not join: %v", err)
 			} else {
-				result, err := r.sha512(ctx, desc, snap, nil /* snapshot */, cc.Mode, r.store.consistencyLimiter)
+				result, err := r.sha512(ctx, desc, snap, cc.Mode, r.store.consistencyLimiter)
 				if err != nil {
 					log.Errorf(ctx, "checksum computation failed: %v", err)
 					result = nil
diff --git a/pkg/kv/kvserver/replica_consistency_diff.go b/pkg/kv/kvserver/replica_consistency_diff.go
deleted file mode 100644
index cce3c96802bf..000000000000
--- a/pkg/kv/kvserver/replica_consistency_diff.go
+++ /dev/null
@@ -1,208 +0,0 @@
-// Copyright 2014 The Cockroach Authors.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.txt.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0, included in the file
-// licenses/APL.txt.
-
-package kvserver
-
-import (
-	"bytes"
-	fmt "fmt"
-
-	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/storage"
-	"github.com/cockroachdb/cockroach/pkg/util/hlc"
-	"github.com/cockroachdb/redact"
-)
-
-// ReplicaSnapshotDiff is a part of a []ReplicaSnapshotDiff which represents a diff between
-// two replica snapshots. For now it's only a diff between their KV pairs.
-type ReplicaSnapshotDiff struct {
-	// LeaseHolder is set to true of this kv pair is only present on the lease
-	// holder.
-	LeaseHolder bool
-	Key         roachpb.Key
-	EndKey      roachpb.Key // only set for range keys (MVCCRangeKey)
-	Timestamp   hlc.Timestamp
-	Value       []byte
-}
-
-// ReplicaSnapshotDiffSlice groups multiple ReplicaSnapshotDiff records and
-// exposes a formatting helper.
-type ReplicaSnapshotDiffSlice []ReplicaSnapshotDiff
-
-// SafeFormat implements redact.SafeFormatter.
-func (rsds ReplicaSnapshotDiffSlice) SafeFormat(buf redact.SafePrinter, _ rune) {
-	buf.Printf("--- leaseholder\n+++ follower\n")
-	for _, d := range rsds {
-		prefix := redact.SafeString("+")
-		if d.LeaseHolder {
-			// Lease holder (RHS) has something follower (LHS) does not have.
-			prefix = redact.SafeString("-")
-		}
-		if len(d.EndKey) > 0 {
-			const rangeKVFormat = `%s%s %s
-%s ts:%s
-%s value:%s
-%s raw from/to/ts/value: %x %x %x %x
-`
-			buf.Printf(rangeKVFormat,
-				prefix, d.Timestamp, roachpb.Span{Key: d.Key, EndKey: d.EndKey},
-				prefix, d.Timestamp.GoTime(),
-				prefix, fmt.Sprintf("%q", d.Value), // just print the raw value for now
-				prefix, storage.EncodeMVCCKeyPrefix(d.Key), storage.EncodeMVCCKeyPrefix(d.EndKey),
-				storage.EncodeMVCCTimestampSuffix(d.Timestamp), d.Value)
-
-		} else {
-			const kvFormat = `%s%s %s
-%s ts:%s
-%s value:%s
-%s raw mvcc_key/value: %x %x
-`
-			mvccKey := storage.MVCCKey{Key: d.Key, Timestamp: d.Timestamp}
-			buf.Printf(kvFormat,
-				prefix, d.Timestamp, d.Key,
-				prefix, d.Timestamp.GoTime(),
-				prefix, SprintMVCCKeyValue(
-					storage.MVCCKeyValue{Key: mvccKey, Value: d.Value}, false /* printKey */),
-				prefix, storage.EncodeMVCCKey(mvccKey), d.Value)
-		}
-	}
-}
-
-func (rsds ReplicaSnapshotDiffSlice) String() string {
-	return redact.StringWithoutMarkers(rsds)
-}
-
-// DiffRange diffs the two KV dumps between the leaseholder and the replica.
-func DiffRange(l, r *roachpb.RaftSnapshotData) ReplicaSnapshotDiffSlice {
-	if l == nil || r == nil {
-		return nil
-	}
-	var diff ReplicaSnapshotDiffSlice
-	diff = append(diff, diffKVs(l.KV, r.KV)...)
-	diff = append(diff, diffRangeKeys(l.RangeKV, r.RangeKV)...)
-	return diff
-}
-
-func diffKVs(l, r []roachpb.RaftSnapshotData_KeyValue) ReplicaSnapshotDiffSlice {
-	var diff []ReplicaSnapshotDiff
-	i, j := 0, 0
-	for {
-		var e, v roachpb.RaftSnapshotData_KeyValue
-		if i < len(l) {
-			e = l[i]
-		}
-		if j < len(r) {
-			v = r[j]
-		}
-
-		addLeaseHolder := func() {
-			diff = append(diff, ReplicaSnapshotDiff{LeaseHolder: true, Key: e.Key, Timestamp: e.Timestamp, Value: e.Value})
-			i++
-		}
-		addReplica := func() {
-			diff = append(diff, ReplicaSnapshotDiff{LeaseHolder: false, Key: v.Key, Timestamp: v.Timestamp, Value: v.Value})
-			j++
-		}
-
-		// Compare keys.
-		var comp int
-		if e.Key == nil && v.Key == nil {
-			// Done!
-			break
-		} else if e.Key == nil {
-			// Finished traversing over all the lease holder keys.
-			comp = 1
-		} else if v.Key == nil {
-			// Finished traversing over all the replica keys.
-			comp = -1
-		} else {
-			// Both lease holder and replica keys exist. Compare them.
-			comp = storage.MVCCKey{Key: e.Key, Timestamp: e.Timestamp}.Compare(
-				storage.MVCCKey{Key: v.Key, Timestamp: v.Timestamp})
-		}
-
-		switch comp {
-		case -1:
-			addLeaseHolder()
-		case 1:
-			addReplica()
-		case 0:
-			if !bytes.Equal(e.Value, v.Value) {
-				addLeaseHolder()
-				addReplica()
-			} else {
-				// No diff; skip.
-				i++
-				j++
-			}
-		}
-	}
-	return diff
-}
-
-func diffRangeKeys(l, r []roachpb.RaftSnapshotData_RangeKeyValue) ReplicaSnapshotDiffSlice {
-	var diff []ReplicaSnapshotDiff
-	i, j := 0, 0
-	for {
-		var e, v roachpb.RaftSnapshotData_RangeKeyValue
-		if i < len(l) {
-			e = l[i]
-		}
-		if j < len(r) {
-			v = r[j]
-		}
-
-		addLeaseHolder := func() {
-			diff = append(diff, ReplicaSnapshotDiff{LeaseHolder: true, Key: e.StartKey, EndKey: e.EndKey,
-				Timestamp: e.Timestamp, Value: e.Value})
-			i++
-		}
-		addReplica := func() {
-			diff = append(diff, ReplicaSnapshotDiff{LeaseHolder: false, Key: v.StartKey, EndKey: v.EndKey,
-				Timestamp: v.Timestamp, Value: v.Value})
-			j++
-		}
-
-		// Compare keys.
-		var comp int
-		if e.StartKey == nil && e.EndKey == nil && v.StartKey == nil && v.EndKey == nil {
-			// Done!
-			break
-		} else if e.StartKey == nil && e.EndKey == nil {
-			// Finished traversing over all the lease holder keys.
-			comp = 1
-		} else if v.StartKey == nil && v.EndKey == nil {
-			// Finished traversing over all the replica keys.
-			comp = -1
-		} else {
-			// Both lease holder and replica keys exist. Compare them.
-			eMVCC := storage.MVCCRangeKey{StartKey: e.StartKey, EndKey: e.EndKey, Timestamp: e.Timestamp}
-			vMVCC := storage.MVCCRangeKey{StartKey: v.StartKey, EndKey: v.EndKey, Timestamp: v.Timestamp}
-			comp = eMVCC.Compare(vMVCC)
-		}
-
-		switch comp {
-		case -1:
-			addLeaseHolder()
-		case 1:
-			addReplica()
-		case 0:
-			if !bytes.Equal(e.Value, v.Value) {
-				addLeaseHolder()
-				addReplica()
-			} else {
-				// No diff; skip.
-				i++
-				j++
-			}
-		}
-	}
-	return diff
-}
diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go
index 599b60d9fe78..dbee6d69a60e 100644
--- a/pkg/kv/kvserver/replica_consistency_test.go
+++ b/pkg/kv/kvserver/replica_consistency_test.go
@@ -176,7 +176,7 @@ func TestReplicaChecksumSHA512(t *testing.T) {
 	}
 
 	// Hash the empty state.
-	rh, err := repl.sha512(ctx, desc, eng, nil, roachpb.ChecksumMode_CHECK_FULL, lim)
+	rh, err := repl.sha512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
 	require.NoError(t, err)
 	fmt.Fprintf(sb, "checksum0: %x\n", rh.SHA512[:])
 
@@ -214,14 +214,13 @@ func TestReplicaChecksumSHA512(t *testing.T) {
 		require.NoError(t, storage.MVCCPut(ctx, eng, nil, key, ts, localTS, value, nil))
 		}
 
-		rh, err = repl.sha512(ctx, desc, eng, nil, roachpb.ChecksumMode_CHECK_FULL, lim)
+		rh, err = repl.sha512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
 		require.NoError(t, err)
 		fmt.Fprintf(sb, "checksum%d: %x\n", i+1, rh.SHA512[:])
 	}
 
-	// Run another check to obtain a snapshot and stats for the final state.
-	kvSnapshot := roachpb.RaftSnapshotData{}
-	rh, err = repl.sha512(ctx, desc, eng, &kvSnapshot, roachpb.ChecksumMode_CHECK_FULL, lim)
+	// Run another check to obtain stats for the final state.
+	rh, err = repl.sha512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
 	require.NoError(t, err)
 
 	jsonpb := protoutil.JSONPb{Indent: "  "}
@@ -229,15 +228,5 @@ func TestReplicaChecksumSHA512(t *testing.T) {
 	require.NoError(t, err)
 	fmt.Fprintf(sb, "stats: %s\n", string(json))
 
-	fmt.Fprint(sb, "snapshot:\n")
-	for _, kv := range kvSnapshot.KV {
-		fmt.Fprintf(sb, " %s=%q\n", storage.MVCCKey{Key: kv.Key, Timestamp: kv.Timestamp}, kv.Value)
-	}
-	for _, rkv := range kvSnapshot.RangeKV {
-		fmt.Fprintf(sb, " %s=%q\n",
-			storage.MVCCRangeKey{StartKey: rkv.StartKey, EndKey: rkv.EndKey, Timestamp: rkv.Timestamp},
-			rkv.Value)
-	}
-
 	echotest.Require(t, sb.String(), testutils.TestDataPath(t, "replica_consistency_sha512"))
 }
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 2d24789d73ab..6be47e1ba076 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -53,7 +53,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
@@ -7551,112 +7550,6 @@ func TestNewReplicaCorruptionError(t *testing.T) {
 	}
 }
 
-func TestDiffRange(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	// TODO(tschottdorf): this test should really pass the data through a
-	// RocksDB engine to verify that the original snapshots sort correctly.
-
-	require.Empty(t, DiffRange(nil, nil))
-
-	timestamp := hlc.Timestamp{WallTime: 1729, Logical: 1}
-	value := []byte("foo")
-
-	// Construct the two snapshots.
-	leaderSnapshot := &roachpb.RaftSnapshotData{
-		KV: []roachpb.RaftSnapshotData_KeyValue{
-			{Key: []byte("a"), Timestamp: timestamp, Value: value},
-			{Key: []byte("abc"), Timestamp: timestamp, Value: value},
-			{Key: []byte("abcd"), Timestamp: timestamp, Value: value},
-			{Key: []byte("abcde"), Timestamp: timestamp, Value: value},
-			// Timestamps sort in descending order, with the notable exception
-			// of the zero timestamp, which sorts first.
-			{Key: []byte("abcdefg"), Timestamp: hlc.Timestamp{}, Value: value},
-			{Key: []byte("abcdefg"), Timestamp: timestamp, Value: value},
-			{Key: []byte("abcdefg"), Timestamp: timestamp.Add(0, -1), Value: value},
-			{Key: []byte("abcdefgh"), Timestamp: timestamp, Value: value},
-			{Key: []byte("x"), Timestamp: timestamp, Value: value},
-			{Key: []byte("y"), Timestamp: timestamp, Value: value},
-			// Both 'zeroleft' and 'zeroright' share the version at (1,1), but
-			// a zero timestamp (=meta) key pair exists on the leader or
-			// follower, respectively.
-			{Key: []byte("zeroleft"), Timestamp: hlc.Timestamp{}, Value: value},
-			{Key: []byte("zeroleft"), Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, Value: value},
-			{Key: []byte("zeroright"), Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, Value: value},
-		},
-		// Fragmented range keys.
-		RangeKV: []roachpb.RaftSnapshotData_RangeKeyValue{
-			{StartKey: []byte("A"), EndKey: []byte("C"), Timestamp: timestamp},
-			{StartKey: []byte("A"), EndKey: []byte("C"), Timestamp: timestamp.Add(0, -1)},
-			{StartKey: []byte("C"), EndKey: []byte("F"), Timestamp: timestamp},
-			{StartKey: []byte("P"), EndKey: []byte("R"), Timestamp: timestamp},
-			{StartKey: []byte("S"), EndKey: []byte("T"), Timestamp: timestamp, Value: []byte{1}},
-			{StartKey: []byte("X"), EndKey: []byte("Z"), Timestamp: timestamp},
-		},
-	}
-
-	// No diff works.
-	require.Empty(t, DiffRange(leaderSnapshot, leaderSnapshot))
-
-	replicaSnapshot := &roachpb.RaftSnapshotData{
-		KV: []roachpb.RaftSnapshotData_KeyValue{
-			{Key: []byte("ab"), Timestamp: timestamp, Value: value},
-			{Key: []byte("abc"), Timestamp: timestamp, Value: value},
-			{Key: []byte("abcde"), Timestamp: timestamp, Value: value},
-			{Key: []byte("abcdef"), Timestamp: timestamp, Value: value},
-			{Key: []byte("abcdefg"), Timestamp: hlc.Timestamp{}, Value: value},
-			{Key: []byte("abcdefg"), Timestamp: timestamp.Add(0, 1), Value: value},
-			{Key: []byte("abcdefg"), Timestamp: timestamp, Value: value},
-			{Key: []byte("abcdefgh"), Timestamp: timestamp, Value: value},
-			{Key: []byte("x"), Timestamp: timestamp, Value: []byte("bar")},
-			{Key: []byte("z"), Timestamp: timestamp, Value: value},
-			{Key: []byte("zeroleft"), Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, Value: value},
-			{Key: []byte("zeroright"), Timestamp: hlc.Timestamp{}, Value: value},
-			{Key: []byte("zeroright"), Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, Value: value},
-		},
-		RangeKV: []roachpb.RaftSnapshotData_RangeKeyValue{
-			{StartKey: []byte("A"), EndKey: []byte("C"), Timestamp: timestamp},
-			{StartKey: []byte("E"), EndKey: []byte("G"), Timestamp: timestamp},
-			{StartKey: []byte("Q"), EndKey: []byte("R"), Timestamp: timestamp},
-			{StartKey: []byte("S"), EndKey: []byte("T"), Timestamp: timestamp, Value: []byte{2}},
-			{StartKey: []byte("X"), EndKey: []byte("Z"), Timestamp: timestamp.Add(0, 1)},
-		},
-	}
-
-	// The expected diff.
-	eDiff := ReplicaSnapshotDiffSlice{
-		{LeaseHolder: true, Key: []byte("a"), Timestamp: timestamp, Value: value},
-		{LeaseHolder: false, Key: []byte("ab"), Timestamp: timestamp, Value: value},
-		{LeaseHolder: true, Key: []byte("abcd"), Timestamp: timestamp, Value: value},
-		{LeaseHolder: false, Key: []byte("abcdef"), Timestamp: timestamp, Value: value},
-		{LeaseHolder: false, Key: []byte("abcdefg"), Timestamp: timestamp.Add(0, 1), Value: value},
-		{LeaseHolder: true, Key: []byte("abcdefg"), Timestamp: timestamp.Add(0, -1), Value: value},
-		{LeaseHolder: true, Key: []byte("x"), Timestamp: timestamp, Value: value},
-		{LeaseHolder: false, Key: []byte("x"), Timestamp: timestamp, Value: []byte("bar")},
-		{LeaseHolder: true, Key: []byte("y"), Timestamp: timestamp, Value: value},
-		{LeaseHolder: false, Key: []byte("z"), Timestamp: timestamp, Value: value},
-		{LeaseHolder: true, Key: []byte("zeroleft"), Timestamp: hlc.Timestamp{}, Value: value},
-		{LeaseHolder: false, Key: []byte("zeroright"), Timestamp: hlc.Timestamp{}, Value: value},
-
-		{LeaseHolder: true, Key: []byte("A"), EndKey: []byte("C"), Timestamp: timestamp.Add(0, -1)},
-		{LeaseHolder: true, Key: []byte("C"), EndKey: []byte("F"), Timestamp: timestamp},
-		{LeaseHolder: false, Key: []byte("E"), EndKey: []byte("G"), Timestamp: timestamp},
-		{LeaseHolder: true, Key: []byte("P"), EndKey: []byte("R"), Timestamp: timestamp},
-		{LeaseHolder: false, Key: []byte("Q"), EndKey: []byte("R"), Timestamp: timestamp},
-		{LeaseHolder: true, Key: []byte("S"), EndKey: []byte("T"), Timestamp: timestamp, Value: []byte{1}},
-		{LeaseHolder: false, Key: []byte("S"), EndKey: []byte("T"), Timestamp: timestamp, Value: []byte{2}},
-		{LeaseHolder: false, Key: []byte("X"), EndKey: []byte("Z"), Timestamp: timestamp.Add(0, 1)},
-		{LeaseHolder: true, Key: []byte("X"), EndKey: []byte("Z"), Timestamp: timestamp},
-	}
-	diff := DiffRange(leaderSnapshot, replicaSnapshot)
-	require.Equal(t, eDiff, diff)
-
-	// Assert the stringifed output. This is what the consistency checker
-	// will actually print.
-	echotest.Require(t, diff.String(), testutils.TestDataPath(t, "replica_consistency_diff"))
-}
-
 func TestSyncSnapshot(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -10539,7 +10432,7 @@ func TestReplicaServersideRefreshes(t *testing.T) {
 			snap := tc.engine.NewSnapshot()
 			defer snap.Close()
 			res, err := tc.repl.sha512(ctx, *tc.repl.Desc(), tc.engine,
-				nil /* diff */, roachpb.ChecksumMode_CHECK_FULL,
+				roachpb.ChecksumMode_CHECK_FULL,
 				quotapool.NewRateLimiter("ConsistencyQueue", quotapool.Limit(math.MaxFloat64), math.MaxInt64))
 			if err != nil {
 				return hlc.Timestamp{}, err
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 2bbdc0b27177..39ab617d3dcc 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -1118,8 +1118,8 @@ var logRangeAndNodeEventsEnabled = func() *settings.BoolSetting {
 // ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the
 // behavior of the consistency checker for tests.
 type ConsistencyTestingKnobs struct {
-	// If non-nil, OnBadChecksumFatal is called by CheckConsistency() (instead of
-	// calling log.Fatal) on a checksum mismatch.
+	// If non-nil, OnBadChecksumFatal is called on a replica with a mismatching
+	// checksum, instead of log.Fatal.
 	OnBadChecksumFatal func(roachpb.StoreIdent)
 
 	ConsistencyQueueResultHook func(response roachpb.CheckConsistencyResponse)
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index 84165bf0ae01..098bbfd6ccd2 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -2776,13 +2776,6 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// Generate a minimal fake snapshot.
-	snapData := &roachpb.RaftSnapshotData{}
-	data, err := protoutil.Marshal(snapData)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	// Wrap the snapshot in a minimal header. The request will be dropped
 	// because the Raft log index and term are less than the hard state written
 	// above.
@@ -2803,7 +2796,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
 			Message: raftpb.Message{
 				Type: raftpb.MsgSnap,
 				Snapshot: raftpb.Snapshot{
-					Data: data,
+					Data: []byte{},
 					Metadata: raftpb.SnapshotMetadata{
 						Index: 1,
 						Term:  1,
diff --git a/pkg/kv/kvserver/testdata/replica_consistency_sha512 b/pkg/kv/kvserver/testdata/replica_consistency_sha512
index c82d11f7634d..633127e52a1d 100644
--- a/pkg/kv/kvserver/testdata/replica_consistency_sha512
+++ b/pkg/kv/kvserver/testdata/replica_consistency_sha512
@@ -19,10 +19,3 @@ stats: {
   "rangeValCount": "2",
   "rangeValBytes": "9"
 }
-snapshot:
- "a"/0.000000002,0=""
- "a"/0.000000001,0="\x00\x00\x00\x00\x03a1"
- "b"/0.000000003,0="\x00\x00\x00\x04e\n\x02\b\x02\x00\x00\x00\x00\x03b3"
- "i"/0,0="\x12\x04\b\x00\x10\x00\x18\x00 \x00(\x002\a\x00\x00\x00\x00\x03i0"
- {p-q}/0.000000001,0=""
- {x-z}/0.000000009,0="\x00\x00\x00\x04e\n\x02\b\b"
diff --git a/pkg/roachpb/internal_raft.proto b/pkg/roachpb/internal_raft.proto
index 61cb3cb26e0b..b4ed72670811 100644
--- a/pkg/roachpb/internal_raft.proto
+++ b/pkg/roachpb/internal_raft.proto
@@ -34,30 +34,6 @@ message RangeTombstone {
       (gogoproto.customname) = "NextReplicaID",
       (gogoproto.casttype) = "ReplicaID"];
 }
-
-// RaftSnapshotData is a historical vestige that used to carry snapshot data,
-// but is now only used in CollectChecksumResponse to carry range KV data to
-// generate diffs for checksum mismatches.
-message RaftSnapshotData {
-  message KeyValue {
-    optional bytes key = 1;
-    optional bytes value = 2;
-    optional util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false];
-  }
-  repeated KeyValue KV = 2 [(gogoproto.nullable) = false,
-      (gogoproto.customname) = "KV"];
-
-  message RangeKeyValue {
-    optional bytes start_key = 1;
-    optional bytes end_key = 2;
-    optional util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false];
-    optional bytes value = 4;
-  }
-  repeated RangeKeyValue range_kv = 4 [(gogoproto.nullable) = false,
-      (gogoproto.customname) = "RangeKV"];
-
-  reserved 1, 3;
-}
-
 message RaftReplicaID {
   option (gogoproto.equal) = true;
   option (gogoproto.populate) = true;

From dcd32738b79bda3496069160873a07261f7033ab Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Mon, 17 Oct 2022 10:19:37 -0400
Subject: [PATCH 2/4] roachtest: de-flake admission-control/tpcc-olap

Fixes #89600. This test makes use of `workload querybench`, which is only
available in the `workload` binary. This test started failing after #89482,
where we attempted to replace all uses of `./workload` with
`./cockroach workload`.
Release note: None
---
 pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go b/pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go
index 0117163f2845..60d23a449565 100644
--- a/pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go
+++ b/pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go
@@ -51,6 +51,10 @@ func (s tpccOLAPSpec) run(ctx context.Context, t test.Test, c cluster.Cluster) {
 		ctx, t, c, tpccOptions{
 			Warehouses: s.Warehouses, SetupType: usingImport,
 		})
+	// We make use of querybench below, only available through the `workload`
+	// binary.
+	c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode)
+
 	const queryFileName = "queries.sql"
 	// querybench expects the entire query to be on a single line.
 	queryLine := `"` + strings.Replace(tpccOlapQuery, "\n", " ", -1) + `"`

From bbdd88cad62ac6422a34bda0789ce19c8239cf62 Mon Sep 17 00:00:00 2001
From: Pavel Kalinnikov
Date: Mon, 17 Oct 2022 18:16:07 +0100
Subject: [PATCH 3/4] kvserver: make replica sha512 a function

This function does not use the replica, so does not have to be a method.

Epic: None
Release note: None
---
 pkg/kv/kvserver/helpers_test.go             | 3 +--
 pkg/kv/kvserver/replica_consistency.go      | 9 +++++----
 pkg/kv/kvserver/replica_consistency_test.go | 7 +++----
 pkg/kv/kvserver/replica_test.go             | 2 +-
 4 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index 9984c51c7871..8c861611b9b9 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -555,9 +555,8 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
 func ChecksumRange(
 	ctx context.Context, desc roachpb.RangeDescriptor, snap storage.Reader,
 ) ([]byte, error) {
-	var r *Replica // TODO(pavelkalinnikov): make this less ugly.
 	lim := quotapool.NewRateLimiter("test", 1<<30, 1<<30)
-	res, err := r.sha512(ctx, desc, snap, roachpb.ChecksumMode_CHECK_FULL, lim)
+	res, err := replicaSHA512(ctx, desc, snap, roachpb.ChecksumMode_CHECK_FULL, lim)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go
index 9a527e8e30ee..f450fe4538f9 100644
--- a/pkg/kv/kvserver/replica_consistency.go
+++ b/pkg/kv/kvserver/replica_consistency.go
@@ -480,9 +480,10 @@ type replicaHash struct {
 	PersistedMS, RecomputedMS enginepb.MVCCStats
 }
 
-// sha512 computes the SHA512 hash of all the replica data at the snapshot.
-// It will dump all the kv data into snapshot if it is provided.
-func (*Replica) sha512(
+// replicaSHA512 computes the SHA512 hash of the replica data at the given
+// snapshot. Either the full replicated state is taken into account, or only
+// RangeAppliedState (which includes MVCC stats), depending on the mode.
+func replicaSHA512(
 	ctx context.Context,
 	desc roachpb.RangeDescriptor,
 	snap storage.Reader,
@@ -690,7 +691,7 @@ func (r *Replica) computeChecksumPostApply(
 			); err != nil {
 				log.Errorf(ctx, "checksum collection did not join: %v", err)
 			} else {
-				result, err := r.sha512(ctx, desc, snap, cc.Mode, r.store.consistencyLimiter)
+				result, err := replicaSHA512(ctx, desc, snap, cc.Mode, r.store.consistencyLimiter)
 				if err != nil {
 					log.Errorf(ctx, "checksum computation failed: %v", err)
 					result = nil
diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go
index dbee6d69a60e..eea1e318aa94 100644
--- a/pkg/kv/kvserver/replica_consistency_test.go
+++ b/pkg/kv/kvserver/replica_consistency_test.go
@@ -168,7 +168,6 @@ func TestReplicaChecksumSHA512(t *testing.T) {
 	eng := storage.NewDefaultInMemForTesting()
 	defer eng.Close()
 
-	repl := &Replica{} // We don't actually need the replica at all, just the method.
 	desc := roachpb.RangeDescriptor{
 		RangeID:  1,
 		StartKey: roachpb.RKey("a"),
@@ -176,7 +175,7 @@ func TestReplicaChecksumSHA512(t *testing.T) {
 	}
 
 	// Hash the empty state.
-	rh, err := repl.sha512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
+	rh, err := replicaSHA512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
 	require.NoError(t, err)
 	fmt.Fprintf(sb, "checksum0: %x\n", rh.SHA512[:])
 
@@ -214,13 +213,13 @@ func TestReplicaChecksumSHA512(t *testing.T) {
 		require.NoError(t, storage.MVCCPut(ctx, eng, nil, key, ts, localTS, value, nil))
 		}
 
-		rh, err = repl.sha512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
+		rh, err = replicaSHA512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
 		require.NoError(t, err)
 		fmt.Fprintf(sb, "checksum%d: %x\n", i+1, rh.SHA512[:])
 	}
 
 	// Run another check to obtain stats for the final state.
-	rh, err = repl.sha512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
+	rh, err = replicaSHA512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
 	require.NoError(t, err)
 
 	jsonpb := protoutil.JSONPb{Indent: "  "}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 6be47e1ba076..c020ab0cc65f 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -10431,7 +10431,7 @@ func TestReplicaServersideRefreshes(t *testing.T) {
 			// Regression test for #31870.
 			snap := tc.engine.NewSnapshot()
 			defer snap.Close()
-			res, err := tc.repl.sha512(ctx, *tc.repl.Desc(), tc.engine,
+			res, err := replicaSHA512(ctx, *tc.repl.Desc(), tc.engine,
 				roachpb.ChecksumMode_CHECK_FULL,
 				quotapool.NewRateLimiter("ConsistencyQueue", quotapool.Limit(math.MaxFloat64), math.MaxInt64))
 			if err != nil {

From d8976d94f355f5ae3144e8e7ae7ba62fe083fe1d Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Fri, 14 Oct 2022 15:45:07 -0400
Subject: [PATCH 4/4] roachtest: Introduce a test to overwhelm nodes

This test is here to check the behavior of the system as the SQL load greatly
exceeds what the nodes are able to handle. In the future we want to evaluate
how this is handled, but today this will cause nodes to OOM.

Informs #89142.
Release note: None
---
 pkg/cmd/roachtest/tests/admission_control.go               |  1 +
 .../tests/admission_control_tpcc_overload.go               | 37 +++++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git a/pkg/cmd/roachtest/tests/admission_control.go b/pkg/cmd/roachtest/tests/admission_control.go
index 4d3835f14493..a6bf482efd9c 100644
--- a/pkg/cmd/roachtest/tests/admission_control.go
+++ b/pkg/cmd/roachtest/tests/admission_control.go
@@ -32,6 +32,7 @@ func registerAdmission(r registry.Registry) {
 	registerMultiStoreOverload(r)
 	registerSnapshotOverload(r)
 	registerTPCCOverload(r)
+	registerTPCCSevereOverload(r)
 
 	// TODO(irfansharif): Once registerMultiTenantFairness is unskipped and
 	// observed to be non-flaky for 3-ish months, transfer ownership to the AC
diff --git a/pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go b/pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go
index 0117163f2845..01a3d8fac016 100644
--- a/pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go
+++ b/pkg/cmd/roachtest/tests/admission_control_tpcc_overload.go
@@ -17,9 +17,11 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
 	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -144,3 +146,38 @@ func registerTPCCOverload(r registry.Registry) {
 		})
 	}
 }
+
+// This test begins a ramping TPCC workload that will overwhelm the CRDB nodes.
+// There is no way to "pass" this test since the 6 nodes can't possibly handle
+// 10K warehouses. If they could handle this load, then the test should be
+// changed to increase that count. The purpose of the test is to make sure that
+// the nodes don't fail under unsustainable overload. As of today (v22.2), the
+// CRDB nodes will eventually OOM around 3-4 hours through the ramp period.
+func registerTPCCSevereOverload(r registry.Registry) {
+	r.Add(registry.TestSpec{
+		Name:  "admission-control/tpcc-severe-overload",
+		Owner: registry.OwnerAdmissionControl,
+		// TODO(abaptist): This test will require a lot of admission control work
+		// to pass. Just putting it here to make it easy to run at any time.
+		Skip:    "#89142",
+		Cluster: r.MakeClusterSpec(7, spec.CPU(8)),
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			roachNodes := c.Range(1, c.Spec().NodeCount-1)
+			workloadNode := c.Spec().NodeCount
+
+			c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
+			c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), roachNodes)
+
+			t.Status("initializing (~1h)")
+			c.Run(ctx, c.Node(workloadNode), "./cockroach workload fixtures import tpcc --checks=false --warehouses=10000 {pgurl:1}")
+
+			// This run passes through 4 "phases":
+			// 1) No admission control, low latencies (up to ~1500 warehouses).
+			// 2) Admission control delays, growing latencies (up to ~3000 warehouses).
+			// 3) High latencies (100s+), queues building (up to ~4500 warehouses).
+			// 4) Memory and goroutine unbounded growth with eventual node crashes (up to ~6000 warehouses).
+			t.Status("running workload (fails in ~3-4 hours)")
+			c.Run(ctx, c.Node(workloadNode), "./cockroach workload run tpcc --ramp=6h --tolerate-errors --warehouses=10000 '{pgurl:1-6}'")
+		},
+	})
+}
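
For reference, the checksum scheme that ChecksumRange and replicaSHA512 rely
on is visible in the hunks of patches 1 and 3 above: every key/value pair is
folded into one running SHA-512 state, with each component length-prefixed via
binary.LittleEndian.PutUint64 so that differently split byte strings cannot
produce the same input stream. Below is a minimal standalone sketch of that
framing (illustrative only, not CockroachDB code; the real function also
hashes timestamps, MVCC range keys, and the range applied state):

	package main

	import (
		"crypto/sha512"
		"encoding/binary"
		"fmt"
	)

	// hashKVs folds each key/value pair into a single SHA-512 digest using
	// the same framing as replicaSHA512: u64 key length, key bytes, u64
	// value length, value bytes. The length prefixes remove concatenation
	// ambiguity, so ("ab","c") and ("a","bc") hash differently.
	func hashKVs(kvs [][2][]byte) []byte {
		hasher := sha512.New()
		var lenBuf [8]byte
		for _, kv := range kvs {
			key, value := kv[0], kv[1]
			binary.LittleEndian.PutUint64(lenBuf[:], uint64(len(key)))
			hasher.Write(lenBuf[:])
			hasher.Write(key)
			binary.LittleEndian.PutUint64(lenBuf[:], uint64(len(value)))
			hasher.Write(lenBuf[:])
			hasher.Write(value)
		}
		return hasher.Sum(nil)
	}

	func main() {
		a := hashKVs([][2][]byte{{[]byte("a"), []byte("v1")}, {[]byte("b"), []byte("v2")}})
		b := hashKVs([][2][]byte{{[]byte("a"), []byte("v1")}, {[]byte("b"), []byte("v2x")}})
		// Two replicas agree on the checksum iff they feed identical byte
		// streams into the hash.
		fmt.Printf("diverged: %t\n", string(a) != string(b)) // diverged: true
	}

Because identical replicated state yields identical digests, comparing the
resulting hashes, as TestCheckConsistencyInconsistent now does with
assert.Equal and assert.NotEqual, detects divergence without shipping the
full KV dump that RaftSnapshotData used to carry.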