Skip to content

Commit

Permalink
kvserver: include MVCC range keys in replica consistency checks
Browse files Browse the repository at this point in the history
This patch adds handling of MVCC range keys in replica consistency
checks. These are iterated over as part of `MVCCStats` calculations and
hashed similarly to point keys.

Range keys will only exist after the version gate
`ExperimentalMVCCRangeTombstones` has been enabled, so a separate
version gate is not necessary.

Release note: None
  • Loading branch information
erikgrinaker committed Jun 4, 2022
1 parent 22df2b0 commit 877af3c
Show file tree
Hide file tree
Showing 7 changed files with 419 additions and 90 deletions.
59 changes: 56 additions & 3 deletions pkg/kv/kvserver/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,8 @@ func (*Replica) sha512(
var timestampBuf []byte
hasher := sha512.New()

// TODO(erikgrinaker): add a range key visitor to hash range keys.
pointKeyVisitor := func(unsafeKey storage.MVCCKey, unsafeValue []byte) error {
// Rate Limit the scan through the range
// Rate limit the scan through the range.
if err := limiter.WaitN(ctx, int64(len(unsafeKey.Key)+len(unsafeValue))); err != nil {
return err
}
Expand Down Expand Up @@ -643,6 +642,60 @@ func (*Replica) sha512(
return err
}

// rangeKeyVisitor handles a single MVCC range key value: it rate limits the
// scan, optionally records a copy of the range key in the debug snapshot, and
// folds the range key into the running SHA-512 hash.
//
// The bytes are fed to the hasher in this exact order:
//   1. len(StartKey), len(EndKey), len(Value), each as a little-endian uint64
//   2. StartKey
//   3. EndKey
//   4. the marshaled legacy timestamp
//   5. Value
// Any change to this order or encoding changes the resulting checksum.
rangeKeyVisitor := func(rangeKV storage.MVCCRangeKeyValue) error {
// Rate limit the scan through the range.
// The limiter is charged for the combined size of the start key, end key
// and value.
err := limiter.WaitN(ctx,
int64(len(rangeKV.RangeKey.StartKey)+len(rangeKV.RangeKey.EndKey)+len(rangeKV.Value)))
if err != nil {
return err
}

if snapshot != nil {
// Add (a copy of) the range key into the debug message.
// NOTE(review): the copies go through alloc, presumably so the snapshot
// does not alias buffers owned by the iterator -- confirm.
rkv := roachpb.RaftSnapshotData_RangeKeyValue{
Timestamp: rangeKV.RangeKey.Timestamp,
}
alloc, rkv.StartKey = alloc.Copy(rangeKV.RangeKey.StartKey, 0)
alloc, rkv.EndKey = alloc.Copy(rangeKV.RangeKey.EndKey, 0)
alloc, rkv.Value = alloc.Copy(rangeKV.Value, 0)
snapshot.RangeKV = append(snapshot.RangeKV, rkv)
}

// Encode the length of the start key and end key.
// The value length is written as well. Writing all three lengths ahead of
// the variable-length fields means that the same bytes split differently
// between start key, end key and value produce different hash input.
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(rangeKV.RangeKey.StartKey)))
if _, err := hasher.Write(intBuf[:]); err != nil {
return err
}
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(rangeKV.RangeKey.EndKey)))
if _, err := hasher.Write(intBuf[:]); err != nil {
return err
}
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(rangeKV.Value)))
if _, err := hasher.Write(intBuf[:]); err != nil {
return err
}
if _, err := hasher.Write(rangeKV.RangeKey.StartKey); err != nil {
return err
}
if _, err := hasher.Write(rangeKV.RangeKey.EndKey); err != nil {
return err
}
// Hash the timestamp in its marshaled legacy form. timestampBuf is declared
// outside this closure and reused across calls: it is reallocated only when
// the encoded size exceeds its capacity, otherwise it is resliced to size.
legacyTimestamp = rangeKV.RangeKey.Timestamp.ToLegacyTimestamp()
if size := legacyTimestamp.Size(); size > cap(timestampBuf) {
timestampBuf = make([]byte, size)
} else {
timestampBuf = timestampBuf[:size]
}
if _, err := protoutil.MarshalTo(&legacyTimestamp, timestampBuf); err != nil {
return err
}
if _, err := hasher.Write(timestampBuf); err != nil {
return err
}
// The value is hashed last; the error from this final write (nil on
// success) is the visitor's result.
_, err = hasher.Write(rangeKV.Value)
return err
}

