kvserver: use a threshold on MVCC stats discrepancy to fall back to accurate-stats split

This patch is a follow-up to cockroachdb#119499. It adds thresholds on the
number of keys and bytes by which the pre-split MVCC stats (retrieved in
AdminSplit) may differ from the stats at the time the split holds latches
(retrieved in splitTrigger). This difference corresponds to writes concurrent
with the split. If the difference is too large, the split falls back to
computing the LHS and RHS stats accurately. The difference is computed only
for stats corresponding to user data; system stats are always kept accurate.

These thresholds are tunable by two new cluster settings,
MaxMVCCStatCountDiff and MaxMVCCStatBytesDiff, which denote the maximum
number of entities (e.g. keys, intents) and maximum number of bytes
(e.g. value bytes, range value bytes), respectively, that are
acceptable as the difference between the pre- and post-split values of
an individual stat. The former defaults to 1000, and the latter to
512KB (0.1% of the maximum range size).
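
As a rough illustration of the comparison (a minimal sketch in Go; the actual
check is enginepb's HasUserDataCloseTo, and the exact set of stats it compares
is an assumption here):

    func closeEnough(pre, stored enginepb.MVCCStats, maxCountDiff, maxBytesDiff int64) bool {
    	abs := func(x int64) int64 {
    		if x < 0 {
    			return -x
    		}
    		return x
    	}
    	// Count-like stats are held to the count threshold.
    	if abs(pre.KeyCount-stored.KeyCount) > maxCountDiff ||
    		abs(pre.ValCount-stored.ValCount) > maxCountDiff ||
    		abs(pre.IntentCount-stored.IntentCount) > maxCountDiff {
    		return false
    	}
    	// Byte-like stats are held to the bytes threshold.
    	if abs(pre.KeyBytes-stored.KeyBytes) > maxBytesDiff ||
    		abs(pre.ValBytes-stored.ValBytes) > maxBytesDiff {
    		return false
    	}
    	return true
    }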

Fixes: cockroachdb#119503

Release note: None
miraradeva committed Mar 11, 2024
1 parent b59305c commit fe85c85
Showing 7 changed files with 265 additions and 105 deletions.
27 changes: 27 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -64,6 +64,30 @@ var EnableMVCCStatsRecomputationInSplit = settings.RegisterBoolSetting(
"to prevent stats estimates from drifting",
util.ConstantWithMetamorphicTestBool("kv.split.mvcc_stats_recomputation.enabled", true))

// MaxMVCCStatCountDiff defines the maximum number of units (e.g. keys or
// intents) that is acceptable for an individual MVCC stat to diverge from the
// real value when computed during splits. If this threshold is
// exceeded, the split will fall back to computing 100% accurate stats.
// It takes effect only if kv.split.estimated_mvcc_stats.enabled is true.
var MaxMVCCStatCountDiff = settings.RegisterIntSetting(
settings.SystemVisible,
"kv.split.max_mvcc_stat_count_diff",
"defines the max number of units that are acceptable for an individual "+
"MVCC stat to diverge; needs kv.split.estimated_mvcc_stats.enabled to be true",
1000)

// MaxMVCCStatBytesDiff defines the maximum number of bytes (e.g. keys bytes or
// intents bytes) that is acceptable for an individual MVCC stat to diverge
// from the real value when computed during splits. If this threshold is
// exceeded, the split will fall back to computing 100% accurate stats.
// It takes effect only if kv.split.estimated_mvcc_stats.enabled is true.
var MaxMVCCStatBytesDiff = settings.RegisterIntSetting(
settings.SystemVisible,
"kv.split.max_mvcc_stat_bytes_diff",
"defines the max number of bytes that are acceptable for an individual "+
"MVCC stat to diverge; needs kv.split.estimated_mvcc_stats.enabled to be true",
512000) // 512 KB = 0.1% of the max range size

func init() {
RegisterReadWriteCommand(kvpb.EndTxn, declareKeysEndTxn, EndTxn)
}
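
For quick reference, these settings can be tightened in tests via Override, as
the updated client_split_test.go below does (a sketch; context and imports
assumed):

    st := cluster.MakeTestingClusterSettings()
    // Treat even a single concurrently-written key as too much drift.
    batcheval.MaxMVCCStatCountDiff.Override(context.Background(), &st.SV, 0)
    // Tolerate at most 10 bytes of concurrent writes per stat.
    batcheval.MaxMVCCStatBytesDiff.Override(context.Background(), &st.SV, 10)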
@@ -1105,6 +1129,9 @@ func splitTrigger(
RightIsEmpty: emptyRHS,
PreSplitLeftUser: split.PreSplitLeftUserStats,
PostSplitScanLocalLeftFn: makeScanStatsFn(ctx, batch, ts, &split.LeftDesc, "local left hand side", true /* excludeUserSpans */),
PreSplitStats: split.PreSplitStats,
MaxCountDiff: MaxMVCCStatCountDiff.Get(&rec.ClusterSettings().SV),
MaxBytesDiff: MaxMVCCStatBytesDiff.Get(&rec.ClusterSettings().SV),
}
return splitTriggerHelper(ctx, rec, batch, h, split, ts)
}
26 changes: 25 additions & 1 deletion pkg/kv/kvserver/batcheval/split_stats_helper.go
@@ -10,7 +10,9 @@

package batcheval

import "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
)

// splitStatsHelper codifies and explains the stats computations related to a
// split. The quantities known during a split (i.e. while the split trigger
@@ -151,6 +153,18 @@ type splitStatsHelperInput struct
// user spans of the left hand side, concurrent with the split, may not be
// accounted for.
PostSplitScanLocalLeftFn splitStatsScanFn
// PreSplitStats are the total on-disk stats before the split (in AdminSplit).
PreSplitStats enginepb.MVCCStats
// Max number of entities (keys, values, etc.) corresponding to a single MVCC
// stat (e.g. KeyCount) that is acceptable as the absolute difference between
// PreSplitStats and AbsPreSplitBothStored.
// Tuned by kv.split.max_mvcc_stat_count_diff.
MaxCountDiff int64
// Max number of bytes corresponding to a single MVCC stat (e.g. KeyBytes)
// that is acceptable as the absolute difference between PreSplitStats and
// AbsPreSplitBothStored.
// Tuned by kv.split.max_mvcc_stat_bytes_diff.
MaxBytesDiff int64
}

// makeSplitStatsHelper initializes a splitStatsHelper. The values in the input
@@ -246,6 +260,16 @@ func makeEstimatedSplitStatsHelper(input splitStatsHelperInput) (splitStatsHelpe
return h, nil
}

// If the user pre-split stats differ significantly from the current stats
// stored on disk, fall back to accurate-stats computation.
// Note that the current stats on disk were corrected in AdminSplit, so any
// differences we see here are due to writes concurrent with this split (not
// compounded estimates from previous splits).
if !h.in.AbsPreSplitBothStored.HasUserDataCloseTo(
h.in.PreSplitStats, h.in.MaxCountDiff, h.in.MaxBytesDiff) {
return makeSplitStatsHelper(input)
}

var absPostSplitFirst enginepb.MVCCStats
var err error
// If either the RHS or LHS is empty, scan it (it's cheap).
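
To make the thresholds concrete, a small worked example with hypothetical
numbers (not taken from the patch), using the defaults of 1000 and 512000:

    // Stats captured in AdminSplit vs. stats read under latches in splitTrigger.
    pre := enginepb.MVCCStats{KeyCount: 10_000, KeyBytes: 1_000_000}
    stored := enginepb.MVCCStats{KeyCount: 10_400, KeyBytes: 1_100_000}
    // |10_400 - 10_000| = 400 <= 1000 and |1_100_000 - 1_000_000| = 100_000 <= 512_000,
    // so the split stays on the cheap estimated-stats path. Had ~2,000 keys
    // been written concurrently instead, the count check would fail and the
    // helper would fall back to makeSplitStatsHelper, which computes the LHS
    // and RHS stats accurately.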
235 changes: 132 additions & 103 deletions pkg/kv/kvserver/client_split_test.go
@@ -895,8 +895,9 @@ func TestStoreRangeSplitMergeStats(t *testing.T) {
//
// The test writes some data, then initiates a split and pauses it before the
// EndTxn request with the split trigger is evaluated. Then, it writes some more
// data, and unpauses the split. The MVCC stats for the two new ranges should
// agree with a re-computation.
// data, and unpauses the split. If the split computes accurate stats, the MVCC
// stats for the two new ranges should agree with a re-computation. Otherwise,
// we expect ContainsEstimates.
func TestStoreRangeSplitWithConcurrentWrites(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -918,111 +918,139 @@ func TestStoreRangeSplitWithConcurrentWrites(t *testing.T) {
return nil
}

testutils.RunTrueAndFalse(t, "estimates", func(t *testing.T, estimates bool) {
testutils.RunTrueAndFalse(t, "maxCount", func(t *testing.T, maxCount bool) {
if !estimates && maxCount {
// MaxMVCCStatCountDiff and MaxMVCCStatBytesDiff are not respected with
// EnableEstimatedMVCCStatsInSplit = false.
skip.IgnoreLint(t, "incompatible params")
}
testutils.RunTrueAndFalse(t, "maxBytes", func(t *testing.T, maxBytes bool) {
settings := cluster.MakeTestingClusterSettings()
batcheval.EnableEstimatedMVCCStatsInSplit.Override(context.Background(), &settings.SV, estimates)
if maxCount {
// If there is even a single write concurrent with the split, fall
// back to accurate stats computation.
batcheval.MaxMVCCStatCountDiff.Override(context.Background(), &settings.SV, 0)
}
if maxBytes {
// If there are more than 10 bytes of writes concurrent with the split, fall
// back to accurate stats computation.
batcheval.MaxMVCCStatBytesDiff.Override(context.Background(), &settings.SV, 10)
}
fallBackToAccurateStats := maxCount || maxBytes

ctx := context.Background()
s := serverutils.StartServerOnly(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingRequestFilter: filter,
},
},
Settings: settings,
})

defer s.Stopper().Stop(ctx)
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)

// Write some initial data to the future LHS.
_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("a"), []byte("foo")))
require.NoError(t, pErr.GoError())
// Write some initial data to the future RHS.
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("c"), []byte("bar")))
require.NoError(t, pErr.GoError())

splitKeyAddr, err := keys.Addr(splitKey)
require.NoError(t, err)
lhsRepl := store.LookupReplica(splitKeyAddr)

// Split the range.
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKey))
return pErr.GoError()
})

// Wait until split is underway.
<-splitBlocked

// Write some more data to both sides.
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("aa"), []byte("foo")))
require.NoError(t, pErr.GoError())
_, pErr = kv.SendWrapped(ctx, store.TestSender(), putArgs([]byte("cc"), []byte("bar")))
require.NoError(t, pErr.GoError())

// Unblock the split.
splitBlocked <- struct{}{}

// Wait for the split to complete.
require.Nil(t, g.Wait())

snap := store.TODOEngine().NewSnapshot()
defer snap.Close()
lhsStats, err := stateloader.Make(lhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
rhsRepl := store.LookupReplica(splitKeyAddr)
rhsStats, err := stateloader.Make(rhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
// If the split is producing estimates and neither the tight count nor the
// tight bytes threshold is set, expect non-zero ContainsEstimates.
if estimates && !fallBackToAccurateStats {
require.Greater(t, lhsStats.ContainsEstimates, int64(0))
require.Greater(t, rhsStats.ContainsEstimates, int64(0))
} else {
// Otherwise, the stats should agree with re-computation.
assertRecomputedStats(t, "LHS after split", snap, lhsRepl.Desc(), lhsStats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "RHS after split", snap, rhsRepl.Desc(), rhsStats, s.Clock().PhysicalNow())
}

// If we used estimated stats while splitting the range, the stats on disk
// will not match the stats recomputed from the range. We expect both of the
// concurrent writes to be attributed to the RHS (instead of 1 to the LHS and
// 1 to the RHS). But if we split these ranges one more time (no concurrent
// writes this time), we expect the stats to be corrected and not drift.
splitKeyLeft := roachpb.Key("aa")
splitKeyLeftAddr, err := keys.Addr(splitKeyLeft)
require.NoError(t, err)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKeyLeft))
require.NoError(t, pErr.GoError())
splitKeyRight := roachpb.Key("bb")
splitKeyRightAddr, err := keys.Addr(splitKeyRight)
require.NoError(t, err)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), adminSplitArgs(splitKeyRight))
require.NoError(t, pErr.GoError())

snap = store.TODOEngine().NewSnapshot()
defer snap.Close()
lhs1Stats, err := stateloader.Make(lhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
lhs2Repl := store.LookupReplica(splitKeyLeftAddr)
lhs2Stats, err := stateloader.Make(lhs2Repl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
rhs1Stats, err := stateloader.Make(rhsRepl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
rhs2Repl := store.LookupReplica(splitKeyRightAddr)
rhs2Stats, err := stateloader.Make(rhs2Repl.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)

// Stats should agree with re-computation.
assertRecomputedStats(t, "LHS1 after second split", snap, lhsRepl.Desc(), lhs1Stats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "LHS2 after second split", snap, lhs2Repl.Desc(), lhs2Stats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "RHS1 after second split", snap, rhsRepl.Desc(), rhs1Stats, s.Clock().PhysicalNow())
assertRecomputedStats(t, "RHS2 after second split", snap, rhs2Repl.Desc(), rhs2Stats, s.Clock().PhysicalNow())
if estimates && !fallBackToAccurateStats {
require.Greater(t, lhs1Stats.ContainsEstimates, int64(0))
require.Greater(t, lhs2Stats.ContainsEstimates, int64(0))
// This range is empty, so we don't label it with ContainsEstimates.
require.Equal(t, int64(0), rhs1Stats.ContainsEstimates)
require.Greater(t, rhs2Stats.ContainsEstimates, int64(0))
}
})
})
})

}

// RaftMessageHandlerInterceptor wraps a storage.IncomingRaftMessageHandler. It
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_command.go
@@ -177,6 +177,7 @@ func splitTxnAttempt(
oldDesc *roachpb.RangeDescriptor,
reason redact.RedactableString,
preSplitLeftUserStats enginepb.MVCCStats,
preSplitStats enginepb.MVCCStats,
) error {
txn.SetDebugName(splitTxnName)

@@ -234,6 +235,7 @@
LeftDesc: *leftDesc,
RightDesc: *rightDesc,
PreSplitLeftUserStats: preSplitLeftUserStats,
PreSplitStats: preSplitStats,
},
},
})
@@ -471,7 +473,7 @@ func (r *Replica) adminSplitWithDescriptor(
}

if err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return splitTxnAttempt(ctx, r.store, txn, leftDesc, rightDesc, desc, reason, userOnlyLeftStats)
return splitTxnAttempt(ctx, r.store, txn, leftDesc, rightDesc, desc, reason, userOnlyLeftStats, r.GetMVCCStats())
}); err != nil {
// The ConditionFailedError can occur because the descriptors acting
// as expected values in the CPuts used to update the left or right
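
Taken together, the plumbing across these files is roughly the following (a
simplified sketch of the hunks above, not verbatim code):

    // 1. AdminSplit (replica_command.go): snapshot the range's stats before
    //    the split transaction runs.
    preSplitStats := r.GetMVCCStats()
    // 2. splitTxnAttempt: carry them on the split trigger.
    trigger := roachpb.SplitTrigger{ /* ... */ PreSplitStats: preSplitStats}
    // 3. splitTrigger (cmd_end_transaction.go): hand them, plus the two new
    //    thresholds, to the stats helper, which compares them against the
    //    stats read while holding latches and falls back to an accurate
    //    recomputation if they diverged too much.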
5 changes: 5 additions & 0 deletions pkg/roachpb/data.proto
@@ -133,6 +133,11 @@ message SplitTrigger {
// Pre-computed stats for the LHS spans corresponding to user data.
// Used in splitTrigger to estimate the LHS range's MVCC stats.
storage.enginepb.MVCCStats pre_split_left_user_stats = 5 [(gogoproto.nullable) = false];
// Pre-computed stats for the entire range.
// Used in splitTrigger to fall back to accurate stats computation if these
// pre-computed stats differ significantly from the stats retrieved from
// disk while holding latches in splitTrigger.
storage.enginepb.MVCCStats pre_split_stats = 6 [(gogoproto.nullable) = false];
}

// A MergeTrigger is run after a successful commit of an AdminMerge
