sql/importer: more controlled shutdown during job cancellation #91615
Conversation
First commit is #91563
looks really good. Left mostly questions about docs.
pkg/sql/importer/import_processor.go
Outdated
select {
case progCh <- prog:
case <-ctx.Done():
}
we spoke about an idea where a remote processor would shut itself down if it can't push to the coordinator. That's not implemented in here, correct?
The import processor already attempts to push a progress meta periodically. As part of that periodic push, we should see the downstream status, so I don't think we actually need an additional push loop.
but if the processor can't communicate with the coordinator it will not receive a downstream status, i.e. a flowCtx cancellation, right? I thought we spoke about the need for the import processor to shut itself down if it has lost its connection with the coordinator.
I'm not opposed to adding more if needed. But, I think the following is how the code stands currently:
- Every 10s, we push an update to the progress channel. That update will be available to the next caller of Next():
for prog := range idp.progCh {
cockroach/pkg/sql/importer/import_processor.go
Lines 474 to 488 in a8b0cd9
g.GoCtx(func(ctx context.Context) error {
	tick := time.NewTicker(time.Second * 10)
	defer tick.Stop()
	done := ctx.Done()
	for {
		select {
		case <-done:
			return ctx.Err()
		case <-stopProgress:
			return nil
		case <-tick.C:
			pushProgress()
		}
	}
})
- Next() is actually called in a loop in Run():
https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/execinfra/base.go#L186-L204
- In this loop we Push() on our destination. I believe this will eventually make its way to the Outbox which is what sends data to the remote node. If that Outbox encounters an error because it can't talk to the remote node, it should result in our processor seeing both a context cancellation and a call to ConsumerClosed.
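To make that loop shape concrete, here is a minimal, self-contained Go sketch of the pattern described in these bullets. The types and status names below are illustrative stand-ins, not the real execinfra interfaces, and the actual Run loop linked above does quite a bit more (tracing, metadata rules, etc.):

package main

// ConsumerStatus, Row, Meta, Source, and Receiver are invented for this
// sketch; they only mimic the roles played by the real execinfra types.
type ConsumerStatus int

const (
	NeedMoreRows ConsumerStatus = iota
	DrainRequested
	ConsumerClosed
)

type Row []string

type Meta struct{ Err error }

type Source interface {
	// Next returns the next row or metadata; (nil, nil) means exhausted.
	Next() (Row, *Meta)
}

type Receiver interface {
	// Push forwards a row or metadata and reports the consumer's status.
	Push(row Row, meta *Meta) ConsumerStatus
	ProducerDone()
}

// run mirrors the loop shape described above: rows flow until the consumer
// stops asking for them; an error while forwarding (e.g. an outbox that lost
// its connection) shows up as a non-NeedMoreRows status on the next Push.
func run(src Source, dst Receiver) {
	for {
		row, meta := src.Next()
		if row == nil && meta == nil {
			break
		}
		status := dst.Push(row, meta)
		if status == NeedMoreRows {
			continue
		}
		if status == DrainRequested {
			drainMeta(src, dst) // forward trailing metadata only
		}
		break // DrainRequested (after draining) or ConsumerClosed: stop
	}
	dst.ProducerDone()
}

// drainMeta discards rows and forwards remaining metadata to the consumer.
func drainMeta(src Source, dst Receiver) {
	for {
		row, meta := src.Next()
		if row == nil && meta == nil {
			return
		}
		if meta != nil {
			dst.Push(nil, meta)
		}
	}
}

func main() {}

The point is just that the producer only learns about the consumer's state through the status returned by Push, which is why a failed outbox surfaces to the processor on its next push.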
Thanks for this detailed explanation!! I took a closer look at the DistSQLReceiver.Push() function, and I think I've convinced myself that it will handle communication errors correctly, at least on master:
- If the import processor attempts to send a row and the receiver's status is NeedMoreRows, we do seem to handle communication errors, and update the status to draining/consumer closed.
- When we push metadata, however, I don't think communication errors are handled. Maybe this isn't a problem, as we're only sending metadata after the receiver's status is no longer NeedMoreRows.
I'd be curious to chat with Yahor if he thinks previous versions of crdb handled comm errors like master does.
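As a toy illustration of the asymmetry in the two bullets above, here is a sketch where only row pushes consult the thing that can fail with a communication error. None of these names are the real DistSQLReceiver fields or methods; it's just meant to show the shape of the behavior I think I'm seeing:

package main

import "fmt"

// ConsumerStatus, rowWriter, and receiver are invented for this sketch.
type ConsumerStatus int

const (
	NeedMoreRows ConsumerStatus = iota
	DrainRequested
	ConsumerClosed
)

// rowWriter stands in for whatever forwards rows toward the client and can
// fail with a communication error.
type rowWriter interface {
	AddRow(row []string) error
}

type receiver struct {
	status ConsumerStatus
	out    rowWriter
}

// PushRow forwards a row while the consumer still wants rows; a failure to
// forward flips the status so the producer stops on its next push.
func (r *receiver) PushRow(row []string) ConsumerStatus {
	if r.status != NeedMoreRows {
		return r.status
	}
	if err := r.out.AddRow(row); err != nil {
		r.status = ConsumerClosed // communication error: stop producing
	}
	return r.status
}

// PushMeta never touches the row writer, so a communication error is not
// detected here; an error carried in the metadata itself still moves the
// receiver toward draining.
func (r *receiver) PushMeta(err error) ConsumerStatus {
	if err != nil && r.status == NeedMoreRows {
		r.status = DrainRequested
	}
	return r.status
}

type discard struct{}

func (discard) AddRow([]string) error { return nil }

func main() {
	r := &receiver{out: discard{}}
	fmt.Println(r.PushRow([]string{"a"}))                   // NeedMoreRows
	fmt.Println(r.PushMeta(fmt.Errorf("node unreachable"))) // DrainRequested
	fmt.Println(r.PushRow([]string{"b"}))                   // still DrainRequested
}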
Also, you mentioned this implementation only reduces the chances of a race. Do you have an understanding of what sorts of situations can slip past these guardrails?
spoke offline. Here's the scenario we're worried about:
t0: remote sends a progress update, with connection
t1: remote silently loses connection with the coordinator
t2: remote sends an AddSSTable request, without realizing it has gone rogue
t3: coordinator sends a cancel request, waits for X seconds but gets crickets from the remote, and cancels the flowCtx
t4: remote finally receives the AddSSTable response, but the damage is done, b/c OnFailOrCancel has begun
We're unsure if a timeout on the remote AddSSTable request could solve this problem.
Force-pushed from b4c6d79 to 87cbbe1
Thanks for answering all my questions!
@yuzefovich This is in part what we chatted about in Slack re processor shutdown. I wonder if you might have a few minutes to take a look.
Looks good to me and is already an improvement, but I do have one suggestion to consider.
Reviewed 2 of 4 files at r2, 4 of 4 files at r3, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @msbutler, @rhu713, and @stevendanna)
pkg/sql/importer/import_processor.go
line 497 at r2 (raw file):
Previously, msbutler (Michael Butler) wrote…
spoke offline. Here's the scenario we're worried about:
t0: remote sends a progress update, with connection
t1: remote silently loses connection with the coordinator
t2: remote sends an AddSSTable request, without realizing it has gone rogue
t3: coordinator sends a cancel request, waits for X seconds but gets crickets from the remote, and cancels the flowCtx
t4: remote finally receives the AddSSTable response, but the damage is done, b/c OnFailOrCancel has begun
We're unsure if a timeout on the remote AddSSTable request could solve this problem.
There is probably some confusion about what a "communication error" is. DistSQLReceiver.handleCommErr is called only when we have issues communicating the results to the client (i.e. SQL CLI, ORM, etc). Communication errors "within" a distributed plan are handled differently - they eventually are pushed as metadata into DistSQLReceiver.pushMeta, where we call DistSQLReceiver.SetError, which will update the status of the receiver. In a scenario where the coordinator node loses the connection to one of the remote nodes, a communication error will be "generated" on the coordinator node in processInboundStreamHelper and will be pushed as metadata into the DistSQLReceiver too.
Just to spell things out a bit: when a remote node pushes a progress metadata object, eventually it'll arrive on the coordinator node in processProducerMessage. If the status of the receiver has been changed to "drain requested" since the last update, then we send an explicit drain signal to the remote. Also, there haven't been any significant changes around this machinery for a while, in case you're considering a backport.
The scenario you describe seems feasible. However, it would depend on exactly how the "remote silently loses connection" - if the gRPC stream is broken, then the outbox listener goroutine would get an error in listenForDrainSignalFromConsumer, which would trigger the ungraceful shutdown of the whole flow.
One thing we could consider doing is, rather than transitioning to DrainRequested, we'd go straight to ConsumerClosed. DrainRequested is propagated passively, on the next push of the metadata by the remote nodes, but ConsumerClosed results in an eager propagation - we abruptly shut down gRPC streams between nodes. This then results in the outboxes on the remote nodes canceling flowCtx on each node. As a whole, this would be an ungraceful termination, so we might need to teach the import processors to clean up in such cases, but it would be eager.
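A rough, purely illustrative sketch of the passive vs. eager trade-off (the names, statuses, and timings below are made up, and this is not how the flow infrastructure is actually wired):

package main

import (
	"context"
	"fmt"
	"time"
)

// remoteFlow stands in for a remote node's flow: it pushes a progress update
// on every tick and otherwise only watches its flow context.
func remoteFlow(flowCtx context.Context, push func() string, tick time.Duration) {
	t := time.NewTicker(tick)
	defer t.Stop()
	for {
		select {
		case <-flowCtx.Done():
			fmt.Println("remote: flowCtx canceled, shutting down now")
			return
		case <-t.C:
			if status := push(); status != "NeedMoreRows" {
				fmt.Println("remote: saw", status, "on push, draining")
				return
			}
		}
	}
}

func main() {
	// Passive (DrainRequested-style): the coordinator-side status changes,
	// but the remote only notices on its next push, so the reaction lags by
	// up to one push interval (10s in the import processor, 200ms here).
	drainAfter := time.Now().Add(50 * time.Millisecond)
	push := func() string {
		if time.Now().After(drainAfter) {
			return "DrainRequested"
		}
		return "NeedMoreRows"
	}
	remoteFlow(context.Background(), push, 200*time.Millisecond)

	// Eager (ConsumerClosed-style): tearing down the stream is modeled as
	// canceling the remote's flow context, which it notices immediately.
	flowCtx, cancelFlow := context.WithCancel(context.Background())
	time.AfterFunc(50*time.Millisecond, cancelFlow)
	remoteFlow(flowCtx, func() string { return "NeedMoreRows" }, 200*time.Millisecond)
}

The eager variant reacts immediately but gives up the graceful drain, which is why the import processors would need to be taught to clean up after an abrupt flowCtx cancellation.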
pkg/sql/importer/import_processor_planning.go
line 339 at r3 (raw file):
// watch starts watching the context passed to newCancelWatcher for
// cancelation and notifies the given DistSQLReceiver when a
nit: s/cancelation/cancellation/
pkg/sql/importer/import_stmt_test.go
line 2083 at r3 (raw file):
// TestImportIntoCSVCancel cancels a distributed import. This test
// currently has few assertions but is essentially a regression tests
nit: s/tests/test/
Force-pushed from 87cbbe1 to ebc7e01
Force-pushed from ebc7e01 to 4946de7
Previously, we passed the import Resumer's context directly to our DistSQLReceiver and to (*sql.DistSQLPlanner).Run. This context is canceled when the user cancels or pauses a job. In practice, this setup made it very common for dsp.Run to return before the processors have shut down.

Here, we create a separate context for the distsql flow. When the Resumer's context is canceled, we SetError on the DistSQLReceiver, which will transition the receiver to DrainRequested, which will be propagated to remote processors. Eventually, this leads to all remote processors exiting, and the entire flow shutting down.

Note that the propagation of the draining status happens when a message is actually pushed from a processor. We push progress messages from the import processors to the distsql receivers every 10 seconds or so. To protect against waiting too long, we explicitly cancel the flow after a timeout.

Release note: None
Force-pushed from 4946de7 to 524e674
A good portion of this was merged in another PR. This has some problems with some other recent distsql changes: calling SetError hits a data race when updating the status. But what is also interesting is that the previous test that required these changes is no longer reliably catching the problem.
Hm, I did fix an error recently about SetError in #93360 - I'm assuming you're rebased on top of that, right? Can you share reproduction steps / CI link for the failure?
We also merged yesterday #90864, which should improve the shutdown of the distributed plans that use row-by-row infrastructure, so it might be interesting to try reverting that change and see whether the problem would reproduce.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @msbutler, @rhu713, and @stevendanna)
Previously, we passed the import Resumer's context directly to our
DistSQLReceiver and to (*sql.DistSQLPlanner).Run. This context is
canceled when the user cancels or pauses a job. In practice, this
setup made it very common for dsp.Run to return before the processors
have shut down.
Here, we create a separate context for the distsql flow. When the
Resumer's context is canceled, we SetError on the DistSQLReceiver
which will transition the receiver to DrainRequested which will be
propagated to remote processors. Eventually, this leads to all remote
processors exiting, and the entire flow shutting down.
Note that the propagation of the draining status happens when a
message is actually pushed from a processor. We push progress messages
from the import processors to the distsql receivers every 10 seconds
or so.
To protect against waiting too long, we explicitly cancel the flow
after a timeout.
Further, previously un-managed goroutines in the import processors are
now explicitly managed in a context group that we wait on during
shutdown, similar to other job-related processors.
Overall, in the included test, this substantially reduces the
frequency at which we see import processors outliving the running job.
Epic: None
Release note: None
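For readers skimming the commit message, the following is a minimal Go sketch of the pattern it describes: give the flow its own context, watch the Resumer's context, ask the receiver to drain on cancellation, and fall back to canceling the flow after a timeout. All names here (jobReceiver, runJobFlow, drainTimeout, and the 3-second value) are illustrative assumptions; the real change is wired around DistSQLReceiver, (*sql.DistSQLPlanner).Run, and the import processors, and it also moves the processors' goroutines into a managed context group, which this sketch does not show:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// jobReceiver is an illustrative stand-in for the DistSQLReceiver.
type jobReceiver struct {
	mu       sync.Mutex
	drainErr error
}

// SetError mimics asking the receiver to move to DrainRequested: the next
// message pushed by a processor observes the new status.
func (r *jobReceiver) SetError(err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.drainErr = err
}

func (r *jobReceiver) draining() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.drainErr != nil
}

const drainTimeout = 3 * time.Second // assumed timeout, not the real value

// runJobFlow gives the flow its own context instead of the resumer's and
// watches the resumer context: on cancellation it first asks for a graceful
// drain, then cancels the flow outright if the drain takes too long.
func runJobFlow(resumerCtx context.Context, recv *jobReceiver,
	runFlow func(ctx context.Context) error) error {
	flowCtx, cancelFlow := context.WithCancel(context.Background())
	defer cancelFlow()

	flowDone := make(chan struct{})
	defer close(flowDone)
	go func() {
		select {
		case <-flowDone: // flow finished on its own
			return
		case <-resumerCtx.Done(): // job canceled or paused
		}
		recv.SetError(resumerCtx.Err()) // request a drain
		select {
		case <-flowDone:
		case <-time.After(drainTimeout):
			cancelFlow() // drain took too long; shut the flow down hard
		}
	}()

	return runFlow(flowCtx)
}

func main() {
	resumerCtx, cancelJob := context.WithCancel(context.Background())
	time.AfterFunc(100*time.Millisecond, cancelJob) // simulate CANCEL JOB

	recv := &jobReceiver{}
	err := runJobFlow(resumerCtx, recv, func(ctx context.Context) error {
		// Stand-in for dsp.Run: poll the receiver's status and the flow ctx.
		for {
			select {
			case <-ctx.Done():
				return errors.New("flow context canceled")
			case <-time.After(50 * time.Millisecond):
				if recv.draining() {
					return nil // drained gracefully
				}
			}
		}
	})
	fmt.Println("flow finished, err:", err)
}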