Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
90313: kvserver: cleanup replica checksum computation func r=erikgrinaker a=pavelkalinnikov

This commit makes the replica checksum helper more convenient for tests, and simplifies the tests using it.

Follows up on cockroachdb#89813 (comment)

Epic: None
Release note: None

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Oct 26, 2022
2 parents 7dcb621 + c6be36d commit 7723487
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 47 deletions.
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -379,9 +380,10 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
require.NotNil(t, desc)

// Compute a checksum over the content of the problematic range.
hash, err := kvserver.ChecksumRange(context.Background(), *desc, cpEng)
rd, err := kvserver.CalcReplicaDigest(context.Background(), *desc, cpEng,
roachpb.ChecksumMode_CHECK_FULL, quotapool.NewRateLimiter("test", quotapool.Inf(), 0))
require.NoError(t, err)
hashes[i] = hash
hashes[i] = rd.SHA512[:]
}

assert.Equal(t, hashes[0], hashes[2]) // s1 and s3 agree
Expand Down
12 changes: 0 additions & 12 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,15 +550,3 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
}
}
}

// ChecksumRange returns a checksum over the KV data of the given range.
func ChecksumRange(
ctx context.Context, desc roachpb.RangeDescriptor, snap storage.Reader,
) ([]byte, error) {
lim := quotapool.NewRateLimiter("test", 1<<30, 1<<30)
res, err := replicaSHA512(ctx, desc, snap, roachpb.ChecksumMode_CHECK_FULL, lim)
if err != nil {
return nil, err
}
return res.SHA512[:], nil
}
31 changes: 15 additions & 16 deletions pkg/kv/kvserver/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (*Replica) checksumInitialWait(ctx context.Context) time.Duration {
}

