[SPARK-23033][SS] Don't use task level retry for continuous processing #20225
Conversation
Test build #85939 has finished for PR 20225 at commit
eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
spark.sparkContext.killTaskAttempt(taskId)
},
Execute(waitForRateSourceTriggers(_, 4)),
can you explain the logic behind this test? what does this test do?
It kills an arbitrary task, and checks that query execution continues onward unaffected.
I've added a check that the run ID has changed, confirming that the retry was indeed made at the ContinuousExecution level.
The hang in test build #85939 was a test issue in ContinuousStressSuite, which I could reproduce locally by bumping up the rows per second. When the rate of incoming data is too high, query execution does make progress, but the answer-checking operation in the test is expensive, and the rate source keeps running while it executes. Since the executors are local, an overloaded source means the answer check takes unreasonably long to finish. I've fixed this by stopping the rate source before checking the answer in the stress tests.
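For reference, a rough sketch of the kind of test being described, written against the StreamTest harness. It assumes the ContinuousSuiteBase helpers (waitForRateSourceTriggers, streamingTimeout) and the useV2Sink flag, and the run-ID assertion assumes ContinuousExecution assigns a fresh run ID when it restarts from the checkpoints, as described above; this is illustrative, not the exact code in this PR.

  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
  import org.apache.spark.sql.streaming.Trigger

  // Capture the ID of an arbitrary task from this query so we can kill it.
  var taskId: Long = -1
  val listener = new SparkListener {
    override def onTaskStart(start: SparkListenerTaskStart): Unit = {
      taskId = start.taskInfo.taskId
    }
  }
  spark.sparkContext.addSparkListener(listener)

  val df = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")
    .load()
    .select("value")

  testStream(df, useV2Sink = true)(
    StartStream(Trigger.Continuous(100)),
    Execute(waitForRateSourceTriggers(_, 2)),
    Execute { query =>
      val initialRunId = query.runId
      // Wait until a task has started, then kill its first attempt.
      eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
      spark.sparkContext.killTaskAttempt(taskId)
      // The retry should happen at the ContinuousExecution level: the query
      // stays active and comes back with a new run ID.
      eventually(timeout(streamingTimeout)) {
        assert(query.isActive && query.runId != initialRunId)
      }
    },
    StopStream)

  spark.sparkContext.removeSparkListener(listener)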
@@ -219,6 +201,44 @@ class ContinuousSuite extends ContinuousSuiteBase {
StopStream)
}

test("kill task") {
this test does not verify killing tasks :) it verifies "task failure stops the query" or "task failure restarts the query"
query.exception.get.getCause.getCause.getCause.isInstanceOf[ContinuousTaskRetryException])
})

spark.sparkContext.removeSparkListener(listener)
put this in a finally clause.
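For example, the registration and removal could bracket the stream actions like this (a sketch; the action list is elided):

  spark.sparkContext.addSparkListener(listener)
  try {
    testStream(df, useV2Sink = true)(
      // ... StartStream, kill the task attempt, check the resulting failure ...
    )
  } finally {
    // Runs even if an assertion above fails, so the listener never leaks
    // into subsequent tests in the suite.
    spark.sparkContext.removeSparkListener(listener)
  }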
vals.contains(i)
})
})
StopStream,
What is this for?
This is the earlier comment about the overloaded failure mode this PR exposed.
@@ -280,6 +294,7 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
AwaitEpoch(0),
Execute(waitForRateSourceTriggers(_, 201)),
IncrementEpoch(),
StopStream,
Why are these needed?
// Wait until a task is started, then kill its first attempt.
eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
spark.sparkContext.killTaskAttempt(taskId)
eventually(timeout(streamingTimeout)) {
Can this be checked with an "ExpectFailure" test? Better to test using the same harness that is used for microbatch, so that we are sure the failure behavior is the same.
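Something along these lines might work, assuming ExpectFailure from the shared StreamTest harness accepts an assertion on the failure (a sketch, not the exact code in this PR; the cause-chain walk reflects the nesting seen in the assertion above):

  import org.apache.spark.SparkException

  testStream(df, useV2Sink = true)(
    StartStream(Trigger.Continuous(100)),
    Execute(waitForRateSourceTriggers(_, 2)),
    Execute { _ =>
      // Wait until a task is started, then kill its first attempt.
      eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
      spark.sparkContext.killTaskAttempt(taskId)
    },
    ExpectFailure[SparkException] { e =>
      // The scheduler wraps the original failure, so walk the cause chain to
      // find the ContinuousTaskRetryException.
      var cause: Throwable = e
      while (cause != null && !cause.isInstanceOf[ContinuousTaskRetryException]) {
        cause = cause.getCause
      }
      assert(cause != null)
    })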
import org.apache.spark.SparkException

class ContinuousTaskRetryException
Add docs.
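For instance, something along these lines (a sketch; the exact wording and message are up to the author):

  import org.apache.spark.SparkException

  /**
   * An exception thrown when a continuous processing task runs with a nonzero
   * attempt number, i.e. when the scheduler has retried it. Continuous
   * execution does not support task-level retry; ContinuousExecution catches
   * this failure and restarts the whole query from the last recorded
   * checkpoints instead.
   */
  class ContinuousTaskRetryException
    extends SparkException("Continuous execution does not support task retry", null)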
@@ -52,6 +52,10 @@ class ContinuousDataSourceRDD(
}

override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
if (context.attemptNumber() != 0) {
Add comments on what this is.
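The comment could spell out why retries are rejected at this point, e.g. (a sketch; the remainder of the method is unchanged and elided):

  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    // Continuous processing handles failures with a query-level restart from
    // the last recorded checkpoints rather than with task-level retries. A
    // nonzero attempt number means the scheduler is retrying this task, so
    // fail fast with a dedicated exception that ContinuousExecution can
    // recognize.
    if (context.attemptNumber() != 0) {
      throw new ContinuousTaskRetryException()
    }
    // ... existing logic to create the per-partition reader and return its
    // row iterator ...
  }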
Test build #85987 has finished for PR 20225 at commit
Test build #85990 has finished for PR 20225 at commit
Test build #85993 has finished for PR 20225 at commit
The problem in the test runs above was also a test issue. The new code changed the synchronization such that the row wasn't written when the test expected it to be; I verified manually that the failing test doesn't actually enter the attemptNumber != 0 branch.
Test build #86007 has started for PR 20225 at commit
Test build #85997 has finished for PR 20225 at commit
The most recent test build failure is from an earlier commit, which I think is now obsolete. I think #86007 is correct, but we should retest to confirm.
retest this please
Test build #86020 has finished for PR 20225 at commit
retest this please
Test build #86046 has finished for PR 20225 at commit
Fix merge conflicts, and add [SS] to the title of this PR.
Test build #86283 has finished for PR 20225 at commit
LGTM. Merging this to master and 2.3.
## What changes were proposed in this pull request?

Continuous processing tasks will fail on any attempt number greater than 0. ContinuousExecution will catch these failures and restart globally from the last recorded checkpoints.

## How was this patch tested?

unit test

Author: Jose Torres <[email protected]>

Closes #20225 from jose-torres/no-retry.

(cherry picked from commit 86a8450)
Signed-off-by: Tathagata Das <[email protected]>