[SPARK-4026][Streaming] Write ahead log management #2882
Conversation
Please review this @JoshRosen
QA tests have started for PR 2882 at commit
QA tests have finished for PR 2882 at commit
Test FAILed.
QA tests have started for PR 2882 at commit
QA tests have finished for PR 2882 at commit
Test PASSed.
 */
private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
  extends Closeable {
  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
WIP: this file is going to be updated by @harishreedharan to get rid of the local file customizations.
Ah, that makes sense. I guess you can still use the HDFS API to write to local files for testing purposes.
Yep. And for all tests, we are just going to use Hadoop Minicluster anyway.
private lazy val hadoopFlushMethod = {
  val cls = classOf[FSDataOutputStream]
  Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
Nice Scala one-liner :)
Why do we need this reflection, though? Is this necessary to support multiple Hadoop versions? If so, could you add a one-line comment to explain this?
Actually we do need it, since Spark currently supports everything from Hadoop 1 to Hadoop 2.5.0. In Hadoop 1.x, the "sync" method did the same thing that hflush does in 2.5.0, so in short, the reflection is necessary.
Credit goes to Colin McCabe who wrote this line.
https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/util/FileLogger.scala#L106
Stole from there.
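For readers following along, the reflection fallback under discussion can be illustrated in a self-contained way. The sketch below applies the same `Try(...).orElse(...).toOption` pattern to `java.lang.StringBuilder` instead of `FSDataOutputStream`, so it runs without Hadoop on the classpath: the first lookup (`"hflush"`, which does not exist on `StringBuilder`) fails, and the fallback (`"reverse"`, a real method) is chosen. In the PR the same shape resolves `hflush` on Hadoop 2.x and falls back to `sync` on Hadoop 1.x.

```scala
import scala.util.Try

object FlushMethodLookup {
  // Same pattern as the PR: try the newer method name first, fall back to
  // the older one, and end up with None if neither exists. StringBuilder
  // stands in for FSDataOutputStream here so the sketch is self-contained.
  private val cls = classOf[java.lang.StringBuilder]

  val flushMethod: Option[java.lang.reflect.Method] =
    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("reverse"))).toOption

  def main(args: Array[String]): Unit = {
    // "hflush" does not exist on StringBuilder, so the fallback is chosen.
    println(flushMethod.map(_.getName)) // prints Some(reverse)
  }
}
```

Because each `getMethod` call is wrapped in `Try`, a missing method surfaces as a `Failure` rather than a thrown `NoSuchMethodException`, and `toOption` collapses "neither name exists" into a plain `None` the caller can check once.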
QA tests have finished for PR 2882 at commit
Test PASSed.
QA tests have started for PR 2882 at commit
@JoshRosen
QA tests have finished for PR 2882 at commit
Test FAILed.
Directory deletion should not fail tests
QA tests have started for PR 2882 at commit
QA tests have finished for PR 2882 at commit
Test PASSed.
Yay, finally!
@JoshRosen whenever you get a chance. :)
private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
It looks like this comment is no longer relevant, or perhaps like it should be moved somewhere else?
Moved.
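Since the relevant diff hunk is not visible here, the locking shape being discussed can only be sketched, not quoted. The self-contained example below serializes all calls to a factory that is not assumed to be thread-safe, which is the pattern the comment describes for `FileSystem.get`; the `NaiveFactory` object and its `open` method are stand-ins of my own invention, and only the `synchronized` wrapper mirrors the PR's approach.

```scala
object SafeFactory {
  // Stand-in for a factory (like FileSystem.get) that is not assumed to be
  // thread-safe: it performs a non-atomic read-modify-write on shared state.
  private object NaiveFactory {
    var created = 0
    def open(path: String): String = {
      val n = created // racy without external locking
      created = n + 1
      s"stream-$n:$path"
    }
  }

  // Serialize every call through one lock, mirroring the comment in the PR:
  // getFileSystem is not thread-safe, so synchronize on it.
  def getOutputStream(path: String): String = NaiveFactory.synchronized {
    NaiveFactory.open(path)
  }
}
```

With the lock in place, concurrent callers always observe distinct stream ids; without it, two threads could read the same `created` value and hand back the same id.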
This looks good to me!
Alright, thanks! I will merge when this last set of changes gets through Jenkins.
QA tests have started for PR 2882 at commit
QA tests have finished for PR 2882 at commit
Test PASSed.
Let's merge this for now. I will try to find out more about the getFileSystem thread-safety without doAs (which is what we support anyway).
Talked to @cmccabe, who says we should not worry about the thread-safety. If there was an issue at all, it was in a version too old to worry about. Let's merge this!
Cool! Thanks for checking with @cmccabe. Merging this.
As part of the effort to avoid data loss on Spark Streaming driver failure, we want to implement a write ahead log that can write received data to HDFS. This allows the received data to be persisted across driver failures, so when the streaming driver is restarted, it can find and reprocess all the data that was received but not processed.
This was primarily implemented by @harishreedharan. This is still WIP, as he is going to improve the unit tests by using the HDFS mini cluster.
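The mechanism described in the PR summary can be sketched in miniature. The code below is an illustrative, local-file write ahead log, not the PR's actual `WriteAheadLogWriter`: each record is written with a length prefix and flushed, and on recovery the reader replays every complete record from the start of the log, silently dropping a trailing partial record left by a crash. The `MiniWal` name is my own, and real durability would need hflush/hsync semantics that are out of scope here.

```scala
import java.io.{DataInputStream, DataOutputStream, EOFException, File, FileInputStream, FileOutputStream}

class MiniWal(file: File) {
  // Append one record: a 4-byte length prefix, then the payload, flushed so
  // completed records survive a writer crash (modulo OS-level buffering).
  def write(record: Array[Byte]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(file, true))
    try {
      out.writeInt(record.length)
      out.write(record)
      out.flush()
    } finally out.close()
  }

  // Recovery: replay every complete record from the start of the log.
  // A truncated final record raises EOFException and is simply dropped.
  def readAll(): Seq[Array[Byte]] = {
    if (!file.exists()) return Seq.empty
    val in = new DataInputStream(new FileInputStream(file))
    val records = Seq.newBuilder[Array[Byte]]
    try {
      while (true) {
        val len = in.readInt()
        val buf = new Array[Byte](len)
        in.readFully(buf)
        records += buf
      }
    } catch {
      case _: EOFException => ()
    } finally in.close()
    records.result()
  }
}
```

A restarted driver would construct a fresh reader over the same path and call `readAll()` to find the records that were received but not yet processed, which is exactly the recovery story the summary describes.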