Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: include MVCC range keys in replica consistency checks #78104

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions pkg/kv/kvserver/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,8 @@ func (*Replica) sha512(
var timestampBuf []byte
hasher := sha512.New()

// TODO(erikgrinaker): add a range key visitor to hash range keys.
pointKeyVisitor := func(unsafeKey storage.MVCCKey, unsafeValue []byte) error {
// Rate Limit the scan through the range
// Rate limit the scan through the range.
if err := limiter.WaitN(ctx, int64(len(unsafeKey.Key)+len(unsafeValue))); err != nil {
return err
}
Expand Down Expand Up @@ -643,6 +642,60 @@ func (*Replica) sha512(
return err
}

// rangeKeyVisitor hashes a single MVCC range key/value pair and, when a
// snapshot is being collected, appends a copy of it to snapshot.RangeKV.
//
// The hasher is fed, in this order: len(StartKey), len(EndKey) and len(Value)
// as little-endian uint64s, then StartKey, EndKey, the marshaled legacy
// timestamp, and finally Value. The checksum depends on this exact byte
// sequence, so the order and encoding must not be changed.
//
// It returns the first error reported by the rate limiter, the timestamp
// marshaling, or the hasher.
rangeKeyVisitor := func(rangeKV storage.MVCCRangeKeyValue) error {
	// Rate limit the scan through the range.
	err := limiter.WaitN(ctx,
		int64(len(rangeKV.RangeKey.StartKey)+len(rangeKV.RangeKey.EndKey)+len(rangeKV.Value)))
	if err != nil {
		return err
	}

	if snapshot != nil {
		// Add (a copy of) the range key into the debug message. The copies are
		// taken through alloc so the snapshot does not alias the visitor's input.
		rkv := roachpb.RaftSnapshotData_RangeKeyValue{
			Timestamp: rangeKV.RangeKey.Timestamp,
		}
		alloc, rkv.StartKey = alloc.Copy(rangeKV.RangeKey.StartKey, 0)
		alloc, rkv.EndKey = alloc.Copy(rangeKV.RangeKey.EndKey, 0)
		alloc, rkv.Value = alloc.Copy(rangeKV.Value, 0)
		snapshot.RangeKV = append(snapshot.RangeKV, rkv)
	}

	// Encode the lengths of the start key, end key and value (in that order)
	// ahead of the variable-length fields themselves.
	for _, n := range [...]int{
		len(rangeKV.RangeKey.StartKey),
		len(rangeKV.RangeKey.EndKey),
		len(rangeKV.Value),
	} {
		binary.LittleEndian.PutUint64(intBuf[:], uint64(n))
		if _, err := hasher.Write(intBuf[:]); err != nil {
			return err
		}
	}

	// Hash the key bounds.
	if _, err := hasher.Write(rangeKV.RangeKey.StartKey); err != nil {
		return err
	}
	if _, err := hasher.Write(rangeKV.RangeKey.EndKey); err != nil {
		return err
	}

	// Hash the timestamp in its legacy protobuf encoding, reusing timestampBuf
	// when its capacity is sufficient and growing it otherwise.
	legacyTimestamp = rangeKV.RangeKey.Timestamp.ToLegacyTimestamp()
	if size := legacyTimestamp.Size(); size > cap(timestampBuf) {
		timestampBuf = make([]byte, size)
	} else {
		timestampBuf = timestampBuf[:size]
	}
	if _, err := protoutil.MarshalTo(&legacyTimestamp, timestampBuf); err != nil {
		return err
	}
	if _, err := hasher.Write(timestampBuf); err != nil {
		return err
	}

	// Finally hash the value.
	_, err = hasher.Write(rangeKV.Value)
	return err
}

var ms enginepb.MVCCStats
// In statsOnly mode, we hash only the RangeAppliedState. In regular mode, hash
// all of the replicated key space.
Expand All @@ -660,7 +713,7 @@ func (*Replica) sha512(
UpperBound: span.End,
})
spanMS, err := storage.ComputeStatsForRangeWithVisitors(
iter, span.Start, span.End, 0 /* nowNanos */, pointKeyVisitor, nil /* rangeKeyVisitor */)
iter, span.Start, span.End, 0 /* nowNanos */, pointKeyVisitor, rangeKeyVisitor)
iter.Close()
if err != nil {
return nil, err
Expand Down
150 changes: 109 additions & 41 deletions pkg/kv/kvserver/replica_consistency_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"bytes"
fmt "fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand All @@ -26,6 +27,7 @@ type ReplicaSnapshotDiff struct {
// holder.
LeaseHolder bool
Key roachpb.Key
EndKey roachpb.Key // only set for range keys (MVCCRangeKey)
Timestamp hlc.Timestamp
Value []byte
}
Expand All @@ -43,17 +45,33 @@ func (rsds ReplicaSnapshotDiffSlice) SafeFormat(buf redact.SafePrinter, _ rune)
// Lease holder (RHS) has something follower (LHS) does not have.
prefix = redact.SafeString("-")
}
const format = `%s%s %s
if len(d.EndKey) > 0 {
const rangeKVFormat = `%s%s %s
%s ts:%s
%s value:%s
%s raw from/to/ts/value: %x %x %x %x
`
buf.Printf(rangeKVFormat,
prefix, d.Timestamp, roachpb.Span{Key: d.Key, EndKey: d.EndKey},
prefix, d.Timestamp.GoTime(),
prefix, fmt.Sprintf("%q", d.Value), // just print the raw value for now
erikgrinaker marked this conversation as resolved.
Show resolved Hide resolved
prefix, storage.EncodeMVCCKeyPrefix(d.Key), storage.EncodeMVCCKeyPrefix(d.EndKey),
storage.EncodeMVCCTimestampSuffix(d.Timestamp), d.Value)

} else {
const kvFormat = `%s%s %s
%s ts:%s
%s value:%s
%s raw mvcc_key/value: %x %x
`
mvccKey := storage.MVCCKey{Key: d.Key, Timestamp: d.Timestamp}
buf.Printf(format,
prefix, d.Timestamp, d.Key,
prefix, d.Timestamp.GoTime(),
prefix, SprintMVCCKeyValue(storage.MVCCKeyValue{Key: mvccKey, Value: d.Value}, false /* printKey */),
prefix, storage.EncodeMVCCKey(mvccKey), d.Value)
mvccKey := storage.MVCCKey{Key: d.Key, Timestamp: d.Timestamp}
buf.Printf(kvFormat,
prefix, d.Timestamp, d.Key,
prefix, d.Timestamp.GoTime(),
prefix, SprintMVCCKeyValue(
storage.MVCCKeyValue{Key: mvccKey, Value: d.Value}, false /* printKey */),
prefix, storage.EncodeMVCCKey(mvccKey), d.Value)
}
}
}

Expand All @@ -66,15 +84,22 @@ func diffRange(l, r *roachpb.RaftSnapshotData) ReplicaSnapshotDiffSlice {
if l == nil || r == nil {
return nil
}
var diff ReplicaSnapshotDiffSlice
diff = append(diff, diffKVs(l.KV, r.KV)...)
diff = append(diff, diffRangeKeys(l.RangeKV, r.RangeKV)...)
return diff
}

func diffKVs(l, r []roachpb.RaftSnapshotData_KeyValue) ReplicaSnapshotDiffSlice {
var diff []ReplicaSnapshotDiff
i, j := 0, 0
for {
var e, v roachpb.RaftSnapshotData_KeyValue
if i < len(l.KV) {
e = l.KV[i]
if i < len(l) {
e = l[i]
}
if j < len(r.KV) {
v = r.KV[j]
if j < len(r) {
v = r[j]
}

addLeaseHolder := func() {
Expand All @@ -88,52 +113,95 @@ func diffRange(l, r *roachpb.RaftSnapshotData) ReplicaSnapshotDiffSlice {

// Compare keys.
var comp int
// Check if it has finished traversing over all the lease holder keys.
if e.Key == nil {
if v.Key == nil {
// Done traversing over all the replica keys. Done!
break
} else {
comp = 1
}
if e.Key == nil && v.Key == nil {
// Done!
break
} else if e.Key == nil {
// Finished traversing over all the lease holder keys.
comp = 1
} else if v.Key == nil {
// Finished traversing over all the replica keys.
comp = -1
} else {
// Check if it has finished traversing over all the replica keys.
if v.Key == nil {
comp = -1
} else {
// Both lease holder and replica keys exist. Compare them.
comp = bytes.Compare(e.Key, v.Key)
}
// Both lease holder and replica keys exist. Compare them.
comp = storage.MVCCKey{Key: e.Key, Timestamp: e.Timestamp}.Compare(
storage.MVCCKey{Key: v.Key, Timestamp: v.Timestamp})
}

switch comp {
case -1:
addLeaseHolder()

case 1:
addReplica()
case 0:
// Timestamp sorting is weird. Timestamp{} sorts first, the
// remainder sort in descending order. See storage/engine/doc.go.
if !e.Timestamp.EqOrdering(v.Timestamp) {
if e.Timestamp.IsEmpty() {
addLeaseHolder()
} else if v.Timestamp.IsEmpty() {
addReplica()
} else if v.Timestamp.Less(e.Timestamp) {
addLeaseHolder()
} else {
addReplica()
}
} else if !bytes.Equal(e.Value, v.Value) {
if !bytes.Equal(e.Value, v.Value) {
addLeaseHolder()
addReplica()
} else {
// No diff; skip.
i++
j++
}
}
}
return diff
}

func diffRangeKeys(l, r []roachpb.RaftSnapshotData_RangeKeyValue) ReplicaSnapshotDiffSlice {
var diff []ReplicaSnapshotDiff
i, j := 0, 0
for {
var e, v roachpb.RaftSnapshotData_RangeKeyValue
if i < len(l) {
e = l[i]
}
if j < len(r) {
v = r[j]
}

addLeaseHolder := func() {
diff = append(diff, ReplicaSnapshotDiff{LeaseHolder: true, Key: e.StartKey, EndKey: e.EndKey,
Timestamp: e.Timestamp, Value: e.Value})
i++
}
addReplica := func() {
diff = append(diff, ReplicaSnapshotDiff{LeaseHolder: false, Key: v.StartKey, EndKey: v.EndKey,
Timestamp: v.Timestamp, Value: v.Value})
j++
}

// Compare keys.
var comp int
if e.StartKey == nil && e.EndKey == nil && v.StartKey == nil && v.EndKey == nil {
// Done!
break
} else if e.StartKey == nil && e.EndKey == nil {
// Finished traversing over all the lease holder keys.
comp = 1
} else if v.StartKey == nil && v.EndKey == nil {
// Finished traversing over all the replica keys.
comp = -1
} else {
// Both lease holder and replica keys exist. Compare them.
eMVCC := storage.MVCCRangeKey{StartKey: e.StartKey, EndKey: e.EndKey, Timestamp: e.Timestamp}
vMVCC := storage.MVCCRangeKey{StartKey: v.StartKey, EndKey: v.EndKey, Timestamp: v.Timestamp}
comp = eMVCC.Compare(vMVCC)
}

switch comp {
case -1:
addLeaseHolder()
case 1:
addReplica()

case 0:
if !bytes.Equal(e.Value, v.Value) {
addLeaseHolder()
addReplica()
} else {
// No diff; skip.
i++
j++
}
}
}
return diff
Expand Down
Loading