77403: kv/bulk: fix initial splits r=dt a=dt

Previously, the initial splits omitted a split at the first key because of how the
loop was initialized. A processor Y ingesting data into the span `[k, p)` would
therefore write keys `k`, `l`, etc. but not split until, say, key `m`. Meanwhile,
a processor X ingesting into the prior span `[a, k)` regularly splits and scatters
the "empty" RHS of its span, fills some prefix of it, and then splits and scatters
the remaining span again. Because processor Y had not split at `k`, the supposedly
"empty" range containing the unfilled suffix of `[a, k)` also held `k`, `l`, etc.,
and that data had to be moved every time X scattered the range.

Now the first key is used as a split key to avoid this. This requires a
different predicate key than usual: typically we check that the key we are
splitting at is still in the same range as the prior split, which detects
whether another node has already split the span, but the first split has no
prior split. Instead, the first split uses the next split key as its predicate,
since this also shows that the span is still 'wide' enough to split.
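
The index arithmetic behind this can be sketched as follows. This is a minimal illustration rather than the code in `pkg/kv/bulk`: `planInitialSplits` and `splitPoint` are hypothetical names, and the buffer is reduced to a plain key count `n`. It shows the first split borrowing the next split position as its predicate, while every later split uses the prior one.

```go
package main

import "fmt"

// splitPoint pairs the buffer index to split at with the buffer index of the
// key used as the split predicate. Hypothetical type, for illustration only.
type splitPoint struct {
	SplitAt     int
	PredicateAt int
}

// planInitialSplits is a hypothetical helper that mirrors the index
// arithmetic described above for a buffer holding n keys.
func planInitialSplits(n, initialSplits int) []splitPoint {
	if n <= 0 || initialSplits <= 0 {
		return nil
	}
	width := n / initialSplits
	var plan []splitPoint
	for i := 0; i < initialSplits; i++ {
		splitAt := i * width
		if splitAt >= n {
			break
		}
		// Usually the predicate is the prior split position.
		predicateAt := splitAt - width
		if predicateAt < 0 {
			// The first split has no prior split, so use the next split
			// position instead, clamped to the last key in the buffer.
			next := splitAt + width
			if next > n-1 {
				next = n - 1
			}
			predicateAt = next
		}
		plan = append(plan, splitPoint{SplitAt: splitAt, PredicateAt: predicateAt})
	}
	return plan
}

func main() {
	for _, p := range planInitialSplits(12, 4) {
		fmt.Printf("split at %d, predicate at %d\n", p.SplitAt, p.PredicateAt)
	}
}
```

With 12 buffered keys and 4 initial splits, the first split lands on key 0 and uses key 3 as its predicate; the splits at keys 3, 6 and 9 each use the split before them.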

Release note: none.

Release justification: low-risk fix of new functionality.

77443: admission: add support for tenant weights r=ajwerner,cucaroach a=sumeerbhola

The weights are used in ordering tenants, such that tenant i is
preferred over tenant j if used_i/weight_i < used_j/weight_j,
where the used values represent usage of slots or tokens. This
allows for a form of weighted fair sharing. This fair sharing
is quite primitive, since there is no history for slots, and only
1s of history for token consumption (with a sharp reset, instead
of a rolling one).

The weighting can be useful when a node (or store) has
a significantly different number of ranges for two tenants, so
giving them an equal share of resources (like CPU) would not be
reasonable.

The minimum weight is 1 and the maximum weight is currently
capped at 20. Note that a tenant using 0 slots/tokens will always
be preferred over one that is using a non-zero amount, regardless
of weight. This reduces the likelihood of starvation, though with
a large enough number of waiting tenants, both the unweighted
(weight of 1) and weighted scheme can have starvation since
ties between tenants that are using 0 slots/tokens are broken
non-deterministically (and not by preferring the longer waiting
tenant).
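
The ordering rule can be sketched as below. This is an illustrative sketch, not the admission package's implementation: the `tenant` type, `clampWeight`, and `preferred` are hypothetical names, and clamping out-of-range weights into [1, 20] is an assumption about how the cap is applied. The comparison used_i/weight_i < used_j/weight_j is cross-multiplied to stay in integer arithmetic, which is valid because weights are positive.

```go
package main

import "fmt"

const (
	minWeight = 1  // minimum weight stated above
	maxWeight = 20 // current cap stated above
)

// tenant is a hypothetical stand-in for per-tenant admission state.
type tenant struct {
	ID     uint64
	Used   uint64 // slots or tokens currently in use
	Weight uint32
}

// clampWeight forces a weight into [minWeight, maxWeight]. Clamping (rather
// than rejecting) out-of-range weights is an assumption of this sketch.
func clampWeight(w uint32) uint32 {
	if w < minWeight {
		return minWeight
	}
	if w > maxWeight {
		return maxWeight
	}
	return w
}

// preferred reports whether tenant a is ordered ahead of tenant b, that is,
// whether used_a/weight_a < used_b/weight_b. Overflow of the products is
// ignored here for brevity.
func preferred(a, b tenant) bool {
	wa := uint64(clampWeight(a.Weight))
	wb := uint64(clampWeight(b.Weight))
	return a.Used*wb < b.Used*wa
}

func main() {
	heavy := tenant{ID: 1, Used: 30, Weight: 3} // ratio 10
	light := tenant{ID: 2, Used: 20, Weight: 1} // ratio 20
	idle := tenant{ID: 3, Used: 0, Weight: 1}   // ratio 0
	fmt.Println(preferred(heavy, light))
	fmt.Println(preferred(idle, heavy))
}
```

Because a tenant using 0 slots or tokens has a ratio of 0, it is ordered ahead of any tenant with non-zero usage whatever the weights are, and two such tenants tie, which matches the behavior described above.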

This will be used for KV admission control, both for kv and
kv-stores queues, which use slots and tokens respectively. The
integration code that periodically sets the weights based on
the range count per tenant will be in a later PR.

Informs #77358

Release justification: Low-risk update to new functionality.
Even though inter-tenant isolation was included in v21.2,
it has only been used in CockroachDB serverless recently,
and there is consensus to include weighting for v22.1.
The integration code (in a later PR) will be gated on a
cluster setting, and will default to not using weights.

Release note: None

77497: sql: presplit temp hash sharded index r=chengxiong-ruan a=chengxiong-ruan

Fixes #76686

We presplit a hash-sharded index before backfilling it. With the new
MVCC index backfiller, we also create a temp index to handle write
traffic while the original index is being backfilled, so we need to
presplit the temp index as well.
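
The resulting decision for hash-sharded indexes can be sketched as a small truth function. This is a simplified sketch covering only the sharded-index branch of `shouldSplitAndScatter`: the `indexMutation` struct and `shouldPresplit` are hypothetical stand-ins that flatten the mutation and index state into booleans.

```go
package main

import "fmt"

// indexMutation is a hypothetical flattening of the mutation and index state
// consulted when deciding whether to presplit an index.
type indexMutation struct {
	Adding      bool // the mutation adds an index
	Sharded     bool // the index is hash sharded
	Temporary   bool // the index is the temp index used during backfill
	Backfilling bool // the mutation is in the backfilling state
	DeleteOnly  bool // the mutation is in the delete-only state
}

// shouldPresplit mirrors the sharded-index branch only: with the MVCC
// backfiller, the new index is presplit when backfilling and the temp index
// when it is delete-only; otherwise presplitting happens in delete-only.
func shouldPresplit(m indexMutation, mvccCompliantAddIndex bool) bool {
	if !m.Adding || !m.Sharded {
		return false
	}
	if mvccCompliantAddIndex {
		return m.Backfilling || (m.Temporary && m.DeleteOnly)
	}
	return m.DeleteOnly
}

func main() {
	temp := indexMutation{Adding: true, Sharded: true, Temporary: true, DeleteOnly: true}
	fmt.Println(shouldPresplit(temp, true))
}
```

The temp index qualifies in its delete-only state under the MVCC backfiller, whereas before this change temp indexes were excluded from presplitting entirely.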

Release note: None
Release justification: this is needed by the MVCC index backfiller.

Co-authored-by: David Taylor <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Chengxiong Ruan <[email protected]>
4 people committed Mar 9, 2022
4 parents f15acd1 + 2a4f519 + 8f385b4 + 6dca2c9 commit e7e596e
Showing 8 changed files with 547 additions and 81 deletions.
@@ -281,4 +281,4 @@ t_presplit 116 t_presplit_idx_member_id /Table/116/2/"seattle"/3 /Tabl
 t_presplit 116 t_presplit_idx_member_id /Table/116/2/"seattle"/4 /Table/116/2/"seattle"/5
 t_presplit 116 t_presplit_idx_member_id /Table/116/2/"seattle"/5 /Table/116/2/"seattle"/6
 t_presplit 116 t_presplit_idx_member_id /Table/116/2/"seattle"/6 /Table/116/2/"seattle"/7
-t_presplit 116 t_presplit_idx_member_id /Table/116/2/"seattle"/7 /Max
+t_presplit 116 t_presplit_idx_member_id /Table/116/2/"seattle"/7 /Table/116/3/"new york"/0
59 changes: 49 additions & 10 deletions pkg/kv/bulk/buffering_adder.go
@@ -16,6 +16,7 @@ import (
 	"strings"
 	"time"

+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -353,38 +354,76 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 }

 func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
-	targetSize := b.curBuf.Len() / b.initialSplits
-	log.Infof(ctx, "%s adder creating up to %d initial splits from %d keys in %s buffer", b.name, b.initialSplits, b.curBuf.Len(), b.curBuf.MemSize)
+	log.Infof(ctx, "%s adder creating up to %d initial splits from %d KVs in %s buffer",
+		b.name, b.initialSplits, b.curBuf.Len(), b.curBuf.MemSize)

 	hour := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()}

 	before := timeutil.Now()

 	created := 0
-	for i := targetSize; i < b.curBuf.Len(); i += targetSize {
-		k := b.curBuf.Key(i)
-		prev := b.curBuf.Key(i - targetSize)
-		log.VEventf(ctx, 1, "%s adder pre-splitting at key %d of %d at %s", b.name, i, b.curBuf.Len(), k)
-		resp, err := b.sink.db.SplitAndScatter(ctx, k, hour, prev)
+	width := len(b.curBuf.entries) / b.initialSplits
+	for i := 0; i < b.initialSplits; i++ {
+		expire := hour
+		if i == 0 {
+			// If we over-split because our input is loosely ordered and we're just
+			// seeing a sample of the first span here vs a sample of all of it, then
+			// we may not fill enough for these splits to remain on their own. In that
+			// case we'd really prefer the other splits be merged away first rather
+			// than the first split, as it serves the important purpose of segregating
+			// this processor's span from the one below it when it is being constantly
+			// re-scattered by that processor, so give the first split an extra hour.
+			expire = hour.Add(time.Hour.Nanoseconds(), 0)
+		}
+
+		splitAt := i * width
+		if splitAt >= len(b.curBuf.entries) {
+			break
+		}
+		// Typically we split at splitAt if, and only if, its range still includes
+		// the prior split, indicating no other processor is also splitting this
+		// span. However, for the first split, there is no prior split, so we can
+		// use the next split instead, as it too proves the range that needs to be
+		// split still has enough "width" (i.e. between two splits) to indicate that
+		// another processor hasn't already split it.
+		predicateAt := splitAt - width
+		if predicateAt < 0 {
+			next := splitAt + width
+			if next > len(b.curBuf.entries)-1 {
+				next = len(b.curBuf.entries) - 1
+			}
+			predicateAt = next
+		}
+		splitKey, err := keys.EnsureSafeSplitKey(b.curBuf.Key(splitAt))
+		if err != nil {
+			log.Warningf(ctx, "failed to generate pre-split key for key %s", b.curBuf.Key(splitAt))
+			continue
+		}
+		predicateKey := b.curBuf.Key(predicateAt)
+		log.VEventf(ctx, 1, "pre-splitting span %d of %d at %s", i, b.initialSplits, splitKey)
+		resp, err := b.sink.db.SplitAndScatter(ctx, splitKey, expire, predicateKey)
 		if err != nil {
 			// TODO(dt): a typed error would be nice here.
 			if strings.Contains(err.Error(), "predicate") {
-				log.VEventf(ctx, 1, "%s adder split at %s rejected, had previously split and no longer included %s", b.name, k, prev)
+				log.VEventf(ctx, 1, "%s adder split at %s rejected, had previously split and no longer included %s",
+					b.name, splitKey, predicateKey)
 				continue
 			}
 			return err
 		}

 		b.sink.flushCounts.splitWait += resp.Timing.Split
 		b.sink.flushCounts.scatterWait += resp.Timing.Scatter
 		if resp.ScatteredStats != nil {
 			moved := sz(resp.ScatteredStats.Total())
 			b.sink.flushCounts.scatterMoved += moved
 			if resp.ScatteredStats.Total() > 0 {
-				log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s", moved, resp.ScatteredSpan)
+				log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s",
+					moved, resp.ScatteredSpan)
 			}
 		}
 		created++
 	}

 	log.Infof(ctx, "%s adder created %d initial splits in %v from %d keys in %s buffer",
 		b.name, created, timing(timeutil.Since(before)), b.curBuf.Len(), b.curBuf.MemSize)

2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/hash_sharded_index
@@ -840,7 +840,7 @@ t_hash_pre_split 139 t_hash_pre_split_idx_b /Table/139/2/3 /Table/139/2
 t_hash_pre_split 139 t_hash_pre_split_idx_b /Table/139/2/4 /Table/139/2/5
 t_hash_pre_split 139 t_hash_pre_split_idx_b /Table/139/2/5 /Table/139/2/6
 t_hash_pre_split 139 t_hash_pre_split_idx_b /Table/139/2/6 /Table/139/2/7
-t_hash_pre_split 139 t_hash_pre_split_idx_b /Table/139/2/7 /Max
+t_hash_pre_split 139 t_hash_pre_split_idx_b /Table/139/2/7 /Table/139/3/0

 subtest test_default_bucket_count

4 changes: 2 additions & 2 deletions pkg/sql/schema_changer.go
@@ -2985,9 +2985,9 @@ func (sc *SchemaChanger) shouldSplitAndScatter(
 		return false
 	}

-	if m.Adding() && idx.IsSharded() && !idx.IsTemporaryIndexForBackfill() {
+	if m.Adding() && idx.IsSharded() {
 		if sc.mvccCompliantAddIndex {
-			return m.Backfilling()
+			return m.Backfilling() || (idx.IsTemporaryIndexForBackfill() && m.DeleteOnly())
 		}
 		return m.DeleteOnly()
 	}
34 changes: 20 additions & 14 deletions pkg/sql/schema_changer_test.go
@@ -7484,8 +7484,8 @@ func TestHashShardedIndexRangePreSplit(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()

-	getShardedIndexRanges := func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) ([]kv.KeyValue, error) {
-		indexSpan := tableDesc.IndexSpan(codec, descpb.IndexID(2))
+	getShardedIndexRanges := func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec, indexID descpb.IndexID) ([]kv.KeyValue, error) {
+		indexSpan := tableDesc.IndexSpan(codec, indexID)
 		ranges, err := kvDB.Scan(
 			ctx,
 			keys.RangeMetaKey(keys.MustAddr(indexSpan.Key)),
@@ -7524,23 +7524,29 @@ CREATE TABLE t.test_split(a INT PRIMARY KEY, b INT NOT NULL);
 	)

 	runBeforePreSplitting = func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
-		ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec)
-		if err != nil {
-			return err
-		}
-		if len(ranges) != 0 {
-			return errors.Newf("expected 0 ranges but found %d", len(ranges))
+		// 2 is the id for the new index
+		// 3 is the id for temp index for backfilling
+		for _, id := range []int{2, 3} {
+			ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec, descpb.IndexID(id))
+			if err != nil {
+				return err
+			}
+			if len(ranges) != 0 {
+				return errors.Newf("expected 0 ranges but found %d", len(ranges))
+			}
 		}
 		return nil
 	}

 	runAfterPreSplitting = func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
-		ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec)
-		if err != nil {
-			return err
-		}
-		if len(ranges) != 8 {
-			return errors.Newf("expected 8 ranges but found %d", len(ranges))
+		for _, id := range []int{2, 3} {
+			ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec, descpb.IndexID(id))
+			if err != nil {
+				return err
+			}
+			if len(ranges) != 8 {
+				return errors.Newf("expected 8 ranges but found %d", len(ranges))
+			}
 		}
 		return nil
 	}