pkg: Add and integrate BulkRowWriter processor into CTAS execution. #38374
Conversation
Force-pushed 420c63f to 4467d36.
Force-pushed 832663f to ccd06b3.
@@ -134,7 +134,7 @@ statement ok
SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; SET tracing = off

query TT
SELECT operation, regexp_replace(message, '(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+', '...PK...') as message
Using the row converter changes the way row_ids are generated for hidden primary keys: they are now generated using GenerateUniqueID, which required this change in the logic test.
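For reference, the normalization the logic test performs with regexp_replace can be reproduced in Go. This is a sketch, not code from the PR: the pattern is copied verbatim from the test above, while the function and variable names are made up for illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

// idPattern is copied from the logic test's regexp_replace call: an optional
// timestamp prefix followed by a run of five or more digits, which is the
// shape of the IDs GenerateUniqueID produces.
var idPattern = regexp.MustCompile(`(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+`)

// scrub replaces generated unique IDs in a trace message with a stable
// placeholder so the expected test output does not change from run to run.
func scrub(message string) string {
	return idPattern.ReplaceAllString(message, "...PK...")
}

func main() {
	fmt.Println(scrub("CPut /Table/53/1/458706382031716353/0 -> /TUPLE/"))
	fmt.Println(scrub("v = 2")) // runs of fewer than five digits are left alone
}
```

Scrubbing like this keeps the test deterministic while still asserting on the rest of the trace message.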
Force-pushed ccd06b3 to 8c52887.
Reviewed 10 of 10 files at r1, 7 of 7 files at r2, 7 of 11 files at r3.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru27, @dt, and @lucy-zhang)
pkg/sql/create_table.go, line 1652 at r3 (raw file):
}

func newBulkRowWriterProcessor(

For better or worse, most of the time we define processors in distsqlrun and only use the function-pointer injection route for the ones that have to be defined elsewhere due to hard rules about dependency edges, like OSS vs CCL. Is there anything motivating putting this in sql instead of distsqlrun? Anything it depends on in sql?
In either case, I'd pull it into its own file.
pkg/sql/create_table.go, line 1744 at r3 (raw file):
g = ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {

Is sp.out threadsafe?
2¢: I might find this easier to follow if we
a) pull the first half of this inline func into a private method on the processor which also inlines ingestKVs.
b) I'd probably have that method just set a BulkOpSummary field in the processor when it is done, instead of emitting directly.
c) I'd then serialize and emit the row part in Run, after calling g.Wait (so you don't have to worry about a race).
pkg/sql/create_table.go, line 1778 at r3 (raw file):
})

for {

Right now, if the ingesting goroutine gets an error, I think this will keep pushing rows at it and might deadlock on the channel. I think you might want to also run this loop in a task on the error group, so that its context is hooked up to cancel on error return (and the other way too: if this errors, we should tear down the ingesting goroutine too).
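The deadlock David describes generalizes: if the consumer exits with an error while the producer is blocked on a channel send, nothing unblocks the producer unless both goroutines share a cancelable context. A stdlib-only sketch of the fix (CockroachDB's ctxgroup wraps this same idea; the loop names, the string payload, and the "bad" failure trigger are all illustrative, not from the PR):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// convertLoop stands in for the row-conversion goroutine: every send on kvCh
// also watches ctx, so a failed consumer can unblock it.
func convertLoop(ctx context.Context, rows []string, kvCh chan<- string) error {
	defer close(kvCh)
	for _, r := range rows {
		select {
		case kvCh <- r:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// ingestLoop stands in for the ingesting goroutine: it drains kvCh and can
// fail partway through.
func ingestLoop(ctx context.Context, kvCh <-chan string) error {
	for kv := range kvCh {
		if kv == "bad" {
			return errors.New("ingest failed")
		}
	}
	return nil
}

// runBoth wires both loops to one cancelable context, approximating g.GoCtx
// on an error group: the first error cancels the context, which tears down
// the sibling goroutine instead of deadlocking on the channel.
func runBoth(rows []string) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	kvCh := make(chan string) // unbuffered: the producer blocks without a reader
	errCh := make(chan error, 2)
	run := func(f func() error) {
		if err := f(); err != nil {
			cancel() // propagate failure to the other goroutine
			errCh <- err
			return
		}
		errCh <- nil
	}
	go run(func() error { return convertLoop(ctx, rows, kvCh) })
	go run(func() error { return ingestLoop(ctx, kvCh) })
	var first error
	for i := 0; i < 2; i++ {
		if err := <-errCh; err != nil && first == nil {
			first = err
		}
	}
	return first
}

func main() {
	fmt.Println(runBoth([]string{"a", "b"}) == nil)        // both loops finish cleanly
	fmt.Println(runBoth([]string{"a", "bad", "c"}) != nil) // error surfaces; no deadlock
}
```

Without the `select` on `ctx.Done()` in the producer, the second call would hang forever once the consumer returned its error.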
pkg/sql/distsqlrun/processors.go, line 1244 at r2 (raw file):
var NewCSVWriterProcessor func(*FlowCtx, int32, distsqlpb.CSVWriterSpec, RowSource, RowReceiver) (Processor, error)

// NewBulkRowWriterProcessor is externally implemented.
This is usually done to get around CCL vs OSS, where we inject the proc via this hook and an init func. This is a pure OSS proc, so I don't think we need that song and dance.
Force-pushed 8c52887 to 0c50c53.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru27, @dt, and @lucy-zhang)
pkg/sql/create_table.go, line 1744 at r3 (raw file):
Previously, dt (David Taylor) wrote…
is sp.out threadsafe?
2¢: I might find this easier to follow if we
a) pull the first half of this inline func into a private method on the processor which also inlines ingestKVs.
b) I'd probably have that method just set a BulkOpSummary field in the processor when it is done, instead of emitting directly.
c) I'd then serialize and emit the row part in Run, after calling g.Wait (so you don't have to worry about a race).
Done.
pkg/sql/create_table.go, line 1778 at r3 (raw file):
Previously, dt (David Taylor) wrote…
right now if the ingesting goroutine gets an error, I think this will keep pushing rows at it and might deadlock on the channel. I think you might want to also run this loop in a task on the error group so that its context is hooked up to cancel on error return (and the other way too -- if this errors, we should teardown the ingesting goroutine too).
That's a cool catch, good to know. Done.
I'm not very familiar with this part of the code, but it LGTM
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru27, @dt, and @lucy-zhang)
pkg/sql/bulk_row_writer.go, line 82 at r4 (raw file):
// ingestKvs drains kvs from the channel until it closes, ingesting them using
// the BulkAdder. It handles the required buffering/sorting/etc.
ingestKvs := func() error {
Is there a difference between defining this function and then calling it immediately afterward, and just replacing it by the function definition?
I don't think so; I just did it to modularize the kv ingestion logic. However, I just realized that this pattern is usually used in places where we reuse a method several times. Should I pull out the definition?
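For what it's worth, the two forms really are equivalent: defining a closure and calling it immediately only adds a name and an extra scope. A tiny illustration under made-up names (nothing here is from the PR):

```go
package main

import "fmt"

// sumInline writes the loop directly in the function body.
func sumInline(kvs []int) int {
	sum := 0
	for _, kv := range kvs {
		sum += kv
	}
	return sum
}

// sumViaClosure wraps the same loop in an immediately-invoked closure; the
// behavior is identical, the closure just names and scopes the logic.
func sumViaClosure(kvs []int) int {
	ingest := func() int {
		sum := 0
		for _, kv := range kvs {
			sum += kv
		}
		return sum
	}
	return ingest()
}

func main() {
	fmt.Println(sumInline([]int{1, 2, 3}), sumViaClosure([]int{1, 2, 3}))
}
```

So the choice is purely about readability: the named closure documents intent, at the cost of one more level of indirection.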
Reviewed 1 of 11 files at r3, 1 of 2 files at r4.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru27)
pkg/sql/bulk_row_writer.go, line 70 at r4 (raw file):
}

func (sp *bulkRowWriter) doRun(ctx context.Context, kvCh chan []roachpb.KeyValue) error {
nit: I'd call this method ingestLoop or something descriptive, since it doesn't really run the whole process.
That said, I think you're also on the right track going with the pattern of a doRun method that has an error return and can thus have early returns on error, since you can then just call it in Run and clean up some of the named errors and conditionals deciding what to return.
pkg/sql/bulk_row_writer.go, line 118 at r4 (raw file):
var kvCh chan []roachpb.KeyValue
var g ctxgroup.Group
err := func() error {
This is a lot of inline anon function along with the added layer below for the GoCtx call.
In my experience, big inline funcs, particularly closing over each other, can make it tricky to see what vars closed over vs local, what is shadowed, etc. and can sometimes lead to subtle bugs, particularly if anyone else tries to make changes later.
My 2¢: I'd move most of this, i.e. what you want run on another goroutine, to a second private method, and then your Run (or doRun helper) looks like:
<setup code>
g := ...
g.GoCtx(func(ctx) error { return ingestLoop(ctx, kvCh, ...) } )
g.GoCtx(func(ctx) error { return convertLoop(ctx, input, kvCh, ...)} )
g.Wait()
Force-pushed 6125663 to 9774f70.
This change introduces a BulkRowWriter processor which uses the AddSSTable method to write rows from a RowSource to the target table. The processor is required to make the CTAS statement scalable. Release note: None
This change integrates the BulkRowWriter into the distsql PlanAndRun phase for CTAS statements. Release note: None
Force-pushed 9774f70 to 3e1f7f7.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru27 and @dt)
pkg/sql/create_table.go, line 1652 at r3 (raw file):
Previously, dt (David Taylor) wrote…
for better or worse, most of the time we define processors in distsqlrun and only use the function-pointer injection route for the ones that have to be defined elsewhere due to hard rules about dependency edges, like OSS vs CCL. Is there anything motivating putting this in sql instead of distsqlrun? Anything it depends on in sql?
In either case, I'd pull it into its own file.
Done.
pkg/sql/distsqlrun/processors.go, line 1244 at r2 (raw file):
Previously, dt (David Taylor) wrote…
This is usually done to get around CCL vs OSS, where we inject the proc via this hook and an init func. This is a pure OSS proc, so I don't think we need that song and dance.
Done.
pkg/sql/bulk_row_writer.go, line 70 at r4 (raw file):
Previously, dt (David Taylor) wrote…
nit: I'd call this method ingestLoop or something descriptive, since it doesn't really run the whole process.
That said, I think you're also on the right track going with the pattern of a doRun method that has an error return and can thus have early returns on error, since you can then just call it in Run and clean up some of the named errors and conditionals deciding what to return.
Done.
pkg/sql/bulk_row_writer.go, line 118 at r4 (raw file):
Previously, dt (David Taylor) wrote…
This is a lot of inline anon function along with the added layer below for the GoCtx call.
In my experience, big inline funcs, particularly closing over each other, can make it tricky to see what vars closed over vs local, what is shadowed, etc. and can sometimes lead to subtle bugs, particularly if anyone else tries to make changes later.
My 2¢: I'd move most of this, i.e. what you want run on another goroutine, to a second private method, and then your Run (or doRun helper) looks like:

<setup code>
g := ...
g.GoCtx(func(ctx) error { return ingestLoop(ctx, kvCh, ...) } )
g.GoCtx(func(ctx) error { return convertLoop(ctx, input, kvCh, ...)} )
g.Wait()
Done.
TFTR
bors r+
38374: pkg: Add and integrate BulkRowWriter processor into CTAS execution. r=adityamaru27 a=adityamaru27 This change introduces a BulkRowWriter processor which uses AddSSTable to write rows from a RowSource to the target table. This is in an attempt to make the CREATE TABLE ... AS statement more scalable. Co-authored-by: Aditya Maru <[email protected]>
Build succeeded
This change introduces a BulkRowWriter processor which uses
AddSSTable to write rows from a RowSource to the target
table. This is in an attempt to make the CREATE TABLE ... AS
statement more scalable.