[SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract the functionality of storage of received data #2940
Conversation
@JoshRosen Please take a look. This is still not polished, and there might be missing comments, style issues, etc. I am putting this up to get early feedback. One thing to note: ReceivedBlockInfo is being moved from streaming.scheduler to streaming.receiver; it is used in a developer API but is not itself exposed (it ideally should be). Point of discussion.
@harishreedharan Please take a look as well.
Test build #22191 has started for PR 2940 at commit
Test build #22191 has finished for PR 2940 at commit
Test FAILed.
Test build #22311 has started for PR 2940 at commit
Test build #22311 has finished for PR 2940 at commit
Test FAILed.
Hi TD, are you going to expose some store() API in the Receiver?
@jerryshao No, there will be no new API in the Receiver. With the configuration change, the existing store() API will go through the new WriteAheadLogBasedBlockHandler.
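For illustration, enabling this path from an application would look roughly like the following. This is a sketch using the standard SparkConf/StreamingContext API, not code from this PR; only the `spark.streaming.receiver.writeAheadLog.enable` key and the checkpoint-directory requirement come from the change under review.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Turn on the write-ahead-log-backed block handler for all receivers.
val conf = new SparkConf()
  .setAppName("wal-example")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
// A checkpoint directory is required when the WAL is enabled;
// the handler refuses to start without one (see the check below).
ssc.checkpoint("hdfs:///checkpoints/wal-example")
```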
if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
  if (checkpointDirOption.isEmpty) {
    throw new SparkException(
      "Cannot enable receiver write-ahead log without checkpoint directory set. " +
A bit off topic (and we can deal with this later), but should we make the checkpoint directory into a SparkConf setting? That way we could do this type of validation earlier on. Right now, unfortunately, we can't distinguish here whether the user didn't call checkpoint or whether there was just a bug somewhere in Spark code.
Good point. This is something that requires changes at both the Spark and Spark Streaming levels, and probably needs further discussion, so it is deferred to the next release.
Test build #22397 has started for PR 2940 at commit
Test build #22397 has finished for PR 2940 at commit
Test FAILed.
@JoshRosen @pwendell Ready for another round of reviews.
// For processing futures used in parallel block storing into block manager and write ahead log
implicit private val executionContext = ExecutionContext.fromExecutorService(
  Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
As I mentioned earlier, this might actually end up being a bottleneck. Since you could write using multiple threads in the same receiver, we are basically blocking more than one write from happening at any point in time. Since the BlockManager can handle more writes in parallel, we should probably use a much higher value than 2.
That said, the WAL writer would still be a bottleneck, since writes to the WAL have to be synchronized. So I am not entirely sure that having more than 2 threads helps a whole lot.
There is good scope for future optimization here. We can always create multiple WALManagers and write in parallel to multiple WALs. That would improve performance, depending on where the bottleneck is.
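The parallel-store pattern being discussed can be sketched with plain Scala futures. This is a simplified, hypothetical illustration: `storeInBlockManager` and `writeToLog` are stand-ins, not Spark's actual APIs, and the 2-thread pool mirrors the handler's executionContext above.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelStoreSketch {
  // Fixed pool of 2 threads, mirroring the handler's execution context.
  private val pool = Executors.newFixedThreadPool(2)
  implicit private val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // Hypothetical stand-ins for the two storage targets.
  def storeInBlockManager(block: Array[Byte]): Boolean = block.nonEmpty
  def writeToLog(block: Array[Byte]): Long = block.length.toLong // pretend byte offset in the log

  // Launch both writes concurrently and wait for both to complete,
  // which is the shape of the "parallel block storing" described above.
  def storeBoth(block: Array[Byte]): (Boolean, Long) = {
    val inBlockManager = Future(storeInBlockManager(block))
    val inWriteAheadLog = Future(writeToLog(block))
    Await.result(inBlockManager.zip(inWriteAheadLog), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(storeBoth(Array[Byte](1, 2, 3))) // prints (true,3)
    pool.shutdown()
  }
}
```

With a pool of 2, at most two blocks are being persisted at once; raising the pool size helps only until the synchronized WAL writes become the limiting factor, which is the tradeoff debated above.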
Test build #22399 has finished for PR 2940 at commit
Test FAILed.
Jenkins, test this.
I took another pass. The main thing blocking this for me is cleaning up the type signature to not have Option[Any].
The way this is now, we have to do runtime type checking in a bunch of places... I think it could be avoided with a fairly simple change.
I mentioned this earlier in the original thread. This is a tradeoff between generality and type checking. I want the block-tracking code to be agnostic to which handler is used. To achieve that, these classes have to be agnostic to the exact return type of storeBlock.
I don't see much advantage in 2 over 1. I thought of 3 as an option, but it gets weird for the default handler. Regarding runtime type checking, I actually removed it in the last update; there is no runtime type check in this PR. Thoughts?
/** Trait that represents a class that handles the storage of blocks received by receiver */
private[streaming] trait ReceivedBlockHandler {

  /** Store a received block with the given block id */
Given that the Option[Any] has provoked so much discussion, maybe we should document the return type in this scaladoc (e.g. say that it's arbitrary metadata or something); currently, it's not clear what's being returned.
…andler's storeBlock
Test build #22516 has started for PR 2940 at commit
For reference to others: I spoke with @pwendell and @JoshRosen offline, and we decided that a slightly modified version of suggestion 3 (in my earlier comment) is the best middle ground that addresses all the concerns. What I have done is add a trait.
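Since the actual trait name is elided in the scraped comment, here is a hypothetical sketch of the "trait as common return type" design being described. All names (BlockStoreResult, etc.) are illustrative, not necessarily the PR's real identifiers.

```scala
// A common supertype for whatever a handler returns from storeBlock.
trait BlockStoreResult {
  def blockId: String
}

// The default handler only needs to report the block id.
case class BlockManagerStoreResult(blockId: String) extends BlockStoreResult

// The WAL-backed handler additionally carries the log segment info.
case class WriteAheadLogStoreResult(blockId: String, offset: Long, length: Int)
    extends BlockStoreResult

object TraitReturnSketch {
  // Callers that only need the block id stay agnostic to the concrete subtype,
  // while WAL-aware code can pattern match instead of casting an Option[Any].
  def describe(result: BlockStoreResult): String = result match {
    case WriteAheadLogStoreResult(id, off, len) => s"$id in WAL at offset $off, $len bytes"
    case other                                  => s"${other.blockId} in block manager only"
  }

  def main(args: Array[String]): Unit = {
    println(describe(WriteAheadLogStoreResult("block-0", 0L, 128)))
    println(describe(BlockManagerStoreResult("block-1")))
  }
}
```

This is how a sealed return-type hierarchy resolves the generality-versus-type-checking tradeoff debated earlier: the signature is fully typed, yet each handler can return richer metadata.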
Test build #22520 has started for PR 2940 at commit
Test build #22516 has finished for PR 2940 at commit
Test FAILed.
Test build #22520 has finished for PR 2940 at commit
Test FAILed.
Jenkins, test this please.
Test build #22529 has started for PR 2940 at commit
Test build #22529 has finished for PR 2940 at commit
Test FAILed.
Test build #22558 has started for PR 2940 at commit
@pwendell Please take a look; hopefully this change addresses your concerns.
LGTM - the new approach looks good.
Thanks @pwendell and @JoshRosen for all the feedback. I am merging this.
Test build #22558 has finished for PR 2940 at commit
Test PASSed.
As part of the initiative to prevent data loss on streaming driver failure, this JIRA tracks the subtask of implementing a ReceivedBlockHandler that abstracts the functionality of storing received data blocks. The default implementation will maintain the current behavior of storing the data in the BlockManager. The optional implementation will store the data both in the BlockManager and in a write-ahead log.
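To make the two implementations concrete, here is a minimal, self-contained sketch with in-memory stand-ins. The names are hypothetical and no actual BlockManager or WAL is involved; it only illustrates the "default" versus "WAL-backed" split described above.

```scala
import scala.collection.mutable

// Minimal version of the abstraction: every handler knows how to store a block.
trait ReceivedBlockHandlerSketch {
  def storeBlock(blockId: String, data: Array[Byte]): Unit
}

// Default behavior: keep the block in the "block manager" (a map here).
class BlockManagerOnlyHandler extends ReceivedBlockHandlerSketch {
  private val store = mutable.Map.empty[String, Array[Byte]]
  override def storeBlock(blockId: String, data: Array[Byte]): Unit = store(blockId) = data
  def get(blockId: String): Option[Array[Byte]] = store.get(blockId)
}

// Optional behavior: store in the block manager AND record a log entry,
// so the data can be recovered after a driver failure.
class WalBackedHandler(delegate: BlockManagerOnlyHandler) extends ReceivedBlockHandlerSketch {
  private val log = mutable.ArrayBuffer.empty[(String, Int)]
  override def storeBlock(blockId: String, data: Array[Byte]): Unit = {
    delegate.storeBlock(blockId, data) // store in the "block manager"
    log += ((blockId, data.length))    // and append to the "write-ahead log"
  }
  def logEntries: List[(String, Int)] = log.toList
}
```

Because callers program against the trait, switching between the two behaviors is a configuration decision rather than a code change, which is exactly the point of the abstraction.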