sql: fix race condition in internal executor #63010
Conversation
Reviewed 1 of 1 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner)
pkg/sql/internal_result_channel.go, line 105 at r1 (raw file):
var firstErr error
for {
	res, done, err := c.nextResult(context.TODO())
I started thinking that this context.TODO might also be insufficient for correctness purposes. WDYT?
I wonder if I should change the signature to take in ctx for close and rowsIterator.Close in a separate PR.
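(For illustration only - a minimal sketch of that proposed signature change; the names below are hypothetical placeholders, not the PR's actual rowsIterator.)

package sketch

import "context"

// closer stands in for whatever the iterator wraps; purely hypothetical.
type closer interface {
	close(ctx context.Context) error
}

// rowsIterator here only illustrates the shape of threading the caller's ctx
// through Close instead of falling back to context.TODO inside close.
type rowsIterator struct {
	r closer
}

func (it *rowsIterator) Close(ctx context.Context) error {
	return it.r.close(ctx)
}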
pkg/sql/internal_result_channel.go, line 184 at r1 (raw file):
case res, ok := <-i.dataCh:
	if !ok {
		return ieIteratorResult{}, true, nil
What about this place? I'm starting to think that possibly removing doneCh and using context.CancelFunc might be less error-prone. Thoughts?
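(As an aside, a rough Go sketch of the two signaling styles being compared; none of these names come from the PR, and the real types carry more state.)

package sketch

import "context"

// ieIteratorResult stands in for the real result type.
type ieIteratorResult struct{}

// Done-channel style: every select that touches dataCh must also remember to
// include doneCh, which is easy to get wrong.
type chanWriter struct {
	dataCh chan ieIteratorResult
	doneCh chan struct{} // closed by the reader when it stops consuming
}

func (w *chanWriter) addResult(ctx context.Context, r ieIteratorResult) error {
	select {
	case w.dataCh <- r:
		return nil
	case <-w.doneCh:
		return context.Canceled // stand-in for a "reader is done" sentinel
	case <-ctx.Done():
		return ctx.Err()
	}
}

// CancelFunc style: the reader cancels the producer's context instead, so a
// single <-ctx.Done() case covers both "caller canceled" and "reader is done".
type ctxWriter struct {
	dataCh chan ieIteratorResult
	cancel context.CancelFunc // invoked by the reader's close/finish
}

func (w *ctxWriter) addResult(ctx context.Context, r ieIteratorResult) error {
	select {
	case w.dataCh <- r:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}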
Force-pushed from 010591f to 2d3c4aa.
See how this change makes you feel.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)
pkg/sql/internal_result_channel.go, line 105 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I started thinking that this context.TODO might also be insufficient for correctness purposes. WDYT? I wonder if I should change the signature to take in ctx for close and rowsIterator.Close in a separate PR.
We could. It'd be better but I'm not sure it's needed. I'm thinking that draining this all the way isn't great. I'm thinking that maybe the differences between the sync and async impls here are not justified.
pkg/sql/internal_result_channel.go, line 184 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
What about this place? I'm starting to think that possibly removing doneCh and using context.CancelFunc might be less error-prone. Thoughts?
That wouldn't have saved us from this one. What would have saved us from this one is never closing the dataCh and having finish do something different. The other thing I've done to save us is to deduplicate the code.
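(A rough sketch of that shape, with hypothetical names and under the assumption that the original race was a send on dataCh racing with its close: finish only signals through doneCh, guarded by a sync.Once so it stays idempotent, and dataCh is never closed.)

package sketch

import "sync"

type ieIteratorResult struct{}

// ieResultChannel is a placeholder for the unified reader/writer.
type ieResultChannel struct {
	dataCh   chan ieIteratorResult
	doneCh   chan struct{}
	doneOnce sync.Once
}

// finish never closes dataCh; it only closes doneCh, and the sync.Once makes
// it safe to call from either side, any number of times.
func (c *ieResultChannel) finish() {
	c.doneOnce.Do(func() {
		close(c.doneCh)
	})
}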
I like the unification, thanks!
Reviewed 2 of 2 files at r2.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner)
pkg/sql/internal_result_channel.go, line 105 at r1 (raw file):
Previously, ajwerner wrote…
We could. It'd be better but I'm not sure it's needed. I'm thinking that draining this all the way isn't great. I'm thinking that maybe the differences between the sync and async impls here are not justified.
I think that draining is still needed for correctness. In the query execution we need to allow for all metadata (e.g. LeafTxnFinalState) to be propagated throughout the flow to the gateway.
What we could improve is: instead of "draining" artificially once we have received enough rows - by letting the query run to completion - we would actually be draining properly - by making DistSQLReceiver change its status to DrainRequested. A possible way to do that is checking whether our custom errIEResultChannelClosed is returned from AddRow, with something like this:
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 5c2d8d5a3c..dd4c666904 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -724,32 +724,38 @@ func (r *DistSQLReceiver) Push(
 		r.tracing.TraceExecRowsResult(r.ctx, r.row)
 		// Note that AddRow accounts for the memory used by the Datums.
 		if commErr := r.resultWriter.AddRow(r.ctx, r.row); commErr != nil {
-			// ErrLimitedResultClosed is not a real error, it is a
-			// signal to stop distsql and return success to the client.
-			if !errors.Is(commErr, ErrLimitedResultClosed) {
-				// Set the error on the resultWriter too, for the convenience of some of the
-				// clients. If clients don't care to differentiate between communication
-				// errors and query execution errors, they can simply inspect
-				// resultWriter.Err(). Also, this function itself doesn't care about the
-				// distinction and just uses resultWriter.Err() to see if we're still
-				// accepting results.
-				r.resultWriter.SetError(commErr)
-
-				// We don't need to shut down the connection
-				// if there's a portal-related error. This is
-				// definitely a layering violation, but is part
-				// of some accepted technical debt (see comments on
-				// sql/pgwire.limitedCommandResult.moreResultsNeeded).
-				// Instead of changing the signature of AddRow, we have
-				// a sentinel error that is handled specially here.
-				if !errors.Is(commErr, ErrLimitedResultNotSupported) {
-					r.commErr = commErr
+			if errors.Is(commErr, errIEResultChannelClosed) {
+				r.status = execinfra.DrainRequested
+			} else {
+				// ErrLimitedResultClosed is not a real error, it is a
+				// signal to stop distsql and return success to the client.
+				if !errors.Is(commErr, ErrLimitedResultClosed) {
+					// Set the error on the resultWriter too, for the convenience of some of the
+					// clients. If clients don't care to differentiate between communication
+					// errors and query execution errors, they can simply inspect
+					// resultWriter.Err(). Also, this function itself doesn't care about the
+					// distinction and just uses resultWriter.Err() to see if we're still
+					// accepting results.
+					r.resultWriter.SetError(commErr)
+
+					// We don't need to shut down the connection
+					// if there's a portal-related error. This is
+					// definitely a layering violation, but is part
+					// of some accepted technical debt (see comments on
+					// sql/pgwire.limitedCommandResult.moreResultsNeeded).
+					// Instead of changing the signature of AddRow, we have
+					// a sentinel error that is handled specially here.
+					if !errors.Is(commErr, ErrLimitedResultNotSupported) {
+						r.commErr = commErr
+					}
 				}
+				// TODO(andrei): We should drain here. Metadata from this query would be
+				// useful, particularly as it was likely a large query (since AddRow()
+				// above failed, presumably with an out-of-memory error).
+				r.status = execinfra.ConsumerClosed
 			}
-			// TODO(andrei): We should drain here. Metadata from this query would be
-			// useful, particularly as it was likely a large query (since AddRow()
-			// above failed, presumably with an out-of-memory error).
-			r.status = execinfra.ConsumerClosed
 		}
 		return r.status
 	}
pkg/sql/internal_result_channel.go, line 93 at r2 (raw file):
// doneCh is used to indicate that the ReadWriter has been closed.
// doneCh is closed under the doneOnce. The doneCh is only used for the
nit: doneCh is now used for both variants.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner and @yuzefovich)
pkg/sql/internal_result_channel.go, line 105 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I think that draining is still needed for correctness. In the query execution we need to allow for all metadata (e.g. LeafTxnFinalState) to be propagated throughout the flow to the gateway. What we could improve is: instead of "draining" artificially once we have received enough rows - by letting the query run to completion - we would actually be draining properly - by making DistSQLReceiver change its status to DrainRequested. A possible way to do that is checking whether our custom errIEResultChannelClosed is returned from AddRow, with something like this: [see the diff quoted above]
Hmm, are you implying then that what we have here is wrong? The main thing is that now the code will drain results that have already been sent but it will return an error on the sending side for subsequent additions. I can add code to not close the doneCh in async if that sounds right to you.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner)
pkg/sql/internal_result_channel.go, line 105 at r1 (raw file):
Previously, ajwerner wrote…
Hmm, are you implying then that what we have here is wrong? The main thing is that now the code will drain results that have already been sent but it will return an error on the sending side for subsequent additions. I can add code to not close the doneCh in async if that sounds right to you.
Yes, I'm implying that. I originally started typing out that thought explicitly, then persuaded myself that we were doing the correct thing, but now I believe we're doing an incorrect thing.
Generally speaking, the queries that run via the internal executor could end up being distributed. If that's the case, we'll be using a leaf txn for the execution, and it is required for correctness that we propagate LeafTxnFinalState metadata. You are correctly pointing out that by returning an error in addResult, we will shut down the flow immediately, without properly draining the metadata.
I think before this PR we were doing the right thing in the async case but the wrong thing in the sync case, and with the current change we'll be doing the wrong thing in both cases. I now believe that something like the diff above is needed for correctness.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner)
pkg/sql/internal_result_channel.go, line 105 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Yes, I'm implying that. I originally started typing out that thought explicitly, then persuaded myself that we were doing the correct thing, but now I believe we're doing an incorrect thing.
Generally speaking, the queries that run via the internal executor could end up being distributed. If that's the case, we'll be using a leaf txn for the execution, and it is required for correctness that we propagate LeafTxnFinalState metadata. You are correctly pointing out that by returning an error in addResult, we will shut down the flow immediately, without properly draining the metadata. I think before this PR we were doing the right thing in the async case but the wrong thing in the sync case, and with the current change we'll be doing the wrong thing in both cases. I now believe that something like the diff above is needed for correctness.
My understanding is that if there is an error on the context, that indicates that the consumer gave up on the query execution (kinda like ROLLBACK), so it is ok to perform the "hard" shutdown using the ConsumerClosed status without draining the flow, but if we're returning the errIEResultChannelClosed error to signal to the producer that we don't need any more rows, we must transition into the DrainRequested state.
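(Restating those two shutdown modes as a hedged sketch - this is not the actual DistSQLReceiver.Push logic, and the diff earlier in the thread is closer to the real proposal.)

package sketch

import (
	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
	"github.com/cockroachdb/errors"
)

// errIEResultChannelClosed is the sentinel introduced by this PR; it is
// redeclared here only so the sketch stands on its own.
var errIEResultChannelClosed = errors.New("ieResultReader closed")

// consumerStatusAfterAddRowError condenses the two cases described above.
func consumerStatusAfterAddRowError(ctxErr, commErr error) execinfra.ConsumerStatus {
	if ctxErr != nil {
		// The consumer gave up on the query (akin to ROLLBACK): a hard
		// shutdown without draining is acceptable.
		return execinfra.ConsumerClosed
	}
	if errors.Is(commErr, errIEResultChannelClosed) {
		// The reader has enough rows, but metadata such as LeafTxnFinalState
		// still has to reach the gateway, so ask the flow to drain.
		return execinfra.DrainRequested
	}
	// Any other communication error: shut down.
	return execinfra.ConsumerClosed
}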
Can you take that over the finish line?
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner)
Of course! I started thinking about this more, and I think we have a similar problem in the non-internal executor path too (#63032).
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner)
Force-pushed from 2d3c4aa to b606d7d.
This seems better than what's on master. Maybe we should merge it to deflake tests?
I agree that it's an improvement. I opened up a separate PR #63032 for that - which actually uncovered an issue with shutting down the changefeed processors. I think I'd rather skip the flaky test temporarily, take the time with #63032 (it is missing a test, and I don't see a good way to write it), and then merge this fix.
Force-pushed from b606d7d to d8f85e2.
Alright, #63032 landed, so I rebased on top of it and unskipped the flaky test. I'll merge once CI is green (assuming I have an implicit approval from Andrew).
Alright, the build is green, thanks Andrew for working with me on this PR! bors r+
Build failed (retrying...): |
Build failed (retrying...): |
Build succeeded: |
The async and sync implementations were too close to justify two structs.
Also, the async behavior of not stopping the writer in case the reader
called close wasn't desirable. This commit unifies the implementation.
It also ensures that we propagate context errors in all cases triggered
by the closure of the done channel. It also makes closing the channel
idempotent.
Additionally, this commit transitions the execution flow into draining
state without setting our custom error on the resultWriter.
Fixes #62948.
Release note: None