Worker retry for MSQ task #13353
Conversation
This pull request introduces 1 alert when merging 74fa66f into 56d5c97 - view on LGTM.com new alerts:
This pull request introduces 1 alert when merging c92b217 into 81d005f - view on LGTM.com new alerts:
This pull request introduces 1 alert when merging 58da3cf into a3edda3 - view on LGTM.com new alerts:
Thanks for the PR 🥳 Left a few comments for now.
 * Gets the error code from the message. If the message is empty or null, {@link UnknownFault#CODE} is returned. This method
 * does not guarantee that the error code we get out of the message is a valid error code.
 */
public static String getErrorCodeFromMessage(String message)
Instead of relying on the manual parsing of the error codes from the message string and defining our own error message schema, can we use jsonMapper somehow to do it for us?
Umm, I do not think so, as the task status might be received from the tasks table, where the status messages are truncated ("chomped").
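For illustration, a minimal sketch of what delimiter-based extraction of an error code from a (possibly truncated) message could look like. The delimiter, constant, and class names are assumptions made for this sketch; the actual MSQFaultUtils implementation may differ.

```java
// Hypothetical sketch only; not the actual MSQFaultUtils code.
public class FaultMessageParsingSketch
{
  // Stand-in for UnknownFault#CODE.
  private static final String UNKNOWN_CODE = "UnknownError";

  /**
   * Extracts the leading error code from a message such as "TooManyWorkers: ...".
   * Falls back to the unknown code when the message is null, empty, or truncated
   * before the delimiter.
   */
  public static String getErrorCodeFromMessage(String message)
  {
    if (message == null || message.isEmpty()) {
      return UNKNOWN_CODE;
    }
    final int delimiterIndex = message.indexOf(':');
    if (delimiterIndex <= 0) {
      return UNKNOWN_CODE;
    }
    return message.substring(0, delimiterIndex).trim();
  }
}
```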
@@ -551,6 +563,15 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
        id(),
        task.getDataSource(),
        context,
        (failedTask, fault) -> {
          addToKernelManipulationQueue((kernel) -> {
            if (isDurableStorageEnabled) {
I think across this PR and #13368, we might need to unify what enabling durable storage, enabling fault tolerance, and enabling durable storage for intermediate steps mean, since that might cause confusion in the code as well as for the end user configuring the properties.
Agreed — it should all be controllable by the same faultTolerant setting. We don't want people to have to tweak a bunch of different settings in order to get the full package of behavior.
IMO, it should default to false at first, while we're experimenting with it, but we should change the default to true once we're feeling that it's stable.
> IMO, it should default to false at first

Sorry, what is "it" here: isDurableStorageEnabled, or a new context param faultTolerant which is false by default?
Added a context flag.
- If the user sets faultTolerance=true, durableShuffleStorage is enabled as well.
- If the user sets faultTolerance=true and durableStorage=false, it's an error.
- If the user sets durableStorage=true, the faultTolerance context param remains untouched.
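A minimal sketch of how that resolution could be expressed, assuming the context key names from this discussion; the class name, map-based signature, and exception type are illustrative, not the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: resolves the faultTolerance / durableShuffleStorage context
// flags the way the comment above describes.
public class FaultToleranceContextSketch
{
  public static Map<String, Object> resolve(final Map<String, Object> queryContext)
  {
    final Map<String, Object> resolved = new HashMap<>(queryContext);
    final boolean faultTolerance = Boolean.TRUE.equals(resolved.get("faultTolerance"));

    if (faultTolerance) {
      if (Boolean.FALSE.equals(resolved.get("durableShuffleStorage"))) {
        // faultTolerance=true with durable storage explicitly disabled is an error.
        throw new IllegalArgumentException("faultTolerance=true requires durableShuffleStorage=true");
      }
      // faultTolerance implies durable shuffle storage.
      resolved.put("durableShuffleStorage", true);
    }
    // durableShuffleStorage=true on its own leaves faultTolerance untouched.
    return resolved;
  }
}
```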
import org.apache.druid.msq.indexing.error.MSQFault;

public interface RetryTask
I think this would be better named as RelaunchTask if we are going with relaunch instead of retry everywhere. WDYT?
/**
 * Enables retrying for the task
 */
public void enableRetrying()
Suggested change:
- public void enableRetrying()
+ public void enableRelaunching()
@@ -58,11 +61,12 @@ public MSQWorkerTask(
     @JsonProperty("controllerTaskId") final String controllerTaskId,
     @JsonProperty("dataSource") final String dataSource,
     @JsonProperty("workerNumber") final int workerNumber,
-    @JsonProperty("context") final Map<String, Object> context
+    @JsonProperty("context") final Map<String, Object> context,
+    @JsonProperty(value = "retry", defaultValue = "0") final int retryCount
Suggested change:
- @JsonProperty(value = "retry", defaultValue = "0") final int retryCount
+ @JsonProperty(value = "relaunch", defaultValue = "0") final int relaunchCount
I think we should drop retry in favour of relaunch (or change everything back to retry) to avoid confusion unless they mean different things. WDYT?
Going through the code, can there be a case when, say, a task is present on an MM/indexer which loses network for a while? In that duration, we create a new task, and once the older MM/indexer regains the network it also starts processing (which might cause some problems). How is that handled? Does the tasksToCleanup method continually clean up the tasks until they are no longer present?
Very cool! I took a first pass, mostly focusing on general behavior and the kernel. I will take another pass after hearing back about the kernel stuff.
docs/multi-stage-query/reference.md
@@ -234,6 +234,9 @@ The following table lists query limits:
| Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` |
| Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` |
| Maximum relaunches per worker. Initial run is not a relaunch. The worker will be spawned 1 + workerRelaunchLimit times before erroring out. | 2 | `WorkerRelaunchedTooManyTimes` |
| Maximum relaunches across all workers. | 30 | `TotalRelaunchLimitExceededFault` |
Seems low, especially since a single machine can be running many workers that would all fail at the same time.
It'd be better if the two faults had similar names, like TooManyAttemptsForWorker and TooManyAttemptsForJob. Or something like that. Also, don't include Fault in the name of faults.
Updated the maximum relaunches across all workers to 100, but I do agree with @LakshSingla that it should eventually depend on the number of workers in the job.
}

/**
 * True if work orders needs to be sent else false.
This javadoc doesn't seem quite right either: it doesn't seem true to me that if all workers are in RESULTS_READY then work orders need to be sent. Please expand the javadoc to explain what this really means.
  return WorkerStagePhase.READING_INPUT;
});
if (phase != ControllerStagePhase.READING_INPUT) {
  if (workOrdersNeedToBeSent()) {
Something seems backwards here? The check is workOrdersNeedToBeSent(), but the comment says "if no more work orders need to be sent…". Please clarify.
Updated the method name and the javadoc to allWorkOrdersSent.
private final int workerCount;

private final WorkerInputs workerInputs;

// worker -> workerStagePhase
private final Int2ObjectMap<WorkerStagePhase> workerToPhase = new Int2ObjectOpenHashMap<>();
This is a big change. Previously WorkerStagePhase was localized to the worker. Now it's on the controller too. Is the controller state kept in sync with the worker state? If so, how does that work? Please explain this stuff in the comment.
// worker-> workerStagePhase
// Controller keeps track of the stage with this map.
// Currently, we rely on the serial nature of the state machine to keep things in sync between the controller and the worker.
// So the worker state in the controller can go out of sync with the actual worker state.
Added the comments.
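To make the added comment concrete, a simplified sketch of the kind of controller-side bookkeeping being discussed; the phase names, transition rule, and allWorkOrdersSent check are assumptions for illustration, not the exact ControllerStageTracker code.

```java
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;

// Simplified sketch: the controller updates this map from events it observes
// (work order sent, results reported, failure), so it can lag behind the
// worker's true state, as noted in the comment above.
class ControllerPhaseTrackingSketch
{
  enum Phase { NEW, READING_INPUT, RESULTS_READY, FINISHED, FAILED }

  private final Int2ObjectMap<Phase> workerToPhase = new Int2ObjectOpenHashMap<>();

  void registerWorker(final int workerNumber)
  {
    workerToPhase.put(workerNumber, Phase.NEW);
  }

  void transitionTo(final int workerNumber, final Phase newPhase)
  {
    final Phase current = workerToPhase.get(workerNumber);
    // Rely on the serial nature of the state machine: only move forward, or to FAILED.
    if (current != null && (newPhase == Phase.FAILED || newPhase.ordinal() > current.ordinal())) {
      workerToPhase.put(workerNumber, newPhase);
    } else {
      throw new IllegalStateException(
          "Invalid transition for worker " + workerNumber + ": " + current + " -> " + newPhase
      );
    }
  }

  /** True once every tracked worker has left NEW, i.e. has been sent its work order. */
  boolean allWorkOrdersSent()
  {
    return workerToPhase.values().stream().noneMatch(phase -> phase == Phase.NEW);
  }
}
```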
…he#13374)
* Migrate current integration batch tests to equivalent MSQ tests using new IT framework
* Fix build issues
* Trigger Build
* Adding more tests and addressing comments
* fixBuildIssues
* fix dependency issues
* Parameterized the test and addressed comments
* Addressing comments
* fixing checkstyle errors
* Adressing comments
# Conflicts:
#	extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
#	extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
#	extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
#	extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
#	extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
#	extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelTest.java
# Conflicts:
#	extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
#	extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
#	extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
This pull request introduces 2 alerts when merging d32397a into d85fb8c - view on LGTM.com new alerts:
Heads-up: LGTM.com's PR analysis will be disabled on the 5th of December, and LGTM.com will be shut down completely on the 16th of December 2022. Please enable GitHub code scanning, which uses the same CodeQL engine that powers LGTM.com. For more information, please check out our post on the GitHub blog.
The functions imported after resolving the conflict with the sequential merge LGTM and should function in the same way as before. I will finish reviewing the rest as well.
Adding stale task handling in sketchFetcher. Adding UTs.
# Conflicts:
#	docs/multi-stage-query/reference.md
Left a few more comments about surrounding methods and classes. Will revisit the PR for the stage changes and durable storage changes. Will be punting on the changes pertaining to statistics collection for now.
{

@Test
public void sanityTest()
I think there should be a test where consumerA throws an exception after adding to the set, and then we should see if consumerB runs or not.
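A sketch of the suggested test, using a simplified stand-in for the TriConsumer interface (the real one lives in org.apache.druid.java.util.common.function; its chaining semantics may differ from the andThen shown here):

```java
import java.util.HashSet;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;

// Sketch of the suggested test: consumerA throws after adding to the set, and we
// assert that the chained consumerB never runs.
public class TriConsumerChainingSketchTest
{
  @FunctionalInterface
  interface TriConsumer<T, U, V>
  {
    void accept(T t, U u, V v);

    default TriConsumer<T, U, V> andThen(TriConsumer<? super T, ? super U, ? super V> after)
    {
      return (t, u, v) -> {
        accept(t, u, v);
        after.accept(t, u, v);
      };
    }
  }

  @Test
  public void testSecondConsumerDoesNotRunWhenFirstThrows()
  {
    final Set<String> seen = new HashSet<>();

    final TriConsumer<String, String, String> consumerA = (a, b, c) -> {
      seen.add(a);
      throw new RuntimeException("boom");
    };
    final TriConsumer<String, String, String> consumerB = (a, b, c) -> seen.add(b);

    try {
      consumerA.andThen(consumerB).accept("first", "second", "third");
      Assert.fail("expected exception");
    }
    catch (RuntimeException e) {
      // consumerA ran and added to the set, but the exception prevented consumerB from running.
      Assert.assertTrue(seen.contains("first"));
      Assert.assertFalse(seen.contains("second"));
    }
  }
}
```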
  msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
}

private void killTaskAbruptly(String taskIdToKill)
Should this be moved to the IT Framework's core methods instead of being in a specific IT test? Also, is there a better way to do it without invoking the command?
Another way is to use a flag which is passed down the processing stack. Then, in the main code, you check if this flag is set and exit the JVM with a non-zero code.
As that would require us to change a lot of places, I have gone with this approach. If more things start using this method, then maybe we can move it there.
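For context, a heavily hedged sketch of what a command-invoking helper like this could look like; the pkill command, timeout, and class name are assumptions for illustration and may not match the PR's actual IT helper.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: abruptly kill a worker task's JVM from an integration test
// by shelling out and SIGKILL-ing the process whose command line contains the task id.
class AbruptTaskKillSketch
{
  static void killTaskAbruptly(final String taskIdToKill) throws Exception
  {
    final ProcessBuilder pb = new ProcessBuilder(
        "bash", "-c",
        "pkill -9 -f " + taskIdToKill
    );
    final Process process = pb.inheritIO().start();
    // Fail the test if the kill command did not complete successfully in time.
    if (!process.waitFor(30, TimeUnit.SECONDS) || process.exitValue() != 0) {
      throw new IllegalStateException("Failed to kill task " + taskIdToKill);
    }
  }
}
```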
@@ -58,10 +58,14 @@
 public static final String CTX_FINALIZE_AGGREGATIONS = "finalizeAggregations";
 private static final boolean DEFAULT_FINALIZE_AGGREGATIONS = true;

-public static final String CTX_ENABLE_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage";
+public static final String CTX_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage";
Since we now have a fault-tolerant mode that is one step ahead of the shuffle storage, does it make sense to deprecate the durable shuffle storage mode entirely, since the time benefits of just using the shuffle storage seem negligible in comparison to using the fault-tolerant mode? (My understanding is that most of the time would be spent writing to and reading from the durable storage, which both cases do; however, with fault tolerance we also enable extra code paths which allow worker retry.)
I have feature-flagged it just in case we break something. In the future, yes, we should remove the durableShuffleStorage context param.
The way it works is:
- If the user sets faultTolerance=true, durableShuffleStorage is enabled as well.
- If the user sets faultTolerance=true and durableStorage=false, it's an error.
- If the user sets durableStorage=true, the faultTolerance context param remains untouched.
  )
);

if (sqlTaskStatus.getState().isFailure()) {
Since we are trying to coordinate between the task running and the IT running, I think we should look into the SLEEP() function. I encountered this in ITSqlCancelTest, which allows the queries to run for a certain amount of time.
Taken from ITSqlCancelTest:
/**
 * This query will run exactly for 15 seconds.
 */
private static final String QUERY
    = "SELECT sleep(CASE WHEN added > 0 THEN 1 ELSE 0 END) FROM wikipedia_editstream WHERE added > 0 LIMIT 15";
# Conflicts:
#	extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
#	extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
#	extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
#	extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherAutoModeTest.java
#	extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
Created separate module for faultToleranceTests
Took a high-level pass at the change - LGTM once CI passes.
Thanks, @rohangarg @LakshSingla @adarshsanjeev for the review.
Why
Tasks can fail for any number of reasons in a large distributed cluster running large jobs. To run large jobs reliably, we need MSQ to be resilient to such failures.
Description
Adds the ability for the MSQ controller task to retry worker tasks in case of failures.
All the logic is pushed down to the ControllerQueryKernel so that the state machine decides whether the work order for the stage needs to be retried. MSQWorkerTaskLauncher contacts the overlord to figure out if the task has failed. If it detects a failure, it triggers a callback RetryTask which in turn invokes the retry function. The retry function is also invoked if we are not able to contact the task from the controller while posting PartitionBoundaries and WorkOrders.
Flow in the retry function is like this:
- The failed task is relaunched via MSQWorkerTaskLauncher.
- The work orders that need to be resent are tracked in workOrdersToRetry.
- retryFailedTasks checks if there are any work orders to retry.

As our checkpointed location is the stage result output per worker, we retry the worker tasks depending on the phase the stage is in.
As we need a way to retrieve outputs even if the worker dies in subsequent stages, we need the data to be in DurableStorage; hence fault tolerance works only when DurableStorage is enabled.
Guardrails
As we do not want all faults to be retried, only whitelisted error codes (ControllerQueryKernel#retriableErrorCodes) are retried.
We have a max retry limit per worker, set at 2.
We have a limit on total retries in a job across all workers, set at 100.
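As a rough illustration of these guardrails, a sketch with the limits from the description; the whitelist contents, field names, and class structure are placeholders, not the actual ControllerQueryKernel logic.

```java
import com.google.common.collect.ImmutableSet;
import java.util.Set;

// Illustrative sketch of the retry guardrails described above; the limits mirror
// the values in the description (2 per worker, 100 per job), while the whitelist
// contents are placeholders.
class RetryGuardrailsSketch
{
  private static final int PER_WORKER_RELAUNCH_LIMIT = 2;
  private static final int TOTAL_RELAUNCH_LIMIT = 100;
  private static final Set<String> RETRIABLE_ERROR_CODES =
      ImmutableSet.of("WorkerFailed", "UnknownError");

  private int totalRelaunchCount = 0;

  boolean canRelaunch(final String errorCode, final int workerRelaunchCount)
  {
    // Only whitelisted faults are retried, and only while both limits are respected.
    return RETRIABLE_ERROR_CODES.contains(errorCode)
           && workerRelaunchCount < PER_WORKER_RELAUNCH_LIMIT
           && totalRelaunchCount < TOTAL_RELAUNCH_LIMIT;
  }

  void recordRelaunch()
  {
    totalRelaunchCount++;
  }
}
```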
Release note
Adds the ability for the MSQ controller task to retry worker tasks in case of failures. This can be enabled by passing faultTolerance: true in the query context (see the usage sketch after the class list below).
Key changed/added classes in this PR
ControllerQueryKernel
ControllerImpl
MSQWorkerTaskLauncher
ControllerStageTracker
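A usage sketch for the release note above: passing the faultTolerance flag in the MSQ query context. The Java map is only an illustration of the key/value pair; the same entry goes in the "context" object of the SQL task request.

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

// Example context enabling worker retry for an MSQ query.
class FaultToleranceContextExample
{
  static Map<String, Object> faultTolerantContext()
  {
    return ImmutableMap.of(
        "faultTolerance", true,
        // Implied by faultTolerance; shown here only for clarity.
        "durableShuffleStorage", true
    );
  }
}
```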
This PR has: