sql: pre-split hash sharded index ranges before DELETE_AND_WRITE_ONLY
Fixes #74558

Pre-split ranges on shard boundaries before the SchemaChanger moves new
hash sharded indexes from DELETE_ONLY to DELETE_AND_WRITE_ONLY state.

Release note (bug fix): When a hash sharded index was added to an
existing table, traffic could hit the index's single initial range hard
before it was split into more ranges for the shards as the range grew.
The schema changer now pre-splits ranges on shard boundaries before the
index becomes writable. The new cluster setting
`sql.hash_sharded_range_pre_split.max` sets an upper bound on the number
of pre-split ranges; if the bucket count of the defined index is less
than this setting, the bucket count is used as the number of pre-split
ranges.
chengxiong-ruan committed Jan 19, 2022
1 parent eb26eb3 commit 1723f7a
Showing 6 changed files with 261 additions and 0 deletions.
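
For illustration, here is a minimal, self-contained sketch of the key layout the pre-splitting produces. Each split key is the new index's key prefix followed by the varint-encoded shard number. The table ID (92), index ID (2), and bucket count (8) are assumptions borrowed from the logic test output below; the snippet uses the repository's keys and encoding packages but is only a sketch, not the schema changer's code path.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/util/encoding"
)

func main() {
	// Assumed for illustration: table ID 92, index ID 2, 8 shard buckets.
	codec := keys.SystemSQLCodec
	for shard := int64(0); shard < 8; shard++ {
		// Split key = index prefix + varint-encoded shard number, mirroring
		// what preSplitHashShardedIndexRanges hands to SplitAndScatter. These
		// keys pretty-print as /Table/92/2/<shard> in crdb_internal.ranges.
		splitKey := encoding.EncodeVarintAscending(codec.IndexPrefix(92, 2), shard)
		fmt.Printf("shard %d -> %v\n", shard, splitKey)
	}
}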
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -134,6 +134,7 @@
<tr><td><code>sql.distsql.temp_storage.workmem</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum amount of memory in bytes a processor can use before falling back to temp storage</td></tr>
<tr><td><code>sql.guardrails.max_row_size_err</code></td><td>byte size</td><td><code>512 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable</td></tr>
<tr><td><code>sql.guardrails.max_row_size_log</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable</td></tr>
<tr><td><code>sql.hash_sharded_range_pre_split.max</code></td><td>integer</td><td><code>16</code></td><td>max pre-split ranges to have when adding hash sharded index to an existing table</td></tr>
<tr><td><code>sql.log.slow_query.experimental_full_table_scans.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect.</td></tr>
<tr><td><code>sql.log.slow_query.internal_queries.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect.</td></tr>
<tr><td><code>sql.log.slow_query.latency_threshold</code></td><td>duration</td><td><code>0s</code></td><td>when set to non-zero, log statements whose service latency exceeds the threshold to a secondary logger on each node</td></tr>
8 changes: 8 additions & 0 deletions pkg/sql/exec_util.go
@@ -325,6 +325,14 @@ var hashShardedIndexesEnabledClusterMode = settings.RegisterBoolSetting(
	false,
).WithPublic()

var maxHashShardedIndexRangePreSplit = settings.RegisterIntSetting(
	settings.SystemOnly,
	"sql.hash_sharded_range_pre_split.max",
	"max pre-split ranges to have when adding hash sharded index to an existing table",
	16,
	settings.PositiveInt,
).WithPublic()

var zigzagJoinClusterMode = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"sql.defaults.zigzag_join.enabled",
42 changes: 42 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/hash_sharded_index
@@ -897,3 +897,45 @@ TABLE t
└── computed column expressions
└── crdb_internal_a_shard_8
└── mod(fnv32(crdb_internal.datums_to_bytes(a)), 8)

subtest test_hash_index_presplit

statement ok
SET experimental_enable_hash_sharded_indexes = on;
CREATE TABLE t_hash_pre_split (
  a INT PRIMARY KEY,
  b INT
);

skipif config 3node-tenant
query TITTT retry
SELECT t.name, r.table_id, r.index_name, r.start_pretty, r.end_pretty
FROM crdb_internal.tables t
JOIN crdb_internal.ranges r ON t.table_id = r.table_id
WHERE t.name = 't_hash_pre_split'
  AND t.state = 'PUBLIC'
  AND r.split_enforced_until IS NOT NULL;
----

statement ok
CREATE INDEX t_hash_pre_split_idx_b ON t_hash_pre_split (b) USING HASH WITH BUCKET_COUNT = 8;

skipif config 3node-tenant
query TITTT colnames,retry
SELECT t.name, r.table_id, r.index_name, r.start_pretty, r.end_pretty
FROM crdb_internal.tables t
JOIN crdb_internal.ranges r ON t.table_id = r.table_id
WHERE t.name = 't_hash_pre_split'
  AND t.state = 'PUBLIC'
  AND r.split_enforced_until IS NOT NULL;
----
name table_id index_name start_pretty end_pretty
t_hash_pre_split 92 t_hash_pre_split_idx_b /Table/92/2 /Table/92/2/0
t_hash_pre_split 92 t_hash_pre_split_idx_b /Table/92/2/0 /Table/92/2/1
t_hash_pre_split 92 t_hash_pre_split_idx_b /Table/92/2/1 /Table/92/2/2
t_hash_pre_split 92 t_hash_pre_split_idx_b /Table/92/2/2 /Table/92/2/3
t_hash_pre_split 92 t_hash_pre_split_idx_b /Table/92/2/3 /Table/92/2/4
t_hash_pre_split 92 t_hash_pre_split_idx_b /Table/92/2/4 /Table/92/2/5
t_hash_pre_split 92 t_hash_pre_split_idx_b /Table/92/2/5 /Table/92/2/6
t_hash_pre_split 92 t_hash_pre_split_idx_b /Table/92/2/6 /Table/92/2/7
t_hash_pre_split 92 t_hash_pre_split_idx_b /Table/92/2/7 /Max
86 changes: 86 additions & 0 deletions pkg/sql/schema_changer.go
@@ -49,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1423,6 +1424,11 @@ func (sc *SchemaChanger) runStateMachineAndBackfill(ctx context.Context) error {
	if fn := sc.testingKnobs.RunBeforePublishWriteAndDelete; fn != nil {
		fn()
	}

	if err := sc.preSplitHashShardedIndexRanges(ctx); err != nil {
		return err
	}

	// Run through mutation state machine before backfill.
	if err := sc.RunStateMachineBeforeBackfill(ctx); err != nil {
		return err
@@ -1931,6 +1937,14 @@ type SchemaChangerTestingKnobs struct {
	// TwoVersionLeaseViolation is called whenever a schema change transaction is
	// unable to commit because it is violating the two version lease invariant.
	TwoVersionLeaseViolation func()

	// RunBeforeHashShardedIndexRangePreSplit is called before pre-splitting index
	// ranges for a hash sharded index.
	RunBeforeHashShardedIndexRangePreSplit func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error

	// RunAfterHashShardedIndexRangePreSplit is called after index range
	// pre-splitting is done for a hash sharded index.
	RunAfterHashShardedIndexRangePreSplit func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
@@ -2516,6 +2530,78 @@ func (sc *SchemaChanger) getDependentMutationsJobs(
	return dependentJobs, nil
}

func (sc *SchemaChanger) preSplitHashShardedIndexRanges(ctx context.Context) error {
	if err := sc.txn(ctx, func(
		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
	) error {
		// Enforce the split points for an hour, long enough for the backfill
		// to catch up; after that the ranges are free to merge again.
		hour := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()}
		tableDesc, err := descsCol.GetMutableTableByID(
			ctx, txn, sc.descID,
			tree.ObjectLookupFlags{
				CommonLookupFlags: tree.CommonLookupFlags{
					IncludeOffline: true,
					IncludeDropped: true,
				},
			},
		)
		if err != nil {
			return err
		}

		if fn := sc.testingKnobs.RunBeforeHashShardedIndexRangePreSplit; fn != nil {
			if err := fn(tableDesc, sc.db, sc.execCfg.Codec); err != nil {
				return err
			}
		}

		for _, m := range tableDesc.AllMutations() {
			if m.MutationID() != sc.mutationID {
				// Mutations are applied in a FIFO order. Only apply the first set of
				// mutations if they have the mutation ID we're looking for.
				break
			}

			if idx := m.AsIndex(); m.Adding() && m.DeleteOnly() && idx != nil {
				if idx.IsSharded() {
					splitAtShards := calculateSplitAtShards(maxHashShardedIndexRangePreSplit.Get(&sc.settings.SV), idx.GetSharded().ShardBuckets)
					for _, shard := range splitAtShards {
						keyPrefix := sc.execCfg.Codec.IndexPrefix(uint32(tableDesc.GetID()), uint32(idx.GetID()))
						splitKey := encoding.EncodeVarintAscending(keyPrefix, shard)
						if err := sc.db.SplitAndScatter(ctx, splitKey, hour); err != nil {
							return err
						}
					}
				}
			}
		}

		if fn := sc.testingKnobs.RunAfterHashShardedIndexRangePreSplit; fn != nil {
			if err := fn(tableDesc, sc.db, sc.execCfg.Codec); err != nil {
				return err
			}
		}

		return nil
	}); err != nil {
		return err
	}

	return nil
}

// calculateSplitAtShards returns a slice of min(maxSplit, shardBucketCount)
// shard numbers. Shard numbers are sampled with a fixed step within the
// [0, shardBucketCount) range.
func calculateSplitAtShards(maxSplit int64, shardBucketCount int32) []int64 {
	splitCount := int(math.Min(float64(maxSplit), float64(shardBucketCount)))
	step := float64(shardBucketCount) / float64(splitCount)
	splitAtShards := make([]int64, splitCount)
	for i := 0; i < splitCount; i++ {
		splitAtShards[i] = int64(math.Floor(float64(i) * step))
	}
	return splitAtShards
}

// isCurrentMutationDiscarded returns whether the current column mutation is made irrelevant
// by a later operation. The nextMutationIdx provides the index at which to check for
// later mutations.
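As a worked example of the sampling that calculateSplitAtShards above performs, the following self-contained sketch copies the helper verbatim so it can run on its own, and prints the shard numbers chosen for a few bucket counts under a split cap of 8; the expected values match the unit test in the next file.

package main

import (
	"fmt"
	"math"
)

// Copied verbatim from the commit so the demo is self-contained.
func calculateSplitAtShards(maxSplit int64, shardBucketCount int32) []int64 {
	splitCount := int(math.Min(float64(maxSplit), float64(shardBucketCount)))
	step := float64(shardBucketCount) / float64(splitCount)
	splitAtShards := make([]int64, splitCount)
	for i := 0; i < splitCount; i++ {
		splitAtShards[i] = int64(math.Floor(float64(i) * step))
	}
	return splitAtShards
}

func main() {
	fmt.Println(calculateSplitAtShards(8, 5))    // [0 1 2 3 4]: fewer buckets than the cap, one split per shard
	fmt.Println(calculateSplitAtShards(8, 30))   // [0 3 7 11 15 18 22 26]: 30 buckets capped at 8 evenly spaced splits
	fmt.Println(calculateSplitAtShards(8, 1000)) // [0 125 250 375 500 625 750 875]
}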
51 changes: 51 additions & 0 deletions pkg/sql/schema_changer_helpers_test.go
@@ -12,11 +12,14 @@ package sql

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/backfill"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/stretchr/testify/require"
)

// TestingDistIndexBackfill exposes the index backfill functionality for
@@ -35,3 +38,51 @@ func (sc *SchemaChanger) TestingDistIndexBackfill(
func (sc *SchemaChanger) SetJob(job *jobs.Job) {
	sc.job = job
}

func TestCalculateSplitAtShards(t *testing.T) {
	defer leaktest.AfterTest(t)()

	testCases := []struct {
		testName    string
		maxSplit    int64
		bucketCount int32
		expected    []int64
	}{
		{
			testName:    "zero_bucket_count",
			maxSplit:    8,
			bucketCount: 0,
			expected:    []int64{},
		},
		{
			testName:    "buckets_less_than_max_split",
			maxSplit:    8,
			bucketCount: 5,
			expected:    []int64{0, 1, 2, 3, 4},
		},
		{
			testName:    "buckets_equal_to_max_split",
			maxSplit:    8,
			bucketCount: 8,
			expected:    []int64{0, 1, 2, 3, 4, 5, 6, 7},
		},
		{
			testName:    "buckets_greater_than_max_split_1",
			maxSplit:    8,
			bucketCount: 30,
			expected:    []int64{0, 3, 7, 11, 15, 18, 22, 26},
		},
		{
			testName:    "buckets_greater_than_max_split_2",
			maxSplit:    8,
			bucketCount: 1000,
			expected:    []int64{0, 125, 250, 375, 500, 625, 750, 875},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.testName, func(t *testing.T) {
			shards := calculateSplitAtShards(tc.maxSplit, tc.bucketCount)
			require.Equal(t, tc.expected, shards)
		})
	}
}
73 changes: 73 additions & 0 deletions pkg/sql/schema_changer_test.go
@@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
@@ -7519,3 +7520,75 @@ CREATE INDEX ON t.test (b) USING HASH WITH BUCKET_COUNT = 8;
	)
	require.Len(t, constraintsToValidate, 0)
}

func TestHashShardedIndexRangePreSplit(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	getShardedIndexRanges := func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) ([]kv.KeyValue, error) {
		indexSpan := tableDesc.IndexSpan(codec, descpb.IndexID(2))
		ranges, err := kvDB.Scan(
			ctx,
			keys.RangeMetaKey(keys.MustAddr(indexSpan.Key)),
			keys.RangeMetaKey(keys.MustAddr(indexSpan.EndKey)),
			100,
		)
		if err != nil {
			return nil, err
		}
		return ranges, nil
	}

	var runBeforePreSplitting func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
	var runAfterPreSplitting func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
	params, _ := tests.CreateTestServerParams()
	params.Knobs = base.TestingKnobs{
		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
			RunBeforeHashShardedIndexRangePreSplit: func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
				return runBeforePreSplitting(tbl, kvDB, codec)
			},
			RunAfterHashShardedIndexRangePreSplit: func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
				return runAfterPreSplitting(tbl, kvDB, codec)
			},
		},
	}

	s, sqlDB, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)
	tdb := sqlutils.MakeSQLRunner(sqlDB)

	tdb.Exec(t, `
CREATE DATABASE t;
CREATE TABLE t.test_split(a INT PRIMARY KEY, b INT NOT NULL);
`,
	)

	runBeforePreSplitting = func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
		ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec)
		if err != nil {
			return err
		}
		if len(ranges) != 0 {
			return errors.Newf("expected 0 ranges but found %d", len(ranges))
		}
		return nil
	}

	runAfterPreSplitting = func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
		ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec)
		if err != nil {
			return err
		}
		if len(ranges) != 8 {
			return errors.Newf("expected 8 ranges but found %d", len(ranges))
		}
		return nil
	}

	tdb.Exec(t, `
SET experimental_enable_hash_sharded_indexes = on;
CREATE INDEX idx_test_split_b ON t.test_split (b) USING HASH WITH BUCKET_COUNT = 8;
`)
}