var ms enginepb.MVCCStats
// In statsOnly mode, we hash only the RangeAppliedState. In regular mode, hash
// all of the replicated key space.
Expand All @@ -660,7 +713,7 @@ func (*Replica) sha512(
UpperBound: span.End,
})
spanMS, err := storage.ComputeStatsForRangeWithVisitors(
iter, span.Start, span.End, 0 /* nowNanos */, pointKeyVisitor, nil /* rangeKeyVisitor */)
iter, span.Start, span.End, 0 /* nowNanos */, pointKeyVisitor, rangeKeyVisitor)
iter.Close()
if err != nil {
return nil, err
Expand Down
150 changes: 109 additions & 41 deletions pkg/kv/kvserver/replica_consistency_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"bytes"
fmt "fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand All @@ -26,6 +27,7 @@ type ReplicaSnapshotDiff struct {
// holder.
LeaseHolder bool
Key roachpb.Key
EndKey roachpb.Key // only set for range keys (MVCCRangeKey)
Timestamp hlc.Timestamp
Value []byte
}
Expand All @@ -43,17 +45,33 @@ func (rsds ReplicaSnapshotDiffSlice) SafeFormat(buf redact.SafePrinter, _ rune)
// Lease holder (RHS) has something follower (LHS) does not have.
prefix = redact.SafeString("-")
}
const format = `%s%s %s
if len(d.EndKey) > 0 {
const rangeKVFormat = `%s%s %s
%s ts:%s
%s value:%s
%s raw from/to/ts/value: %x %x %x %x
`
buf.Printf(rangeKVFormat,
prefix, d.Timestamp, roachpb.Span{Key: d.Key, EndKey: d.EndKey},
prefix, d.Timestamp.GoTime(),
prefix, fmt.Sprintf("%q", d.Value), // just print the raw value for now
prefix, storage.EncodeMVCCKeyPrefix(d.Key), storage.EncodeMVCCKeyPrefix(d.EndKey),
storage.EncodeMVCCTimestampSuffix(d.Timestamp), d.Value)

} else {
const kvFormat = `%s%s %s
%s ts:%s
%s value:%s
%s raw mvcc_key/value: %x %x
`
mvccKey := storage.MVCCKey{Key: d.Key, Timestamp: d.Timestamp}
buf.Printf(format,
prefix, d.Timestamp, d.Key,
prefix, d.Timestamp.GoTime(),
prefix, SprintMVCCKeyValue(storage.MVCCKeyValue{Key: mvccKey, Value: d.Value}, false /* printKey */),
prefix, storage.EncodeMVCCKey(mvccKey), d.Value)
mvccKey := storage.MVCCKey{Key: d.Key, Timestamp: d.Timestamp}
buf.Printf(kvFormat,
prefix, d.Timestamp, d.Key,
prefix, d.Timestamp.GoTime(),
prefix, SprintMVCCKeyValue(
storage.MVCCKeyValue{Key: mvccKey, Value: d.Value}, false /* printKey */),
prefix, storage.EncodeMVCCKey(mvccKey), d.Value)
}
}
}

Expand All @@ -66,15 +84,22 @@ func diffRange(l, r *roachpb.RaftSnapshotData) ReplicaSnapshotDiffSlice {
if l == nil || r == nil {
return nil
}
var diff ReplicaSnapshotDiffSlice
diff = append(diff, diffKVs(l.KV, r.KV)...)
diff = append(diff, diffRangeKeys(l.RangeKV, r.RangeKV)...)
return diff
}

func diffKVs(l, r []roachpb.RaftSnapshotData_KeyValue) ReplicaSnapshotDiffSlice {
var diff []ReplicaSnapshotDiff
i, j := 0, 0
for {
var e, v roachpb.RaftSnapshotData_KeyValue
if i < len(l.KV) {
e = l.KV[i]
if i < len(l) {
e = l[i]
}
if j < len(r.KV) {
v = r.KV[j]
if j < len(r) {
v = r[j]
}

addLeaseHolder := func() {
Expand All @@ -88,52 +113,95 @@ func diffRange(l, r *roachpb.RaftSnapshotData) ReplicaSnapshotDiffSlice {

// Compare keys.
var comp int
// Check if it has finished traversing over all the lease holder keys.
if e.Key == nil {
if v.Key == nil {
// Done traversing over all the replica keys. Done!
break
} else {
comp = 1
}
if e.Key == nil && v.Key == nil {
// Done!
break
} else if e.Key == nil {
// Finished traversing over all the lease holder keys.
comp = 1
} else if v.Key == nil {
// Finished traversing over all the replica keys.
comp = -1
} else {
// Check if it has finished traversing over all the replica keys.
if v.Key == nil {
comp = -1
} else {
// Both lease holder and replica keys exist. Compare them.
comp = bytes.Compare(e.Key, v.Key)
}
// Both lease holder and replica keys exist. Compare them.
comp = storage.MVCCKey{Key: e.Key, Timestamp: e.Timestamp}.Compare(
storage.MVCCKey{Key: v.Key, Timestamp: v.Timestamp})
}

switch comp {
case -1:
addLeaseHolder()

case 1:
addReplica()
case 0:
// Timestamp sorting is weird. Timestamp{} sorts first, the
// remainder sort in descending order. See storage/engine/doc.go.
if !e.Timestamp.EqOrdering(v.Timestamp) {
if e.Timestamp.IsEmpty() {
addLeaseHolder()
} else if v.Timestamp.IsEmpty() {
addReplica()
} else if v.Timestamp.Less(e.Timestamp) {
addLeaseHolder()
} else {
addReplica()
}
} else if !bytes.Equal(e.Value, v.Value) {
if !bytes.Equal(e.Value, v.Value) {
addLeaseHolder()
addReplica()
} else {
// No diff; skip.
i++
j++
}
}
}
return diff
}

func diffRangeKeys(l, r []roachpb.RaftSnapshotData_RangeKeyValue) ReplicaSnapshotDiffSlice {
var diff []ReplicaSnapshotDiff
i, j := 0, 0
for {
var e, v roachpb.RaftSnapshotData_RangeKeyValue
if i < len(l) {
e = l[i]
}
if j < len(r) {
v = r[j]
}

addLeaseHolder := func() {
diff = append(diff, ReplicaSnapshotDiff{LeaseHolder: true, Key: e.StartKey, EndKey: e.EndKey,
Timestamp: e.Timestamp, Value: e.Value})
i++
}
addReplica := func() {
diff = append(diff, ReplicaSnapshotDiff{LeaseHolder: false, Key: v.StartKey, EndKey: v.EndKey,
Timestamp: v.Timestamp, Value: v.Value})
j++
}

// Compare keys.
var comp int
if e.StartKey == nil && e.EndKey == nil && v.StartKey == nil && v.EndKey == nil {
// Done!
break
} else if e.StartKey == nil && e.EndKey == nil {
// Finished traversing over all the lease holder keys.
comp = 1
} else if v.StartKey == nil && v.EndKey == nil {
// Finished traversing over all the replica keys.
comp = -1
} else {
// Both lease holder and replica keys exist. Compare them.
eMVCC := storage.MVCCRangeKey{StartKey: e.StartKey, EndKey: e.EndKey, Timestamp: e.Timestamp}
vMVCC := storage.MVCCRangeKey{StartKey: v.StartKey, EndKey: v.EndKey, Timestamp: v.Timestamp}
comp = eMVCC.Compare(vMVCC)
}

switch comp {
case -1:
addLeaseHolder()
case 1:
addReplica()

case 0:
if !bytes.Equal(e.Value, v.Value) {
addLeaseHolder()
addReplica()
} else {
// No diff; skip.
i++
j++
}
}
}
return diff
Expand Down
Loading

0 comments on commit 877af3c

Please sign in to comment.