
bulk,kv: create initial splits in imports and backfills #74816

Merged
merged 4 commits into from
Feb 4, 2022

Conversation

dt
Member

@dt dt commented Jan 13, 2022

It has been observed that during a backfill or import, the split-as-we-go behavior used by the SST batcher (which only explicitly splits and scatters after sending some amount of data all to one range) does not send any splits in the steady state when importing out-of-order data: since the data is out of order, after chunking it by destination range, no single chunk is large enough to be considered as filling that range and so none triggers a split. Of course, the range may fill on its own, as many processors send it small chunks over and over and those add up, but no single processor knows when that happens, so we simply leave it up to the range to split itself when it is full, as it would in any other heavy-write-rate, non-bulk ingestion case.

However, we've seen this lead to hotspots during bulk ingestion. Until the range becomes full and decides to split, it can be a bottleneck as many processors send it data at once. Existing mechanisms for KV-driven load-based splitting are not tuned for ingestion workloads. When the range does decide to split and rebalance, that is a more expensive operation: we now need to read, move, and ingest half of a full range that we just spent so much work sending data to and ingesting, all while it is still being sent more ingestion load.

Ideally, we'd prefer to split the span before we start ingesting, both to better spread ingestion load over our available capacity and to reduce how much ingested data needs to be shuffled compared to splitting and scattering only once a range fills. By scattering first, we just ingest directly to the right place initially. However, a challenge in doing this is that in many cases we don't know what data we'll be ingesting until we start reading input and producing it: we could be working off of a key-ordered CSV, or we could have a uniformly random distribution. In some cases, such as an index backfill, we may be able to use external information like SQL statistics or a sampling scan of the table to derive a good partitioning, but in others, like IMPORT or view materialization, we have no way to know what the data will be until we start producing it.

This change tries to do a bit better than not pre-splitting at all: if the first buffer's worth of data was read out of sorted order, it treats that buffer as a representative sample of the data to come, generates split points from it, and splits and scatters the target spans at those points before proceeding to flush. If this buffer turns out not to be a good sample after all, we are no worse off than before, when we did not pre-split at all. If the input data was sorted, this step is unnecessary: we split as we go, scattering the empty remainder after each range we fill before moving on to fill it, as we already do today.
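As a rough illustration of the approach (a hypothetical helper, not the actual batcher code): given the first buffer sorted as a sample, evenly spaced keys can be picked as pre-split points.

```go
package main

import "fmt"

// pickSplitPoints returns up to n keys, evenly spaced through the sorted
// sample, to use as pre-split points. This is a sketch of the idea described
// above, not CockroachDB's actual implementation.
func pickSplitPoints(sortedSample [][]byte, n int) [][]byte {
	if n < 1 || len(sortedSample) == 0 {
		return nil
	}
	splits := make([][]byte, 0, n)
	// n splits divide the sample into n+1 roughly equal chunks.
	step := len(sortedSample) / (n + 1)
	if step == 0 {
		step = 1
	}
	for i := step; i < len(sortedSample) && len(splits) < n; i += step {
		splits = append(splits, sortedSample[i])
	}
	return splits
}

func main() {
	sample := [][]byte{
		[]byte("a"), []byte("d"), []byte("g"),
		[]byte("k"), []byte("p"), []byte("t"),
	}
	// Two splits partition the sampled keyspace into three spans.
	for _, k := range pickSplitPoints(sample, 2) {
		fmt.Println(string(k))
	}
}
```

If the sample is representative, each resulting span should receive a similar share of the remaining input.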

Release note (performance improvement): IMPORTs and index creation now attempt to better balance their expected data between nodes before ingesting data.

@dt dt requested review from andreimatei, nvanbenschoten, adityamaru and a team January 13, 2022 20:41
@dt dt requested a review from a team as a code owner January 13, 2022 20:41
@dt dt marked this pull request as draft January 13, 2022 20:42
@cockroach-teamcity
Member

This change is Reviewable

@dt
Member Author

dt commented Jan 13, 2022

This is a quick sketch based on conversation last night -- I haven't tested it at all but wanted to see if this looks like roughly what we were thinking.

@dt dt linked an issue Jan 13, 2022 that may be closed by this pull request
@shermanCRL shermanCRL removed the request for review from a team January 14, 2022 04:02
@dt
Member Author

dt commented Jan 25, 2022

@nvanbenschoten what do you think about the conditional splits here?

@dt dt marked this pull request as ready for review January 26, 2022 22:21
@dt dt requested a review from a team January 26, 2022 22:21
@postamar postamar removed the request for review from a team January 27, 2022 02:41
Member

@nvanbenschoten nvanbenschoten left a comment


Reviewed 1 of 1 files at r1, 7 of 7 files at r2, 2 of 2 files at r3, 6 of 6 files at r4, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @andreimatei, and @dt)


pkg/kv/bulk/buffering_adder.go, line 234 at r1 (raw file):

	beforeSort := timeutil.Now()

	if !b.sorted {

Should b.sorted be reset to true after this check?


pkg/kv/bulk/buffering_adder.go, line 215 at r2 (raw file):

}

// Flush flushes any buffered kvs to the batcher.

Looks like this comment is in the wrong place now.


pkg/kv/kvserver/kvserverbase/bulk_adder.go, line 77 at r2 (raw file):

	// InitialSplitsIfUnordered specifies a number of splits to make before the
	// first flush of the buffer if the contents of that buffer were unsorted.
	// Being unsorted suggests the remaining input is like unsorted as well and

s/like/likely/


pkg/roachpb/api.proto, line 767 at r3 (raw file):

  util.hlc.Timestamp expiration_time = 4 [(gogoproto.nullable) = false];

  // PredicateKeys specifies keys which if not contained within the range should

Did you consider making the predicate here be the range descriptor itself? So we would only split if the range's current descriptor is identical to the (optional) client-provided descriptor. I'd gravitate towards a solution like that because it feels more generally applicable and also has prior precedent with requests like AdminChangeReplicasRequest.

Member Author

@dt dt left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @andreimatei, @dt, and @nvanbenschoten)


pkg/kv/bulk/buffering_adder.go, line 234 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Should b.sorted be reset to true after this check?

Eh, it seemed to me that once we've seen an unsorted key, we can assume all future keys are unsorted and not even bother comparing them, and get better branch prediction in the tight loop?


pkg/roachpb/api.proto, line 767 at r3 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Did you consider making the predicate here be the range descriptor itself? So we would only split if the range's current descriptor is identical to the (optional) client-provided descriptor. I'd gravitate towards a solution like that because it feels more generally applicable and also has prior precedent with requests like AdminChangeReplicasRequest.

I didn't consider that.

I'm happy to defer to KV on the right KV API here, but naively, just thinking of what API I might find easiest to use: "split at 5 if you contain 1 and 10" captures the thing I care about: split if the range spans a certain width (1-10). An exact range descriptor requirement is maybe more than I really care about, which might matter both from a convenience-of-calling standpoint (now I need to go find a range descriptor) and because it could now spuriously fail: say my range was 1-20. I say "if a single range is covering the width of 1-10, it should split at 5" and in the meantime, someone else says "if a single range is covering 11-20, it should split at 15". Even if they come along and split before me, I actually still think we should split, because we still contain 1 and 10, even though my range descriptor that says 20 is stale.

Anyway, I'm happy to change it if you think it should be rangedesc, just wanted to lay out what I was thinking with the key inclusion predicate.
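The key-inclusion predicate described above can be sketched as follows (a minimal illustration with byte-slice keys; names are hypothetical, not the actual replica_command.go code):

```go
package main

import (
	"bytes"
	"fmt"
)

// containsAll reports whether every predicate key falls within the range
// [startKey, endKey). A sketch of the "conditional split" semantics discussed
// here: the split proceeds only if the target range still contains all of the
// caller's predicate keys.
func containsAll(startKey, endKey []byte, predicates [][]byte) bool {
	for _, k := range predicates {
		if bytes.Compare(k, startKey) < 0 || bytes.Compare(k, endKey) >= 0 {
			return false
		}
	}
	return true
}

func main() {
	// A stale descriptor said the range was 01-10, but it actually grew to
	// 01-20; the split at 05 still proceeds because the range still contains
	// both predicate keys 01 and 10.
	fmt.Println(containsAll([]byte("01"), []byte("20"),
		[][]byte{[]byte("01"), []byte("10")}))
	// After someone else splits off 01-10, the predicate fails and the
	// redundant split is skipped.
	fmt.Println(containsAll([]byte("10"), []byte("20"),
		[][]byte{[]byte("01"), []byte("10")}))
}
```

This is what distinguishes the key-inclusion predicate from an exact-descriptor check: a range that has grown past its stale descriptor still satisfies it, while a range that was already split does not.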

Member Author

@dt dt left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @andreimatei, @dt, and @nvanbenschoten)


pkg/roachpb/api.proto, line 767 at r3 (raw file):

Previously, dt (David Taylor) wrote…

I didn't consider that.

I'm happy to defer to KV on the right KV API here, but naively, just thinking of what API I might find easiest to use: "split at 5 if you contain 1 and 10" captures the thing I care about: split if the range spans a certain width (1-10). An exact range descriptor requirement is maybe more than I really care about, which might matter both from a convenience-of-calling standpoint (now I need to go find a range descriptor) and because it could now spuriously fail: say my range was 1-20. I say "if a single range is covering the width of 1-10, it should split at 5" and in the meantime, someone else says "if a single range is covering 11-20, it should split at 15". Even if they come along and split before me, I actually still think we should split, because we still contain 1 and 10, even though my range descriptor that says 20 is stale.

Anyway, I'm happy to change it if you think it should be rangedesc, just wanted to lay out what I was thinking with the key inclusion predicate.

Oh, and I sorta like the key inclusion predicate's simplicity if you only provide one, smaller, key, since then it is just saying "please make sure this key (the split key) is not in the same range as that key (predicate[0])" without expressing too much about KV internals/specifics, just that you want those keys in separate ranges.

Member

@nvanbenschoten nvanbenschoten left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @andreimatei, and @dt)


pkg/kv/db.go, line 584 at r4 (raw file):

// SplitAndScatter is a helper that wraps AdminSplit + AdminScatter.
func (db *DB) SplitAndScatter(
	ctx context.Context, key roachpb.Key, expirationTime hlc.Timestamp, predicateKeys []roachpb.Key,

Should we make this variadic?


pkg/kv/bulk/buffering_adder.go, line 234 at r1 (raw file):

Previously, dt (David Taylor) wrote…

Eh, it seemed to me that once we've seen an unsorted key, we can assume all future keys are unsorted and not even bother comparing them, and get better branch prediction in the tight loop?

Seems reasonable. It's probably worth a comment either here or on the field then.


pkg/kv/bulk/buffering_adder.go, line 315 at r4 (raw file):

		log.VEventf(ctx, 1, "splitting at key %d / %d: %s", i, b.curBuf.Len(), k)
		if err := b.sink.db.SplitAndScatter(ctx, k, hour, []roachpb.Key{prev}); err != nil {
			if strings.Contains(err.Error(), "predicate") {

Was the plan to keep this string matching, or did we want to strongly type this error?


pkg/kv/kvserver/replica_command.go, line 312 at r4 (raw file):

	var reply roachpb.AdminSplitResponse

	for _, k := range args.PredicateKeys {

nit: move this down below the splitKey computation block and give this a comment.


pkg/roachpb/api.proto, line 767 at r3 (raw file):

Previously, dt (David Taylor) wrote…

Oh, and I sorta like the key inclusion predicate's simplicity if you only provide one, smaller, key, since then it is just saying "please make sure this key (the split key) is not in the same range as that key (predicate[0])" without expressing too much about KV internals/specifics, just that you want those keys in separate ranges.

I think what you said here makes sense. This API is already in terms of keys (split_key), so including some additional predicate keys feels consistent.


pkg/roachpb/api.proto, line 770 at r4 (raw file):

  // cause the split to be rejected. This can be used by a caller to effectively
  // send a "conditional split" request, i.e. a split if not already split.
  repeated bytes predicate_keys = 5 [(gogoproto.casttype) = "Key"];

Could we add a test for this behavior?

This changes bufferingAdder to track whether or not its input has been added
in sorted order so far. If so, it can avoid sorting it before flushing, spending
O(n) comparisons during additions to avoid an O(n log n) sort later. As soon as
the first unsorted key is added it stops checking, so the extra per-key overhead
is then just the boolean check.

Release note: none.
Member Author

@dt dt left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @andreimatei, and @nvanbenschoten)


pkg/kv/db.go, line 584 at r4 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Should we make this variadic?

Done.


pkg/kv/bulk/buffering_adder.go, line 234 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Seems reasonable. It's probably worth a comment either here or on the field then.

Done (put it in Reset which is where I'd have thought to look for it).


pkg/kv/bulk/buffering_adder.go, line 215 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Looks like this comment is in the wrong place now.

Done.


pkg/kv/bulk/buffering_adder.go, line 315 at r4 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Was the plan to keep this string matching, or did we want to strongly type this error?

Added a TODO.


pkg/kv/kvserver/replica_command.go, line 312 at r4 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: move this down below the splitKey computation block and give this a comment.

Done.


pkg/kv/kvserver/kvserverbase/bulk_adder.go, line 77 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

s/like/likely/

Done


pkg/roachpb/api.proto, line 770 at r4 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Could we add a test for this behavior?

Yep, done (I was just waiting to see if KV liked the proposed API before I wrote a test against it).

Member

@nvanbenschoten nvanbenschoten left a comment


Reviewed 1 of 14 files at r5, 6 of 7 files at r6, 1 of 2 files at r7, 7 of 7 files at r8, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @andreimatei, and @dt)


pkg/ccl/importccl/import_processor_planning.go, line 267 at r8 (raw file):

				UserProto:             user.EncodeProto(),
				DatabasePrimaryRegion: details.DatabasePrimaryRegion,
				InitialSplits:         int32(len(sqlInstanceIDs)),

Are we ok with this change? This is now the number of SQL pods in a serverless cluster, not the number of KV nodes, right?

Member Author

@dt dt left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @andreimatei, and @nvanbenschoten)


pkg/ccl/importccl/import_processor_planning.go, line 267 at r8 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Are we ok with this change? This is now the number of SQL pods in a serverless cluster, not the number of KV nodes, right?

I think this is correct. We should be pre-splitting according to the size of the tenant, not the shared storage cluster that has hundreds or thousands of other tenants on it too.

dt added 3 commits February 3, 2022 21:03
This adds an option to 'pre-split' before the first buffer is flushed when
bulk ingesting data, using the data of the first buffer to pick split points.

Release note: none.
The new PredicateKeys field on AdminSplitRequest can be used to specify keys
which must be contained within the target range for the split to proceed. This
allows sending a 'conditional split' which splits a range at the requested key
only if it has not already been split, for use when multiple processes may be
racing to split the same spans.

Release note: none.
@dt
Member Author

dt commented Feb 4, 2022

TFTR!

bors r+

@craig
Contributor

craig bot commented Feb 4, 2022

Build succeeded:

@craig craig bot merged commit a3c3403 into cockroachdb:master Feb 4, 2022
@dt dt deleted the flush branch February 4, 2022 14:06
dt added a commit to dt/cockroach that referenced this pull request Feb 4, 2022
In a last minute change to cockroachdb#74816 I switched from []predicate to ...predicate but unfortunately forgot to remove the nil arg in batcher. It still compiled but semantically went from predicates=[] to predicates=[nil], which is not at all the same.

Release note: none.
craig bot pushed a commit that referenced this pull request Feb 5, 2022
76082: kv/bulk: fix accidental minkey predicate r=dt a=dt

In a last minute change to #74816 I switched from []predicate to ...predicate but unfortunately forgot to remove the nil arg in batcher. It still compiled but semantically went from predicates=[] to predicates=[nil], which is not at all the same.

Release note: none.

Co-authored-by: David Taylor <[email protected]>

Successfully merging this pull request may close these issues.

bulk: pre-split more before ingesting unsorted data
3 participants