streamingccl: add ingestion job framework #58373
Rough WIP to ensure that the PR doesn't become hard to review. I have left some TODOs in the code and would be happy to receive any comments on them.
229bcfa to 2dc44c7
This looks good to me, I left a few small casting nits that rely on changes to the stream client PR. I'll ping here when that is updated for a rebase.
pkg/jobs/jobspb/jobs.proto
message StreamIngestionDetails {
  // StreamAddress is the location of the stream which the ingestion job will
  // read from.
  string stream_address = 1;
casttype to a streamclient.StreamAddress?
done.
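For readers unfamiliar with the casttype suggestion, here is a rough sketch of what it buys on the Go side. It is hand-written, not generated code, and it assumes streamclient.StreamAddress is a defined string type; the dial helper is hypothetical and exists only to show a typed consumer.

```go
package main

import "fmt"

// StreamAddress is a stand-in for streamclient.StreamAddress, assumed here to
// be a defined string type so that arbitrary strings are not accepted silently.
type StreamAddress string

// StreamIngestionDetails approximates the shape of the generated struct when
// the proto field carries a casttype option (assumption, not generated code).
type StreamIngestionDetails struct {
	StreamAddress StreamAddress
}

// dial is a hypothetical consumer that only accepts the typed address.
func dial(addr StreamAddress) string {
	return "dialing " + string(addr)
}

func main() {
	details := StreamIngestionDetails{StreamAddress: "test://stream"}
	// No conversion is needed at the call site because the field is already typed.
	fmt.Println(dial(details.StreamAddress))
}
```

Without the cast, the field would be a plain string and every caller would have to convert it by hand.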
@@ -134,6 +134,10 @@ message ReadImportDataSpec {
  // NEXTID: 16
}

message StreamIngestionDataSpec {
  map<int32,string> partition_address = 1;
I think we can also add [(gogoproto.castvalue) = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient.PartitionAddress"]; on this line to get this map to be typed a bit stricter. I also think we can castkey to a PartitionID type that I can add to the streamclient package.
changed it to a repeated field as that is what the ingestion processor PR changed it to.
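For reference, the stricter map typing suggested in this thread would surface in Go roughly as below. This is a hand-written sketch rather than generated code; PartitionID and PartitionAddress are assumed to be defined types in the streamclient package, and the lookup helper is hypothetical.

```go
package main

import "fmt"

// PartitionID and PartitionAddress are assumed defined types; the review
// proposes them, but their real definitions are not shown in this thread.
type PartitionID int32
type PartitionAddress string

// StreamIngestionDataSpec approximates the struct that castkey and castvalue
// would produce in place of map[int32]string (assumption, not generated code).
type StreamIngestionDataSpec struct {
	PartitionAddress map[PartitionID]PartitionAddress
}

// lookup is a hypothetical helper showing that keys and values stay typed.
func lookup(spec StreamIngestionDataSpec, id PartitionID) (PartitionAddress, bool) {
	addr, ok := spec.PartitionAddress[id]
	return addr, ok
}

func main() {
	spec := StreamIngestionDataSpec{
		PartitionAddress: map[PartitionID]PartitionAddress{
			0: "test://partition-0",
			1: "test://partition-1",
		},
	}
	if addr, ok := lookup(spec, 1); ok {
		fmt.Println(addr)
	}
}
```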
2dc44c7 to da56c22
pkg/ccl/streamingccl/streamclient/mock_stream_client.go, line 19 at r4 (raw file):
// NewMockStreamClient returns a new mock stream client.
func NewMockStreamClient() *MockStreamClient {
What do you think of renaming this to something like just client and making it the real stream client that just isn't yet implemented? (I also don't think it needs to be exported?) Then this can just return the interface.
da56c22 to 6133c3d
This change introduces a new StreamIngestionJob. It does not do much more than lay out the general outline of the job, which is very similar to other bulk jobs such as changefeed, backup etc.

More precisely:
- Introduces the StreamIngestionDetails job details proto.
- Hooks up the dependency on a mock stream client.
- Introduces a StreamIngestionProcessorSpec.
- Sets up a simple DistSQL flow which assigns the partitions to the processors round-robin.

Most notable TODOs in job land which will be addressed in follow-up PRs:
- StreamIngestionPlanHook to create this job. Will involve figuring out SQL syntax.
- Introducing a ts watermark in both the job and processors. This watermark will represent the lowest resolved ts up to which all processors have ingested. Iron out semantics on job start and resumption.
- Introducing a StreamIngestionFrontier processor which will slurp the results from the StreamIngestionProcessors, and use them to keep track of the minimum resolved ts across all processors.

Release note: None
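The two mechanisms named in this description, round-robin partition assignment and tracking the minimum resolved timestamp, can be sketched as follows. This is an illustrative approximation under simplifying assumptions: partitions are plain strings, timestamps are int64 values, and both function names are hypothetical. It is not the code in this PR.

```go
package main

import "fmt"

// assignRoundRobin distributes partitions over numProcessors buckets in order:
// partition i goes to processor i % numProcessors.
func assignRoundRobin(partitions []string, numProcessors int) [][]string {
	if numProcessors <= 0 {
		return nil
	}
	assignments := make([][]string, numProcessors)
	for i, p := range partitions {
		idx := i % numProcessors
		assignments[idx] = append(assignments[idx], p)
	}
	return assignments
}

// minResolved returns the lowest resolved timestamp reported by any processor,
// which is the point up to which every processor has ingested. The second
// return value is false when no processor has reported yet.
func minResolved(resolved []int64) (int64, bool) {
	if len(resolved) == 0 {
		return 0, false
	}
	lowest := resolved[0]
	for _, ts := range resolved[1:] {
		if ts < lowest {
			lowest = ts
		}
	}
	return lowest, true
}

func main() {
	fmt.Println(assignRoundRobin([]string{"p0", "p1", "p2", "p3", "p4"}, 2))
	if ts, ok := minResolved([]int64{50, 30, 90}); ok {
		fmt.Println(ts)
	}
}
```

The watermark is a minimum rather than a maximum because the job can only claim progress up to the timestamp that every processor has reached.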
6133c3d to 5797332
TFTR! bors r=pbardea
Build failed:
looks like a flake in bors r=pbardea
Build succeeded:
Fixes: #57399
Release note: None