-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23033][SS][Follow Up] Task level retry for continuous processing #20675
Conversation
cc @tdas and @jose-torres |
Test build #87665 has finished for PR 20675 at commit
|
retest this please |
Test build #87666 has finished for PR 20675 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this does work, but the outstanding question is whether we should support task-level retry for continuous processing at all. My instinct is that we should not, for a few reasons:
- The semantics aren't quite right. Task-level retry can happen a fixed number of times for the lifetime of the task, which is the lifetime of the query - even if it runs for days after, the attempt number will never be reset.
- We end up with two divergent ways for continuous readers to get their starting offset, through the constructor and through the GetLastEpochAndOffset RPC.
- We have to complicate the data source API.
What I think we should do instead is detect when a task retry would have happened, convert it to a global retry in ContinuousExecution, and impose some kind of time-aware (e.g. N per day) limit. Since every task is checkpointing and retries are expected to be rare, making the other tasks restart too shouldn't cause problems.
IncrementEpoch(), | ||
// Check the answer exactly, if there's duplicated result, CheckAnserRowsContains | ||
// will also return true. | ||
CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking exact answer can just be CheckAnswer(0 to 20: _*)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I firstly use CheckAnswer(0 to 19: _*)
here, but I found the test case failure probably because the CP maybe not stop between Range(0, 20) every time. See the logs below:
== Plan ==
== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
+- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
== Analyzed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
+- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
== Optimized Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
+- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- *(1) Project [value#13L]
+- *(1) DataSourceV2Scan [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
ScalaTestFailureLocation: org.apache.spark.sql.streaming.StreamTest$class at (StreamTest.scala:436)
org.scalatest.exceptions.TestFailedException:
== Results ==
!== Correct Answer - 20 == == Spark Answer - 25 ==
!struct<value:int> struct<value:bigint>
[0] [0]
[10] [10]
[11] [11]
[12] [12]
[13] [13]
[14] [14]
[15] [15]
[16] [16]
[17] [17]
[18] [18]
[19] [19]
[1] [1]
![2] [20]
![3] [21]
![4] [22]
![5] [23]
![6] [24]
![7] [2]
![8] [3]
![9] [4]
! [5]
! [6]
! [7]
! [8]
! [9]
== Progress ==
StartStream(ContinuousTrigger(3600000),org.apache.spark.util.SystemClock@343e225a,Map(),null)
AssertOnQuery(<condition>, )
AssertOnQuery(<condition>, )
AssertOnQuery(<condition>, )
AssertOnQuery(<condition>, )
AssertOnQuery(<condition>, )
AssertOnQuery(<condition>, )
=> CheckAnswer: [0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19]
StopStream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, right, my bad.
Now that I think about it, we may eventually need a way to set the starting partition offset after creation for other reasons, so I'm less confident in those second and third reasons. But on the whole I still think converting to global restarts makes sense. |
Great thanks for your detailed reply!
|
It's not semantically wrong that the attempt number is never reset; it just means that for very long-running streams task restarts will eventually run out. You make a good point that in high parallelism cases we might need to be able to restart only a single task, although I think we'd still need query-level restart on top of that. But if you're worried that the current implementation of task restart will become incorrect as more complex scenarios are supported, I'd definitely lean towards deferring it until continuous processing is more feature-complete. I was working on getting basic aggregation working, and I think we definitely will need some kind of setOffset-like functionality. Do you want to spin that off into a separate PR? (I can handle it otherwise.) |
* | ||
* @param offset last offset before task retry. | ||
*/ | ||
default void setOffset(PartitionOffset offset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it might be better to create a new interface ContinuousDataReaderFactory, and implement this there as something like createDataReaderWithOffset(PartitionOffset offset)
. That way the intended lifecycle is explicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, that's more clearer.
Ah, I know your means. Yeah, if we support task level retry we should also set the task retry number unlimited.
Yep, the "complex scenarios" I mentioned mainly including shuffle and aggregation scenario like comments in https://issues.apache.org/jira/browse/SPARK-20928?focusedCommentId=16245556&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16245556, in those scenario maybe task level retry should consider epoch align, but current implementation of task restart is completed for map-only continuous processing I think. Agree with you about deferring it, so I just leave a comment in SPARK-23033 and close this or you think this should reviewed by others?
Of cause, #20689 added a new interface |
…rtOffset ## What changes were proposed in this pull request? As discussion in #20675, we need add a new interface `ContinuousDataReaderFactory` to support the requirements of setting start offset in Continuous Processing. ## How was this patch tested? Existing UT. Author: Yuanjian Li <[email protected]> Closes #20689 from xuanyuanking/SPARK-23533.
…rtOffset ## What changes were proposed in this pull request? As discussion in apache#20675, we need add a new interface `ContinuousDataReaderFactory` to support the requirements of setting start offset in Continuous Processing. ## How was this patch tested? Existing UT. Author: Yuanjian Li <[email protected]> Closes apache#20689 from xuanyuanking/SPARK-23533.
Looks like the patch is outdated, and when continuous query supports shuffled stateful operators, implementing task level retry is not that trivial. To get correct result of aggregation, when one of task fails at epoch N, all the tasks and states should be restored to epoch N. I definitely agree that it would be ideal to have stable task level retry, just wondering this patch would work with follow-up tasks for continuous mode. |
@HeartSaVioR Thanks for your reply, sorry for just seen your comment. Yep, will keep tracking this feature after we supports shuffled stateful operators. |
…rtOffset ## What changes were proposed in this pull request? As discussion in apache#20675, we need add a new interface `ContinuousDataReaderFactory` to support the requirements of setting start offset in Continuous Processing. ## How was this patch tested? Existing UT. Author: Yuanjian Li <[email protected]> Closes apache#20689 from xuanyuanking/SPARK-23533. RB=1844647 G=superfriends-reviewers R=mshen,fli,zolin,yezhou,latang A=
What changes were proposed in this pull request?
Here we want to reimplement the task level retry for continuous processing, changes include:
EpochCoordinatorMessage
namedGetLastEpochAndOffset
, it is used for getting last epoch and offset of particular partition while task restarted.ContinuousDataReader
, it supported BaseReader can restart from given offset.How was this patch tested?
Add new UT in
ContinuousSuite
and newStreamAction
namedCheckAnswerRowsContainsOnlyOnce
for more accurate result checking.