Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: add new CheckSSTConflicts randomized test #98408

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,7 @@ GO_TARGETS = [
"//pkg/storage/enginepb:enginepb_test",
"//pkg/storage/fs:fs",
"//pkg/storage/fs:fs_test",
"//pkg/storage/meta:meta",
"//pkg/storage/metamorphic:metamorphic",
"//pkg/storage/metamorphic:metamorphic_test",
"//pkg/storage/pebbleiter:pebbleiter",
Expand Down
30 changes: 15 additions & 15 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,15 @@ func EvalAddSSTable(
args := cArgs.Args.(*kvpb.AddSSTableRequest)
h := cArgs.Header
ms := cArgs.Stats
start, end := storage.MVCCKey{Key: args.Key}, storage.MVCCKey{Key: args.EndKey}
start, end := args.Key, args.EndKey
sst := args.Data
sstToReqTS := args.SSTTimestampToRequestTimestamp

var span *tracing.Span
var err error
ctx, span = tracing.ChildSpan(ctx, "AddSSTable")
defer span.Finish()
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", start.Key, end.Key)
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", start, end)

if min := addSSTableCapacityRemainingLimit.Get(&cArgs.EvalCtx.ClusterSettings().SV); min > 0 {
cap, err := cArgs.EvalCtx.GetEngineCapacity()
Expand Down Expand Up @@ -214,7 +214,7 @@ func EvalAddSSTable(
// TODO(dt): use a quotapool.
conc := int(AddSSTableRewriteConcurrency.Get(&cArgs.EvalCtx.ClusterSettings().SV))
log.VEventf(ctx, 2, "rewriting timestamps for SSTable [%s,%s) from %s to %s",
start.Key, end.Key, sstToReqTS, h.Timestamp)
start, end, sstToReqTS, h.Timestamp)
sst, sstReqStatsDelta, err = storage.UpdateSSTTimestamps(
ctx, st, sst, sstToReqTS, h.Timestamp, conc, args.MVCCStats)
if err != nil {
Expand Down Expand Up @@ -243,7 +243,7 @@ func EvalAddSSTable(
// as it avoids expensive seeks with index/data block loading in the common
// case of no conflicts.
usePrefixSeek := false
bytes, err := cArgs.EvalCtx.GetApproximateDiskBytes(start.Key, end.Key)
bytes, err := cArgs.EvalCtx.GetApproximateDiskBytes(start, end)
if err == nil {
usePrefixSeek = bytes > prefixSeekCollisionCheckRatio*uint64(len(sst))
}
Expand All @@ -258,7 +258,7 @@ func EvalAddSSTable(
leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())

log.VEventf(ctx, 2, "checking conflicts for SSTable [%s,%s)", start.Key, end.Key)
log.VEventf(ctx, 2, "checking conflicts for SSTable [%s,%s)", start, end)
statsDelta, err = storage.CheckSSTConflicts(ctx, sst, readWriter, start, end, leftPeekBound, rightPeekBound,
args.DisallowShadowing, args.DisallowShadowingBelow, sstTimestamp, maxIntents, usePrefixSeek)
statsDelta.Add(sstReqStatsDelta)
Expand All @@ -270,8 +270,8 @@ func EvalAddSSTable(
// If not checking for MVCC conflicts, at least check for separated intents.
// The caller is expected to make sure there are no writers across the span,
// and thus no or few intents, so this is cheap in the common case.
log.VEventf(ctx, 2, "checking conflicting intents for SSTable [%s,%s)", start.Key, end.Key)
intents, err := storage.ScanIntents(ctx, readWriter, start.Key, end.Key, maxIntents, 0)
log.VEventf(ctx, 2, "checking conflicting intents for SSTable [%s,%s)", start, end)
intents, err := storage.ScanIntents(ctx, readWriter, start, end, maxIntents, 0)
if err != nil {
return result.Result{}, errors.Wrap(err, "scanning intents")
} else if len(intents) > 0 {
Expand All @@ -297,9 +297,9 @@ func EvalAddSSTable(
if ok, err := sstIter.Valid(); err != nil {
return result.Result{}, err
} else if ok {
if unsafeKey := sstIter.UnsafeKey(); unsafeKey.Less(start) {
if unsafeKey := sstIter.UnsafeKey(); unsafeKey.Less(storage.MVCCKey{Key: start}) {
return result.Result{}, errors.Errorf("first key %s not in request range [%s,%s)",
unsafeKey.Key, start.Key, end.Key)
unsafeKey.Key, start, end)
}
}

Expand All @@ -308,19 +308,19 @@ func EvalAddSSTable(
if args.MVCCStats != nil {
stats = *args.MVCCStats
} else {
log.VEventf(ctx, 2, "computing MVCCStats for SSTable [%s,%s)", start.Key, end.Key)
log.VEventf(ctx, 2, "computing MVCCStats for SSTable [%s,%s)", start, end)
stats, err = storage.ComputeStatsForIter(sstIter, h.Timestamp.WallTime)
if err != nil {
return result.Result{}, errors.Wrap(err, "computing SSTable MVCC stats")
}
}

sstIter.SeekGE(end)
sstIter.SeekGE(storage.MVCCKey{Key: end})
if ok, err := sstIter.Valid(); err != nil {
return result.Result{}, err
} else if ok {
return result.Result{}, errors.Errorf("last key %s not in request range [%s,%s)",
sstIter.UnsafeKey(), start.Key, end.Key)
sstIter.UnsafeKey(), start, end)
}

// The above MVCCStats represents what is in this new SST.
Expand Down Expand Up @@ -390,7 +390,7 @@ func EvalAddSSTable(
var mvccHistoryMutation *kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation
if sstToReqTS.IsEmpty() {
mvccHistoryMutation = &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{
Spans: []roachpb.Span{{Key: start.Key, EndKey: end.Key}},
Spans: []roachpb.Span{{Key: start, EndKey: end}},
}
}

Expand All @@ -414,7 +414,7 @@ func EvalAddSSTable(
},
)
defer existingIter.Close()
existingIter.SeekGE(end)
existingIter.SeekGE(storage.MVCCKey{Key: end})
if ok, err := existingIter.Valid(); err != nil {
return result.Result{}, errors.Wrap(err, "error while searching for non-empty span start")
} else if ok {
Expand Down Expand Up @@ -522,7 +522,7 @@ func EvalAddSSTable(
AddSSTable: &kvserverpb.ReplicatedEvalResult_AddSSTable{
Data: sst,
CRC32: util.CRC32(sst),
Span: roachpb.Span{Key: start.Key, EndKey: end.Key},
Span: roachpb.Span{Key: start, EndKey: end},
AtWriteTimestamp: sstToReqTS.IsSet(),
},
MVCCHistoryMutation: mvccHistoryMutation,
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/storage/meta",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/echotest",
Expand Down
10 changes: 4 additions & 6 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,7 @@ func runCheckSSTConflicts(
st := cluster.MakeTestingClusterSettings()
sstFile := &MemObject{}
sstWriter := MakeIngestionSSTWriter(ctx, st, sstFile)
var sstStart, sstEnd MVCCKey
var sstStart, sstEnd roachpb.Key
lastKeyNum := -1
lastKeyCounter := 0
for i := 0; i < numSstKeys; i++ {
Expand All @@ -1739,11 +1739,9 @@ func runCheckSSTConflicts(
key := roachpb.Key(encoding.EncodeUvarintAscending(encoding.EncodeUvarintAscending(keyBuf[:4], uint64(keyNum)), uint64(1+lastKeyCounter)))
mvccKey := MVCCKey{Key: key, Timestamp: hlc.Timestamp{WallTime: int64(numVersions + 3)}}
if i == 0 {
sstStart.Key = append([]byte(nil), mvccKey.Key...)
sstStart.Timestamp = mvccKey.Timestamp
sstStart = append([]byte(nil), mvccKey.Key...)
} else if i == numSstKeys-1 {
sstEnd.Key = append([]byte(nil), mvccKey.Key...)
sstEnd.Timestamp = mvccKey.Timestamp
sstEnd = append([]byte(nil), mvccKey.Key...)
}
require.NoError(b, sstWriter.PutMVCC(mvccKey, value))
lastKeyNum = keyNum
Expand All @@ -1752,7 +1750,7 @@ func runCheckSSTConflicts(

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := CheckSSTConflicts(context.Background(), sstFile.Data(), eng, sstStart, sstEnd, sstStart.Key, sstEnd.Key.Next(), false, hlc.Timestamp{}, hlc.Timestamp{}, math.MaxInt64, usePrefixSeek)
_, err := CheckSSTConflicts(context.Background(), sstFile.Data(), eng, sstStart, sstEnd, sstStart, sstEnd.Next(), false, hlc.Timestamp{}, hlc.Timestamp{}, math.MaxInt64, usePrefixSeek)
require.NoError(b, err)
}
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/storage/enginepb/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
package enginepb

import (
"bytes"
"fmt"
"math"
"regexp"
"sort"

"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -219,6 +222,58 @@ func (ms *MVCCStats) Subtract(oms MVCCStats) {
ms.AbortSpanBytes -= oms.AbortSpanBytes
}

var mvccStatsRegexp = regexp.MustCompile(`(\w+):(-?\d+)`)

// Formatted formats MVCC stats, returning a string.
func (ms MVCCStats) Formatted(delta bool) string {
// Split stats into field pairs. Subindex 1 is key, 2 is value.
fields := mvccStatsRegexp.FindAllStringSubmatch(ms.String(), -1)

// Sort some fields in preferred order, keeping the rest as-is at the end.
//
// TODO(erikgrinaker): Consider just reordering the MVCCStats struct fields
// instead, which determines the order of MVCCStats.String().
order := []string{"key_count", "key_bytes", "val_count", "val_bytes",
"range_key_count", "range_key_bytes", "range_val_count", "range_val_bytes",
"live_count", "live_bytes", "gc_bytes_age",
"intent_count", "intent_bytes", "separated_intent_count", "intent_age"}
sort.SliceStable(fields, func(i, j int) bool {
for _, name := range order {
if fields[i][1] == name {
return true
} else if fields[j][1] == name {
return false
}
}
return false
})

// Format and output fields.
var buf bytes.Buffer
for _, field := range fields {
key, value := field[1], field[2]

// Always skip zero-valued fields and LastUpdateNanos.
if value == "0" || key == "last_update_nanos" {
continue
}

if buf.Len() > 0 {
fmt.Fprint(&buf, " ")
}
fmt.Fprint(&buf, key, "=")
if delta && value[0] != '-' {
// prefix unsigned deltas with +
fmt.Fprint(&buf, "+")
}
fmt.Fprint(&buf, value)
}
if buf.Len() == 0 && delta {
return "no change"
}
return buf.String()
}

// IsInline returns true if the value is inlined in the metadata.
func (meta MVCCMetadata) IsInline() bool {
return meta.RawBytes != nil
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/meta/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "meta",
srcs = ["run.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/storage/meta",
visibility = ["//visibility:public"],
deps = ["@com_github_stretchr_testify//require"],
)
Loading