
release-22.2: kvserver: avoid load based splits in middle of SQL row #104563

Merged 5 commits on Jun 15, 2023
Changes from all commits
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -298,6 +298,7 @@ go_test(
"replica_rangefeed_test.go",
"replica_rankings_test.go",
"replica_sideload_test.go",
"replica_split_load_test.go",
"replica_sst_snapshot_storage_test.go",
"replica_test.go",
"replica_tscache_test.go",
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/asim/state/split_decider.go
@@ -11,12 +11,14 @@
package state

import (
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// LoadSplitter provides an abstraction for load based splitting. It records
@@ -66,7 +68,10 @@ func (s *SplitDecider) newDecider() *split.Decider {
}

decider := &split.Decider{}
split.Init(decider, intN, s.qpsThreshold, s.qpsRetention)
split.Init(decider, intN, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
return decider
}

@@ -81,7 +86,7 @@ func (s *SplitDecider) Record(tick time.Time, rangeID RangeID, le workload.LoadE
}

qps := LoadEventQPS(le)
shouldSplit := decider.Record(tick, int(qps), func() roachpb.Span {
shouldSplit := decider.Record(context.Background(), tick, int(qps), func() roachpb.Span {
return roachpb.Span{
Key: Key(le.Key).ToRKey().AsRawKey(),
}
@@ -102,7 +107,7 @@ func (s *SplitDecider) SplitKey(tick time.Time, rangeID RangeID) (Key, bool) {
return InvalidKey, false
}

key := decider.MaybeSplitKey(tick)
key := decider.MaybeSplitKey(context.Background(), tick)
if key == nil {
return InvalidKey, false
}
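
For reference, a minimal sketch of how a caller might wire up the updated split.Decider API shown in this file: split.Init now takes a trailing *split.LoadSplitterMetrics, and Record/MaybeSplitKey take a context.Context. The argument types below (the int generator and the threshold/retention closures) are inferred from the call sites in this diff, so treat them as assumptions rather than the package's documented signatures.

```go
package example

import (
	"context"
	"math/rand"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// newDecider mirrors SplitDecider.newDecider above: the new trailing argument
// carries counters for ranges with a popular key or with no valid split key.
func newDecider(
	rng *rand.Rand, qpsThreshold func() float64, qpsRetention func() time.Duration,
) *split.Decider {
	d := &split.Decider{}
	split.Init(d, rng.Intn, qpsThreshold, qpsRetention, &split.LoadSplitterMetrics{
		// Unregistered counters suffice for a simulator or test; a real store
		// registers these with its metrics registry.
		PopularKeyCount: metric.NewCounter(metric.Metadata{}),
		NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
	})
	return d
}

// recordAndMaybeSplit exercises the context-taking Record and MaybeSplitKey.
func recordAndMaybeSplit(
	d *split.Decider, now time.Time, qps int, span roachpb.Span,
) (roachpb.Key, bool) {
	ctx := context.Background()
	if d.Record(ctx, now, qps, func() roachpb.Span { return span }) {
		if key := d.MaybeSplitKey(ctx, now); key != nil {
			return key, true
		}
	}
	return nil, false
}
```

The two counters correspond to the PopularKeyCount and NoSplitKeyCount metrics introduced by this change.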
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_range_stats.go
@@ -44,8 +44,8 @@ func RangeStats(
) (result.Result, error) {
reply := resp.(*roachpb.RangeStatsResponse)
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.DeprecatedLastQueriesPerSecond = cArgs.EvalCtx.GetLastSplitQPS()
if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(); ok {
reply.DeprecatedLastQueriesPerSecond = cArgs.EvalCtx.GetLastSplitQPS(ctx)
if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(ctx); ok {
reply.MaxQueriesPerSecond = qps
} else {
// See comment on MaxQueriesPerSecond. -1 means !ok.
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -88,15 +88,15 @@ type EvalContext interface {
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
GetMaxSplitQPS() (float64, bool)
GetMaxSplitQPS(context.Context) (float64, bool)

// GetLastSplitQPS returns the Replica's most recent queries/s request rate.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
//
// TODO(nvanbenschoten): remove this method in v22.1.
GetLastSplitQPS() float64
GetLastSplitQPS(context.Context) float64

GetGCThreshold() hlc.Timestamp
ExcludeDataFromBackup() bool
@@ -240,10 +240,10 @@ func (m *mockEvalCtxImpl) ContainsKey(key roachpb.Key) bool {
func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats {
return m.Stats
}
func (m *mockEvalCtxImpl) GetMaxSplitQPS() (float64, bool) {
func (m *mockEvalCtxImpl) GetMaxSplitQPS(context.Context) (float64, bool) {
return m.QPS, true
}
func (m *mockEvalCtxImpl) GetLastSplitQPS() float64 {
func (m *mockEvalCtxImpl) GetLastSplitQPS(context.Context) float64 {
return m.QPS
}
func (m *mockEvalCtxImpl) CanCreateTxnRecord(
246 changes: 246 additions & 0 deletions pkg/kv/kvserver/client_split_test.go
@@ -3649,3 +3649,249 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
}

// TestLBSplitUnsafeKeys tests that load based splits do not split between table
// rows, even when the suggested load based split key itself falls in the middle of a table row.
func TestLBSplitUnsafeKeys(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
const indexID = 1

// The test is expensive and prone to timing out under race.
skip.UnderRace(t)
skip.UnderStressRace(t)
skip.UnderDeadlock(t)

makeTestKey := func(tableID uint32, suffix []byte) roachpb.Key {
tableKey := keys.MakeTableIDIndexID(nil, tableID, indexID)
return append(tableKey, suffix...)
}

es := func(vals ...int64) []byte {
k := []byte{}
for _, v := range vals {
k = encoding.EncodeVarintAscending(k, v)
}
return k
}

fk := func(k []byte, famID uint32) []byte {
return keys.MakeFamilyKey(k, famID)
}

testCases := []struct {
splitKey roachpb.Key
existingKeys []int
expSplitKey roachpb.Key
expErrStr string
}{
// We don't know the table ID here, so we append the splitKey to the
// table/index prefix, e.g. /1 will be /Table/table_id/index_id/1.
{
// /1 -> /2
splitKey: es(1),
existingKeys: []int{1, 2, 3},
expSplitKey: es(2),
},
{
// /1/0 -> /2
splitKey: fk(es(1), 0),
existingKeys: []int{1, 2, 3},
expSplitKey: es(2),
},
{
// /1/3 -> /2
splitKey: fk(es(1), 3),
existingKeys: []int{1, 2, 3},
expSplitKey: es(2),
},
{
// /1/0/0 -> /2
splitKey: roachpb.Key(fk(es(1), 0)).Next(),
existingKeys: []int{1, 2, 3},
expSplitKey: es(2),
},
{
// /1/3/0 -> /2
splitKey: roachpb.Key(fk(es(1), 3)).Next(),
existingKeys: []int{1, 2, 3},
expSplitKey: es(2),
},
{
// /1/0/0/0 -> /2
splitKey: roachpb.Key(fk(es(1), 0)).Next().Next(),
existingKeys: []int{1, 2, 3},
expSplitKey: es(2),
},
{
// /1/3/0/0 -> /2
splitKey: roachpb.Key(fk(es(1), 3)).Next().Next(),
existingKeys: []int{1, 2, 3},
expSplitKey: es(2),
},
{
// /0 -> /3
// We will not split at the first row in a table, so expect the split at
// /3 instead of /2.
splitKey: es(0),
existingKeys: []int{2, 3},
expSplitKey: es(3),
},
{
// /2 -> /3
// Same case as above: despite the key being safe, the split would create
// an empty LHS.
splitKey: es(2),
existingKeys: []int{2, 3},
expSplitKey: es(3),
},
{
// /1 -> error
// There are no rows to split on.
splitKey: es(1),
existingKeys: []int{},
expErrStr: "could not find valid split key",
},
{
// /1 -> error
// There is only one row to split on, the range should not be split.
splitKey: es(1),
existingKeys: []int{2},
expErrStr: "could not find valid split key",
},
{
// /1/0 -> error
splitKey: fk(es(1), 0),
existingKeys: []int{2},
expErrStr: "could not find valid split key",
},
{
// /1/3 -> error
splitKey: fk(es(1), 3),
existingKeys: []int{2},
expErrStr: "could not find valid split key",
},
{
// /1/0/0/0 -> error
splitKey: roachpb.Key(fk(es(1), 0)).Next().Next(),
existingKeys: []int{2},
expErrStr: "could not find valid split key",
},
{
// /2 -> /2
splitKey: es(2),
existingKeys: []int{1, 2},
expSplitKey: es(2),
},
}

for _, tc := range testCases {
var expectStr string
if tc.expErrStr != "" {
expectStr = tc.expErrStr
} else {
expectStr = makeTestKey(1, tc.expSplitKey).String()
}
t.Run(fmt.Sprintf("%s%v -> %s", makeTestKey(1, tc.splitKey), tc.existingKeys, expectStr), func(t *testing.T) {
var targetRange atomic.Int32
var splitKeyOverride atomic.Value
splitKeyOverride.Store(roachpb.Key{})

// Mock the load based splitter key finding method. This function will be
// checked in splitQueue.shouldQueue() and splitQueue.process via
// replica.loadSplitKey. When a key is returned, the split queue calls
// replica.adminSplitWithDescriptor(...findFirstSafeSplitKey=true).
overrideLBSplitFn := func(rangeID roachpb.RangeID) (splitKey roachpb.Key, useSplitKey bool) {
if rangeID == roachpb.RangeID(targetRange.Load()) {
override := splitKeyOverride.Load()
// It is possible that the split queue is checking the range before
// we manually enqueued it.
if override == nil {
return nil, false
}
overrideKey, ok := override.(roachpb.Key)
require.Truef(t, ok, "stored value not key %+v", override)

if len(overrideKey) == 0 {
return nil, false
}

return override.(roachpb.Key), true
}
return nil, false
}

serv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
DisableSpanConfigs: true,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
LoadBasedSplittingOverrideKey: overrideLBSplitFn,
DisableMergeQueue: true,
},
},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
tdb := sqlutils.MakeSQLRunner(sqlDB)
store, err := s.Stores().GetStore(1)
require.NoError(t, err)

// We want to exercise the case where there are column family keys.
// Create a simple table and insert the existing keys.
_ = tdb.Exec(t,
"CREATE TABLE t (k INT PRIMARY KEY, "+
"t0 INT, t1 INT, t2 INT, t3 INT, "+
"FAMILY (k), FAMILY (t0), FAMILY (t1), FAMILY (t2), FAMILY (t3))")
for _, k := range tc.existingKeys {
_ = tdb.Exec(t, fmt.Sprintf("INSERT INTO t VALUES (%d, %d, %d, %d, %d)",
k, k, k, k, k))
}

// Force a table scan to resolve descriptors.
var keyCount int
tdb.QueryRow(t, "SELECT count(k) FROM t").Scan(&keyCount)
require.Equal(t, len(tc.existingKeys), keyCount)
var tableID uint32
tdb.QueryRow(t, "SELECT table_id FROM crdb_internal.leases where name = 't'").Scan(&tableID)

// Split off the table range for the test, otherwise the range may
// contain multiple tables with existing values.
splitArgs := adminSplitArgs(keys.SystemSQLCodec.TablePrefix(tableID))
_, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs)
require.Nil(t, pErr)

var rangeID roachpb.RangeID
tdb.QueryRow(t, "SELECT range_id FROM [SHOW RANGES FROM TABLE t] LIMIT 1").Scan(&rangeID)
targetRange.Store(int32(rangeID))
repl, err := store.GetReplica(rangeID)
require.NoError(t, err)

// Keep the previous end key around, we will use this to assert that no
// split has occurred when expecting an error.
prevEndKey := repl.Desc().EndKey.AsRawKey()
splitKey := makeTestKey(tableID, tc.splitKey)

// Update the split key override so that the split queue will enqueue and
// process the range. Remove it afterwards to avoid retrying the LHS.
splitKeyOverride.Store(splitKey)
_, processErr, enqueueErr := store.Enqueue(ctx, "split", repl, false /* shouldSkipQueue */, false /* async */)
splitKeyOverride.Store(roachpb.Key{})
require.NoError(t, enqueueErr)

endKey := repl.Desc().EndKey.AsRawKey()
if tc.expErrStr != "" {
// We expect this split not to process, assert that the expected error
// matches the returned error and the range has the same end key.
require.ErrorContainsf(t, processErr, tc.expErrStr,
"end key %s, previous end key %s", endKey, prevEndKey)
require.Equal(t, prevEndKey, endKey)
} else {
// Otherwise, assert that the new range end key matches the expected
// end key.
require.NoError(t, processErr)
require.Equal(t, makeTestKey(tableID, tc.expSplitKey), endKey)
}
})
}
}
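
For context on what "splitting in the middle of a SQL row" means at the KV level, here is a small sketch (not part of this diff, and with a hypothetical table ID) of the key shapes the test's es/fk helpers produce. A row with multiple column families is stored as several adjacent KV keys that share the row's prefix, so a load based split key that lands between two family keys of the same row must be advanced to the next row boundary.

```go
package example

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/encoding"
)

// PrintRowKeys prints the per-column-family KV keys for one row (k = 1) of a
// table shaped like the test table t above. The tableID value is hypothetical;
// the test resolves the real one from crdb_internal.leases.
func PrintRowKeys(tableID uint32) {
	const indexID = 1
	rowPrefix := keys.MakeTableIDIndexID(nil, tableID, indexID)
	rowPrefix = encoding.EncodeVarintAscending(rowPrefix, 1) // primary key k = 1

	// One KV key per column family (k, t0, t1, t2, t3) of the same SQL row.
	// A range split between any two of these keys would leave parts of one
	// row in different ranges, which is what this change prevents.
	for famID := uint32(0); famID < 5; famID++ {
		famKey := keys.MakeFamilyKey(append([]byte(nil), rowPrefix...), famID)
		fmt.Println(roachpb.Key(famKey))
	}
}
```

In the test's shorthand these family keys appear as /1/0, /1/3, and so on; the expected behavior is that a suggested split key anywhere inside row 1 resolves to /2, the start of the next row.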
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/merge_queue.go
@@ -217,7 +217,7 @@ func (mq *mergeQueue) process(

lhsDesc := lhsRepl.Desc()
lhsStats := lhsRepl.GetMVCCStats()
lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS()
lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS(ctx)
minBytes := lhsRepl.GetMinBytes()
if lhsStats.Total() >= minBytes {
log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes",