
streamingccl: split destination based on source #111178

Merged (1 commit) on Sep 30, 2023

Conversation

stevendanna (Collaborator)

This adds initial splits and scatters to the stream ingestion job. We split based on the topology delivered by the source to align our initial splits with the splits that are likely coming in from the source cluster.

These splits substantially improve the throughput of a stream's initial scan.

Careful reviewers may note that we are not calling EnsureSafeSplitKey here. That is because the Spans in the source topology should already be safe split keys. Since EnsureSafeSplitKey isn't idempotent, if we were to call it, we would end up creating splits at rather poor locations, since any key that ends in an integer would have components erroneously trimmed from it.

An alternative here would be to do what the buffering adder does and create an initial set of splits by distributing them over the keyspace of the first buffer. That has the advantage of allowing us to call EnsureSafeSplitKey at the cost of less effective splits.

Note that we face this same problem during bulk operations. Perhaps in the future the source cluster should send some metadata about manual splits that are issued, or something else that lets the destination know that a large amount of data is expected in a particular span.
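
For illustration, the destination-side pass described above might look roughly like the sketch below. splitAndScatter and splitAndScatterer are the helpers added in this patch; the Topology/Partitions/Spans field names are assumptions rather than the exact types used here.

// Sketch only: walk the source topology and issue one split and scatter
// per partition span start key. The keys are used as-is, without
// EnsureSafeSplitKey, for the reasons described above.
func splitDestinationFromTopology(
    ctx context.Context, topology streamclient.Topology, s splitAndScatterer,
) error {
    for _, partition := range topology.Partitions {
        for _, sp := range partition.Spans {
            // Span boundaries come from PartitionSpans on the source and are
            // assumed to already be safe split keys.
            if err := splitAndScatter(ctx, sp.Key, s); err != nil {
                return err
            }
        }
    }
    return nil
}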

Epic: none

Release note: None

@stevendanna stevendanna requested review from a team as code owners September 24, 2023 16:55
@stevendanna stevendanna requested review from lidorcarmel and removed request for a team September 24, 2023 16:55

if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
md.Progress.GetStreamIngest().ReplicationStatus = jobspb.Replicating
md.Progress.GetStreamIngest().InitialSplitComplete = true
md.Progress.RunningStatus = "physical replication running"
Contributor:
Could the splits take a while? Does it make sense to update the progress to indicate that "replication is running", and then update it again once the splits have concluded?

stevendanna (Author):

Seems reasonable
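
A sketch of what that could look like, reusing the update pattern from the hunk above; the intermediate status string and the exact points at which the updates happen are illustrative assumptions, not code from this patch:

// Sketch: surface the split phase in the job's running status first,
// then flip to the replicating status once the splits are done.
if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
    md.Progress.RunningStatus = "creating initial splits"
    ju.UpdateProgress(md.Progress)
    return nil
}); err != nil {
    return err
}

// ... perform the initial splits and scatters ...

if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
    md.Progress.GetStreamIngest().ReplicationStatus = jobspb.Replicating
    md.Progress.GetStreamIngest().InitialSplitComplete = true
    md.Progress.RunningStatus = "physical replication running"
    ju.UpdateProgress(md.Progress)
    return nil
}); err != nil {
    return err
}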

// get erroneously treated as the column family length.
//
// Since the partitions are generated from a call to
// PartitionSpans on the source cluster, they should be
Contributor:
I wonder if it makes sense for the source cluster to create an iterator and read the key (SeekGE) that's at or after the split point? This could guarantee that the explicit split keys are valid.

stevendanna (Author):
Just to make sure I'm following.

This would look like taking the output of PartitionSpans and iterating over the results: for each start key of the spans returned by PartitionSpans, we SeekGE to that key and return the first key found rather than the split point generated by PartitionSpans.

The pro here is that the key returned is definitely going to be a SQL key that we can run back through EnsureSafeSplitKey.

The cons here would be a bit of added complexity on the source planning side, and also that, at least for some time after the initial scan, the destination would have splits that are just a little off from the source. So if a lot of data gets written "in between" the start according to PartitionSpans and the actual start key, that data would now go to the LHS range. During the initial scan this shouldn't matter, because we scan at a fixed timestamp that should be rather close to when we produced the original plan.

Contributor:
Yup; that about sums it up. I was just trying to come up with a way to avoid the "scary" warning you wrote about not calling EnsureSafeSplitKey.
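
For concreteness, the alternative being discussed might look roughly like the sketch below. seekFirstKey is a hypothetical helper standing in for a SeekGE against the source's storage engine; none of these names come from this patch.

// Sketch: before handing spans to the destination, nudge each start key
// forward to the first key that actually exists in the span, so that it
// would round-trip safely through EnsureSafeSplitKey.
func adjustPartitionSpans(
    ctx context.Context,
    spans []roachpb.Span,
    seekFirstKey func(context.Context, roachpb.Span) (roachpb.Key, error),
) ([]roachpb.Span, error) {
    out := make([]roachpb.Span, 0, len(spans))
    for _, sp := range spans {
        k, err := seekFirstKey(ctx, sp)
        if err != nil {
            return nil, err
        }
        if len(k) == 0 {
            // Empty span: keep the planned split point unchanged.
            k = sp.Key
        }
        out = append(out, roachpb.Span{Key: k, EndKey: sp.EndKey})
    }
    return out, nil
}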

func splitAndScatter(
ctx context.Context, splitAndScatterKey roachpb.Key, s splitAndScatterer,
) error {
log.Infof(ctx, "splitting and scattering at %s", splitAndScatterKey)
Contributor:
do we want to vmodule this?
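
For reference, a minimal sketch of gating that line behind vmodule (the verbosity level chosen here is arbitrary):

// Sketch: only emit the per-key log line when verbose logging is enabled,
// e.g. via --vmodule=<file>=2.
if log.V(2) {
    log.Infof(ctx, "splitting and scattering at %s", splitAndScatterKey)
}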

ctx context.Context, splitAndScatterKey roachpb.Key, s splitAndScatterer,
) error {
log.Infof(ctx, "splitting and scattering at %s", splitAndScatterKey)
expirationTime := s.now().AddDuration(splitAndScatterSitckBitDuration)
Contributor:
nit: misspelling in StickyBit

pkg/jobs/jobspb/jobs.proto (resolved)
@stevendanna (Author):

bors r=adityamaru

craig bot commented Sep 30, 2023

Build succeeded.
