-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-10210] [STREAMING] Filter out non-existent blocks before creating BlockRDD #8405
Conversation
Test build #41495 has finished for PR 8405 at commit
|
Test build #41497 has finished for PR 8405 at commit
|
@@ -116,7 +116,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont | |||
logWarning("Some blocks have Write Ahead Log information; this is unexpected") | |||
} | |||
} | |||
new BlockRDD[T](ssc.sc, blockIds) | |||
val validBlockIds = blockIds.filter { id => | |||
ssc.sparkContext.env.blockManager.master.contains(id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth to add some warning log here? I think the user may forget to enable receiver log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added warning.
LGTM except one minor comment |
Test build #41507 has finished for PR 8405 at commit
|
LGTM |
Test build #41521 has finished for PR 8405 at commit
|
Test build #41523 has finished for PR 8405 at commit
|
Thanks @zsxwing for reviewing. Merging this to master and 1.5 |
…ing BlockRDD When write ahead log is not enabled, a recovered streaming driver still tries to run jobs using pre-failure block ids, and fails as the block do not exists in-memory any more (and cannot be recovered as receiver WAL is not enabled). This occurs because the driver-side WAL of ReceivedBlockTracker is recovers that past block information, and ReceiveInputDStream creates BlockRDDs even if those blocks do not exist. The solution in this PR is to filter out block ids that do not exist before creating the BlockRDD. In addition, it adds unit tests to verify other logic in ReceiverInputDStream. Author: Tathagata Das <[email protected]> Closes #8405 from tdas/SPARK-10210. (cherry picked from commit 1fc3758) Signed-off-by: Tathagata Das <[email protected]>
When write ahead log is not enabled, a recovered streaming driver still tries to run jobs using pre-failure block ids, and fails as the block do not exists in-memory any more (and cannot be recovered as receiver WAL is not enabled).
This occurs because the driver-side WAL of ReceivedBlockTracker is recovers that past block information, and ReceiveInputDStream creates BlockRDDs even if those blocks do not exist.
The solution in this PR is to filter out block ids that do not exist before creating the BlockRDD. In addition, it adds unit tests to verify other logic in ReceiverInputDStream.