-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sql: cleanup fetchers #83010
sql: cleanup fetchers #83010
Conversation
f13171d
to
f4f984b
Compare
This was inspired by Michael's suggestion. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changefeed changes are 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice cleanup!
Reviewed 20 of 20 files at r1, 29 of 29 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @michae2, @stevendanna, and @yuzefovich)
pkg/sql/row/fetcher.go
line 391 at r2 (raw file):
// SetTxn updates the Fetcher to use the provided txn. func (rf *Fetcher) SetTxn(txn *kv.Txn) { rf.setTxnAndSendFn(txn, makeKVBatchFetcherDefaultSendFunc(txn))
What if there was already a sendFn set? Wouldn't this overwrite it?
pkg/sql/row/kv_batch_streamer.go
line 83 at r2 (raw file):
) error { if bytesLimit != rowinfra.NoBytesLimit { return errors.AssertionFailedf("unexpectedly non-zero bytes limit for txnKVStreamer")
nit: s/unexpectedly/unexpected
pkg/sql/row/kv_fetcher.go
line 272 at r2 (raw file):
// SetupNextFetch overrides the same method from the wrapped KVBatchFetcher in // order reset this KVFetcher.
nit: s/in order/in order to
f4f984b
to
238d479
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @michae2, @rharding6373, and @stevendanna)
pkg/sql/row/fetcher.go
line 391 at r2 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
What if there was already a sendFn set? Wouldn't this overwrite it?
Yes, this would overwrite it, added some commentary.
This method is exposed to support the column backfill use case where a single row.Fetcher
object is reused many times throughout the column backfill operation which is broken down into chunks, and each chunk is processed under a new txn. The previous usage pattern of creating new row.txnKVFetcher
s for each chunk was doing essentially this - creating a new sendFn
for each chunk.
This commit removes a couple of arguments (`traceKV`, `forceProductionBatchSize`) from `StartScan*` fetcher methods in favor of passing them on `Init`. Additionally, several fields are removed from `row.Fetcher` in favor of accessing the args struct directly. The only meaningful change is that now we correctly propagate `traceKV` flag in the column backfiller code path when it is set up in a distributed case. Release note: None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for doing this. It turned out really, really nice. I was not expecting all the unifications of streamer and non-streamer code!
I have a few nits. Almost done reading, I will finish tonight.
Reviewed 20 of 20 files at r1, 14 of 29 files at r2, 3 of 3 files at r3, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373, @stevendanna, and @yuzefovich)
pkg/sql/row/fetcher.go
line 227 at r3 (raw file):
// WillUseCustomKVFetcher, if true, indicates that the caller will only use // StartScanFrom() method and will be providing its own KVFetcher. WillUseCustomKVFetcher bool
nit: Maybe this is pedantic, but wouldn't WillUseCustomKVBatchFetcher
be more accurate? And likewise, shouldn't the comment say "its own KVBatchFetcher" instead of "its own KVFetcher"?
pkg/sql/row/fetcher.go
line 403 at r3 (raw file):
// setTxnAndSendFn peeks inside of the KVFetcher to update the underlying // txnKVFetcher with the new txn and sendFn. func (rf *Fetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) {
Could we do this in a txnKVFetcher
method rather than here? I know it's a pain to plumb another method call down to KVFetcher
/ KVBatchFetcher
/ txnKVFetcher
, but reaching into the txnKVFetcher
like this this seems like a layer violation.
Or maybe a compromise would be: we put the changes to txnKVFetcher
in a txnKVFetcher
method, but then we peek inside kvFetcher.KVBatchFetcher.(*txnKVFetcher)
to call the method instead of adding the method to KVBatchFetcher
and KVFetcher
? That at least shrinks the surface area of the layer violation slightly. Would that be reasonable?
pkg/sql/row/fetcher.go
line 500 at r3 (raw file):
) error { if rf.args.StreamingKVFetcher != nil { return errors.AssertionFailedf("StartInconsistentScan is called instead of StartScanFrom")
nit: Should this say "... instead of StartScan" instead of "... instead of StartScanFrom"?
pkg/sql/row/kv_fetcher.go
line 138 at r3 (raw file):
diskBuffer kvstreamer.ResultDiskBuffer, ) *KVFetcher { streamer := kvstreamer.NewStreamer(
Nice that you can push this lower!
238d479
to
f7fe9bd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, I'm quite happy with this refactor, thanks for the idea!
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @michae2, @rharding6373, and @stevendanna)
pkg/sql/row/fetcher.go
line 227 at r3 (raw file):
Previously, michae2 (Michael Erickson) wrote…
nit: Maybe this is pedantic, but wouldn't
WillUseCustomKVBatchFetcher
be more accurate? And likewise, shouldn't the comment say "its own KVBatchFetcher" instead of "its own KVFetcher"?
You're right, fixed. I guess I wanted to make things shorter, but it seems better to be precise here.
pkg/sql/row/fetcher.go
line 403 at r3 (raw file):
Previously, michae2 (Michael Erickson) wrote…
Could we do this in a
txnKVFetcher
method rather than here? I know it's a pain to plumb another method call down toKVFetcher
/KVBatchFetcher
/txnKVFetcher
, but reaching into thetxnKVFetcher
like this this seems like a layer violation.Or maybe a compromise would be: we put the changes to
txnKVFetcher
in atxnKVFetcher
method, but then we peek insidekvFetcher.KVBatchFetcher.(*txnKVFetcher)
to call the method instead of adding the method toKVBatchFetcher
andKVFetcher
? That at least shrinks the surface area of the layer violation slightly. Would that be reasonable?
I agree that this is rather hacky, and I like your suggestion, done. I also added some assertions to make things more explicit (in the future, we might need to do a similar thing for txnKVStreamer
, but for now we only need to support the updating of the txn on txnKVFetcher
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 29 of 29 files at r4, 29 of 29 files at r5, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @rharding6373 and @stevendanna)
Previously, the lifecycle of different fetcher objects was a mess. Consider the sequence of fetchers when used by the join reader with the old non-streamer code path: `rowexec.joinReader` -> `row.Fetcher` -> `row.KVFetcher` -> `row.txnKVFetcher`. `row.Fetcher` was initialized once, but then on every call to `StartScan`, we would create a new `row.txnKVFetcher` and then wrap it with a new `row.KVFetcher` (during an internal `StartScanFrom` call). In other words, throughout the lifetime of the join reader, its fetcher would create a new pair of objects for each input row batch. This setup is very unintuitive and previously led to some bugs with memory accounting. I believe such a setup was created organically, without giving too much thought to it. Some considerations should be pointed out: - in some cases, we have some state from the previous fetch that we want to discard - in some cases, we provide the `row.Fetcher` with a custom `KVBatchFetcher` implementation. This commit refactors all of this stuff to make it much more sane. In particular, we now only create a single `row.KVFetcher` object that is powered by a single `row.txnKVFetcher` or `row.txnKVStreamer` implementation throughout the whole lifetime of `row.Fetcher`. In the main code path, the callers are now expected to only use `StartScan` method which correctly discards unnecessary state from the previous call. This is achieved by adding a new method to `KVBatchFetcher` interface. This commit supports the use case with custom `KVBatchFetcher`s too by asking the caller to explicitly specify a knob during the initialization of the `row.Fetcher` - in such case, only `StartScanFrom` calls are allowed. There, we still close the `KVBatchFetcher` from the previous call (tbh I believe this is not necessary since these custom `KVBatchFetcher`s don't have anything to clean up, but it's probably safer to keep the old behavior here). Furthermore, this commit pushes some arguments from `StartScan` into `Init` - most notably the txn is now passed only once. However, there are some use cases (like a column backfill, done in chunks) where the txn might change throughout the lifetime of the fetcher - we allow updating it later if needed. This also allows us to unify the streamer and the non-streamer code paths - to remove some of the duplicated code as well as push the usage of the streamer lower in the stack. Release note: None
f7fe9bd
to
e7e724e
Compare
@rharding6373 do you wanna take another look? |
All feedback has been addressed.
I'm assuming that Rachael is on board with this change. Thanks for the reviews! bors r+ |
Build succeeded: |
sql: clean up Fetcher interfaces a bit
This commit removes a couple of arguments (
traceKV
,forceProductionBatchSize
) fromStartScan*
fetcher methods infavor of passing them on
Init
. Additionally, several fields areremoved from
row.Fetcher
in favor of accessing the args structdirectly.
The only meaningful change is that now we correctly propagate
traceKV
flag in the column backfiller code path when it is set up in
a distributed case.
Release note: None
sql: clean up the lifecycle of fetchers
Previously, the lifecycle of different fetcher objects was a mess.
Consider the sequence of fetchers when used by the join reader with the
old non-streamer code path:
rowexec.joinReader
->row.Fetcher
->row.KVFetcher
->row.txnKVFetcher
.row.Fetcher
was initialized once, but then on every call toStartScan
, we would create a newrow.txnKVFetcher
and then wrap itwith a new
row.KVFetcher
(during an internalStartScanFrom
call). Inother words, throughout the lifetime of the join reader, its fetcher
would create a new pair of objects for each input row batch. This setup
is very unintuitive and previously led to some bugs with memory
accounting.
I believe such a setup was created organically, without giving too much
thought to it. Some considerations should be pointed out:
to discard
row.Fetcher
with a customKVBatchFetcher
implementation.This commit refactors all of this stuff to make it much more sane. In
particular, we now only create a single
row.KVFetcher
object that ispowered by a single
row.txnKVFetcher
orrow.txnKVStreamer
implementation throughout the whole lifetime of
row.Fetcher
. In themain code path, the callers are now expected to only use
StartScan
method which correctly discards unnecessary state from the previous
call. This is achieved by adding a new method to
KVBatchFetcher
interface.
This commit supports the use case with custom
KVBatchFetcher
s too byasking the caller to explicitly specify a knob during the initialization
of the
row.Fetcher
- in such case, onlyStartScanFrom
calls areallowed. There, we still close the
KVBatchFetcher
from the previouscall (tbh I believe this is not necessary since these custom
KVBatchFetcher
s don't have anything to clean up, but it's probablysafer to keep the old behavior here).
Furthermore, this commit pushes some arguments from
StartScan
intoInit
- most notably the txn is now passed only once. However, thereare some use cases (like a column backfill, done in chunks) where the
txn might change throughout the lifetime of the fetcher - we allow
updating it later if needed.
This also allows us to unify the streamer and the non-streamer
code paths - to remove some of the duplicated code as well as push the
usage of the streamer lower in the stack.
Release note: None