// computeChecksumDone sends the checksum computation result to the receiver.
func (*Replica) computeChecksumDone(rc *replicaChecksum, result *replicaHash) {
func (*Replica) computeChecksumDone(rc *replicaChecksum, result *ReplicaDigest) {
var c CollectChecksumResponse
if result != nil {
c.Checksum = result.SHA512[:]
Expand All @@ -475,21 +475,23 @@ func (*Replica) computeChecksumDone(rc *replicaChecksum, result *replicaHash) {
close(rc.result)
}

type replicaHash struct {
SHA512 [sha512.Size]byte
PersistedMS, RecomputedMS enginepb.MVCCStats
// ReplicaDigest holds a summary of the replicated state on a replica.
type ReplicaDigest struct {
SHA512 [sha512.Size]byte
PersistedMS enginepb.MVCCStats
RecomputedMS enginepb.MVCCStats
}

// replicaSHA512 computes the SHA512 hash of the replica data at the given
// snapshot. Either the full replicated state is taken into account, or only
// RangeAppliedState (which includes MVCC stats), depending on the mode.
func replicaSHA512(
// CalcReplicaDigest computes the SHA512 hash and MVCC stats of the replica data
// at the given snapshot. Depending on the mode, it either considers the full
// replicated state, or only RangeAppliedState (including MVCC stats).
func CalcReplicaDigest(
ctx context.Context,
desc roachpb.RangeDescriptor,
snap storage.Reader,
mode roachpb.ChecksumMode,
limiter *quotapool.RateLimiter,
) (*replicaHash, error) {
) (*ReplicaDigest, error) {
statsOnly := mode == roachpb.ChecksumMode_CHECK_STATS

// Iterate over all the data in the range.
Expand Down Expand Up @@ -573,21 +575,18 @@ func replicaSHA512(
return err
}

var ms enginepb.MVCCStats
// In statsOnly mode, we hash only the RangeAppliedState. In regular mode, hash
// all of the replicated key space.
var result ReplicaDigest
if !statsOnly {
var err error
ms, err = rditer.ComputeStatsForRangeWithVisitors(&desc, snap, 0, /* nowNanos */
ms, err := rditer.ComputeStatsForRangeWithVisitors(&desc, snap, 0, /* nowNanos */
pointKeyVisitor, rangeKeyVisitor)
if err != nil {
return nil, err
}
result.RecomputedMS = ms
}

var result replicaHash
result.RecomputedMS = ms

rangeAppliedState, err := stateloader.Make(desc.RangeID).LoadRangeAppliedState(ctx, snap)
if err != nil {
return nil, err
Expand Down Expand Up @@ -691,7 +690,7 @@ func (r *Replica) computeChecksumPostApply(
); err != nil {
log.Errorf(ctx, "checksum collection did not join: %v", err)
} else {
result, err := replicaSHA512(ctx, desc, snap, cc.Mode, r.store.consistencyLimiter)
result, err := CalcReplicaDigest(ctx, desc, snap, cc.Mode, r.store.consistencyLimiter)
if err != nil {
log.Errorf(ctx, "checksum computation failed: %v", err)
result = nil
Expand Down
15 changes: 7 additions & 8 deletions pkg/kv/kvserver/replica_consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func TestReplicaChecksumSHA512(t *testing.T) {

ctx := context.Background()
sb := &strings.Builder{}
lim := quotapool.NewRateLimiter("rate", 1e9, 0)
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

Expand All @@ -175,9 +174,10 @@ func TestReplicaChecksumSHA512(t *testing.T) {
}

// Hash the empty state.
rh, err := replicaSHA512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
unlim := quotapool.NewRateLimiter("test", quotapool.Inf(), 0)
rd, err := CalcReplicaDigest(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, unlim)
require.NoError(t, err)
fmt.Fprintf(sb, "checksum0: %x\n", rh.SHA512[:])
fmt.Fprintf(sb, "checksum0: %x\n", rd.SHA512)

// We incrementally add writes, and check the checksums after each write to
// make sure they differ such that each write affects the checksum.
Expand Down Expand Up @@ -213,17 +213,16 @@ func TestReplicaChecksumSHA512(t *testing.T) {
require.NoError(t, storage.MVCCPut(ctx, eng, nil, key, ts, localTS, value, nil))
}

rh, err = replicaSHA512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
rd, err = CalcReplicaDigest(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, unlim)
require.NoError(t, err)
fmt.Fprintf(sb, "checksum%d: %x\n", i+1, rh.SHA512[:])
fmt.Fprintf(sb, "checksum%d: %x\n", i+1, rd.SHA512)
}

// Run another check to obtain stats for the final state.
rh, err = replicaSHA512(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, lim)
rd, err = CalcReplicaDigest(ctx, desc, eng, roachpb.ChecksumMode_CHECK_FULL, unlim)
require.NoError(t, err)

jsonpb := protoutil.JSONPb{Indent: " "}
json, err := jsonpb.Marshal(&rh.RecomputedMS)
json, err := jsonpb.Marshal(&rd.RecomputedMS)
require.NoError(t, err)
fmt.Fprintf(sb, "stats: %s\n", string(json))

Expand Down
13 changes: 4 additions & 9 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10431,9 +10431,8 @@ func TestReplicaServersideRefreshes(t *testing.T) {
// Regression test for #31870.
snap := tc.engine.NewSnapshot()
defer snap.Close()
res, err := replicaSHA512(ctx, *tc.repl.Desc(), tc.engine,
roachpb.ChecksumMode_CHECK_FULL,
quotapool.NewRateLimiter("ConsistencyQueue", quotapool.Limit(math.MaxFloat64), math.MaxInt64))
res, err := CalcReplicaDigest(ctx, *tc.repl.Desc(), tc.engine, roachpb.ChecksumMode_CHECK_FULL,
quotapool.NewRateLimiter("test", quotapool.Inf(), 0))
if err != nil {
return hlc.Timestamp{}, err
}
Expand Down Expand Up @@ -10998,17 +10997,13 @@ func TestReplicaServersideRefreshes(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
ts, err := test.setupFn()
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
ba, expTS := test.batchFn(ts)
actualTS, err := send(ba)
if !testutils.IsError(err, test.expErr) {
t.Fatalf("expected error %q; got \"%v\"", test.expErr, err)
}
if actualTS != expTS {
t.Fatalf("expected ts=%s; got %s", expTS, actualTS)
}
require.Equal(t, expTS, actualTS)
})
}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/quotapool/int_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
// Limit defines a rate in terms of quota per second.
type Limit float64

// Inf returns the infinite rate limit, which allows any rate and bursts.
func Inf() Limit {
return Limit(math.Inf(1))
}

// RateLimiter implements a token-bucket style rate limiter.
// It has the added feature that quota acquired from the pool can be returned
// in the case that they end up not getting used.
Expand All @@ -34,6 +39,8 @@ type RateLimiter struct {
// token bucket which has a maximum capacity of burst. If a request attempts to
// acquire more than burst, it will block until the bucket is full and then
// put the token bucket in debt.
//
// If rate == Inf() then any bursts are allowed, and acquisition does not block.
func NewRateLimiter(name string, rate Limit, burst int64, options ...Option) *RateLimiter {
rl := &RateLimiter{}
tb := &TokenBucket{}
Expand Down

0 comments on commit 7723487

Please sign in to comment.