From be8e2fa350583148e489ab8c547dbdae64701553 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 26 Oct 2022 13:59:40 +0100 Subject: [PATCH 1/2] util/quotapool: introduce infinite limit helper This commit adds quotapool.Inf() which can be used in tests to create an unlimited RateLimiter. Release note: None --- pkg/kv/kvserver/helpers_test.go | 2 +- pkg/kv/kvserver/replica_consistency_test.go | 2 +- pkg/kv/kvserver/replica_test.go | 2 +- pkg/util/quotapool/int_rate.go | 7 +++++++ 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 8c861611b9b9..f4abb1d6bd0e 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -555,7 +555,7 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { func ChecksumRange( ctx context.Context, desc roachpb.RangeDescriptor, snap storage.Reader, ) ([]byte, error) { - lim := quotapool.NewRateLimiter("test", 1<<30, 1<<30) + lim := quotapool.NewRateLimiter("test", quotapool.Inf(), 0) res, err := replicaSHA512(ctx, desc, snap, roachpb.ChecksumMode_CHECK_FULL, lim) if err != nil { return nil, err diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index eea1e318aa94..8fa80817f537 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -164,7 +164,7 @@ func TestReplicaChecksumSHA512(t *testing.T) { ctx := context.Background() sb := &strings.Builder{} - lim := quotapool.NewRateLimiter("rate", 1e9, 0) + lim := quotapool.NewRateLimiter("rate", quotapool.Inf(), 0) eng := storage.NewDefaultInMemForTesting() defer eng.Close() diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index d8f30bf176b1..cd6b45548f5d 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -10433,7 +10433,7 @@ func TestReplicaServersideRefreshes(t *testing.T) { defer snap.Close() res, err := replicaSHA512(ctx, *tc.repl.Desc(), tc.engine, roachpb.ChecksumMode_CHECK_FULL, - quotapool.NewRateLimiter("ConsistencyQueue", quotapool.Limit(math.MaxFloat64), math.MaxInt64)) + quotapool.NewRateLimiter("ConsistencyQueue", quotapool.Inf(), 0)) if err != nil { return hlc.Timestamp{}, err } diff --git a/pkg/util/quotapool/int_rate.go b/pkg/util/quotapool/int_rate.go index e8e7325a3b7e..0df950193b06 100644 --- a/pkg/util/quotapool/int_rate.go +++ b/pkg/util/quotapool/int_rate.go @@ -22,6 +22,11 @@ import ( // Limit defines a rate in terms of quota per second. type Limit float64 +// Inf returns the infinite rate limit, which allows any rate and bursts. +func Inf() Limit { + return Limit(math.Inf(1)) +} + // RateLimiter implements a token-bucket style rate limiter. // It has the added feature that quota acquired from the pool can be returned // in the case that they end up not getting used. @@ -34,6 +39,8 @@ type RateLimiter struct { // token bucket which has a maximum capacity of burst. If a request attempts to // acquire more than burst, it will block until the bucket is full and then // put the token bucket in debt. +// +// If rate == Inf() then any bursts are allowed, and acquisition does not block. func NewRateLimiter(name string, rate Limit, burst int64, options ...Option) *RateLimiter { rl := &RateLimiter{} tb := &TokenBucket{} From c6be36d31bfa9550f63c39f0db0aa067c8cd20db Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 19 Oct 2022 12:57:06 +0100 Subject: [PATCH 2/2] kvserver: cleanup replica checksum computation func This commit makes the replica checksum helper more convenient for tests, and simplifies the tests using it. Release note: None --- pkg/kv/kvserver/consistency_queue_test.go | 6 ++-- pkg/kv/kvserver/helpers_test.go | 12 -------- pkg/kv/kvserver/replica_consistency.go | 31 ++++++++++----------- pkg/kv/kvserver/replica_consistency_test.go | 15 +++++----- pkg/kv/kvserver/replica_test.go | 13 +++------ 5 files changed, 30 insertions(+), 47 deletions(-) diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go index 7ade78a3b623..54a1669cfa80 100644 --- a/pkg/kv/kvserver/consistency_queue_test.go +++ b/pkg/kv/kvserver/consistency_queue_test.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -379,9 +380,10 @@ func TestCheckConsistencyInconsistent(t *testing.T) { require.NotNil(t, desc) // Compute a checksum over the content of the problematic range. - hash, err := kvserver.ChecksumRange(context.Background(), *desc, cpEng) + rd, err := kvserver.CalcReplicaDigest(context.Background(), *desc, cpEng, + roachpb.ChecksumMode_CHECK_FULL, quotapool.NewRateLimiter("test", quotapool.Inf(), 0)) require.NoError(t, err) - hashes[i] = hash + hashes[i] = rd.SHA512[:] } assert.Equal(t, hashes[0], hashes[2]) // s1 and s3 agree diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index f4abb1d6bd0e..1d4032700165 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -550,15 +550,3 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { } } } - -// ChecksumRange returns a checksum over the KV data of the given range. -func ChecksumRange( - ctx context.Context, desc roachpb.RangeDescriptor, snap storage.Reader, -) ([]byte, error) { - lim := quotapool.NewRateLimiter("test", quotapool.Inf(), 0) - res, err := replicaSHA512(ctx, desc, snap, roachpb.ChecksumMode_CHECK_FULL, lim) - if err != nil { - return nil, err - } - return res.SHA512[:], nil -} diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index 3056823c5d13..ae19f73134c2 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -458,7 +458,7 @@ func (*Replica) checksumInitialWait(ctx context.Context) time.Duration { } // computeChecksumDone sends the checksum computation result to the receiver. -func (*Replica) computeChecksumDone(rc *replicaChecksum, result *replicaHash) { +func (*Replica) computeChecksumDone(rc *replicaChecksum, result *ReplicaDigest) { var c CollectChecksumResponse if result != nil { c.Checksum = result.SHA512[:] @@ -475,21 +475,23 @@ func (*Replica) computeChecksumDone(rc *replicaChecksum, result *replicaHash) { close(rc.result) } -type replicaHash struct { - SHA512 [sha512.Size]byte - PersistedMS, RecomputedMS enginepb.MVCCStats +// ReplicaDigest holds a summary of the replicated state on a replica. +type ReplicaDigest struct { + SHA512 [sha512.Size]byte + PersistedMS enginepb.MVCCStats + RecomputedMS enginepb.MVCCStats } -// replicaSHA512 computes the SHA512 hash of the replica data at the given -// snapshot. Either the full replicated state is taken into account, or only -// RangeAppliedState (which includes MVCC stats), depending on the mode. -func replicaSHA512( +// CalcReplicaDigest computes the SHA512 hash and MVCC stats of the replica data +// at the given snapshot. Depending on the mode, it either considers the full +// replicated state, or only RangeAppliedState (including MVCC stats). +func CalcReplicaDigest( ctx context.Context, desc roachpb.RangeDescriptor, snap storage.Reader, mode roachpb.ChecksumMode, limiter *quotapool.RateLimiter, -) (*replicaHash, error) { +) (*ReplicaDigest, error) { statsOnly := mode == roachpb.ChecksumMode_CHECK_STATS // Iterate over all the data in the range. @@ -573,21 +575,18 @@ func replicaSHA512( return err } - var ms enginepb.MVCCStats // In statsOnly mode, we hash only the RangeAppliedState. In regular mode, hash // all of the replicated key space. + var result ReplicaDigest if !statsOnly { - var err error - ms, err = rditer.ComputeStatsForRangeWithVisitors(&desc, snap, 0, /* nowNanos */ + ms, err := rditer.ComputeStatsForRangeWithVisitors(&desc, snap, 0, /* nowNanos */ pointKeyVisitor, rangeKeyVisitor) if err != nil { return nil, err } + result.RecomputedMS = ms } - var result replicaHash - result.RecomputedMS = ms - rangeAppliedState, err := stateloader.Make(desc.RangeID).LoadRangeAppliedState(ctx, snap) if err != nil { return nil, err @@ -691,7 +690,7 @@ func (r *Replica) computeChecksumPostApply( ); err != nil { log.Errorf(ctx, "checksum collection did not join: %v", err) } else { - result, err := replicaSHA512(ctx, desc, snap, cc.Mode, r.store.consistencyLimiter) + result, err := CalcReplicaDigest(ctx, desc, snap, cc.Mode, r.store.consistencyLimiter) if err != nil { log.Errorf(ctx, "checksum computation failed: %v", err) result = nil diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index 8fa80817f537..70dfa1107485 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -164,7 +164,6 @@ func TestReplicaChecksumSHA512(t *testing.T) { ctx := context.Background() sb := &strings.Builder{} - lim := quotapool.NewRateLimiter("rate", quotapool.Inf(), 0) eng := storage.NewDefaultInMemForTesting() defer eng.Close() @@ -175,9 +174,10 @@ func TestReplicaChecksumSHA512(t *testing.T) { } // Hash the empty state. - rh, err := replicaSHA512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim) + unlim := quotapool.NewRateLimiter("test", quotapool.Inf(), 0) + rd, err := CalcReplicaDigest(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, unlim) require.NoError(t, err) - fmt.Fprintf(sb, "checksum0: %x\n", rh.SHA512[:]) + fmt.Fprintf(sb, "checksum0: %x\n", rd.SHA512) // We incrementally add writes, and check the checksums after each write to // make sure they differ such that each write affects the checksum. @@ -213,17 +213,16 @@ func TestReplicaChecksumSHA512(t *testing.T) { require.NoError(t, storage.MVCCPut(ctx, eng, nil, key, ts, localTS, value, nil)) } - rh, err = replicaSHA512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim) + rd, err = CalcReplicaDigest(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, unlim) require.NoError(t, err) - fmt.Fprintf(sb, "checksum%d: %x\n", i+1, rh.SHA512[:]) + fmt.Fprintf(sb, "checksum%d: %x\n", i+1, rd.SHA512) } // Run another check to obtain stats for the final state. - rh, err = replicaSHA512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim) + rd, err = CalcReplicaDigest(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, unlim) require.NoError(t, err) - jsonpb := protoutil.JSONPb{Indent: " "} - json, err := jsonpb.Marshal(&rh.RecomputedMS) + json, err := jsonpb.Marshal(&rd.RecomputedMS) require.NoError(t, err) fmt.Fprintf(sb, "stats: %s\n", string(json)) diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index cd6b45548f5d..457631b590de 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -10431,9 +10431,8 @@ func TestReplicaServersideRefreshes(t *testing.T) { // Regression test for #31870. snap := tc.engine.NewSnapshot() defer snap.Close() - res, err := replicaSHA512(ctx, *tc.repl.Desc(), tc.engine, - roachpb.ChecksumMode_CHECK_FULL, - quotapool.NewRateLimiter("ConsistencyQueue", quotapool.Inf(), 0)) + res, err := CalcReplicaDigest(ctx, *tc.repl.Desc(), tc.engine, roachpb.ChecksumMode_CHECK_FULL, + quotapool.NewRateLimiter("test", quotapool.Inf(), 0)) if err != nil { return hlc.Timestamp{}, err } @@ -10998,17 +10997,13 @@ func TestReplicaServersideRefreshes(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { ts, err := test.setupFn() - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) ba, expTS := test.batchFn(ts) actualTS, err := send(ba) if !testutils.IsError(err, test.expErr) { t.Fatalf("expected error %q; got \"%v\"", test.expErr, err) } - if actualTS != expTS { - t.Fatalf("expected ts=%s; got %s", expTS, actualTS) - } + require.Equal(t, expTS, actualTS) }) } }