[SPARK-4027][Streaming] WriteAheadLogBackedBlockRDD to read received data either from BlockManager or WAL in HDFS #2931
Conversation
@JoshRosen Can you take a look?
Test build #22152 has started for PR 2931 at commit
Test build #22152 timed out for PR 2931 at commit
Test FAILed.
Jenkins, test this.
Test build #420 has started for PR 2931 at commit
Test build #420 has finished for PR 2931 at commit
The HdfsBackedRDDSuite is passing - not sure why there are some other failures. Maybe we are missing some cleanup?
val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
val locations = getBlockIdLocations()
locations.getOrElse(partition.blockId,
  HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration))
Can you explain how this code gets the block locations of the segment of the file that the partition needs? The offsets don't seem to be passed on to HdfsUtils.getBlockLocations.
Fixed this one in the PR sent to your repo.
Fixed.
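[Editor's note] For context, a minimal sketch of what the fixed lookup might look like, using Hadoop's FileSystem API to restrict locality to the HDFS blocks overlapping the segment's byte range. Object and parameter names mirror the snippets quoted in this thread; treat the exact shape as illustrative, not the merged code.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object HdfsUtils {
  // Return the hosts holding the HDFS blocks that overlap the byte range
  // [offset, offset + length) of the file, so locality is computed for the
  // segment the partition actually reads, not for the whole file.
  def getFileSegmentLocations(
      path: String,
      offset: Long,
      length: Long,
      conf: Configuration): Seq[String] = {
    val hdfsPath = new Path(path)
    val fs = hdfsPath.getFileSystem(conf)
    val fileStatus = fs.getFileStatus(hdfsPath)
    val blockLocations = Option(fs.getFileBlockLocations(fileStatus, offset, length))
    blockLocations.map(_.flatMap(_.getHosts).toSeq).getOrElse(Seq.empty)
  }
}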
// Hadoop Configuration is not serializable, so broadcast it wrapped in a SerializableWritable.
val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
Does it make sense to take the SerializableWritable as the argument in the constructor (as is being done in #2935), or should we just take the hadoopConf and wrap it in the SerializableWritable once that is merged? We don't want to change the interface later.
For now I am leaving this as is. Let's revisit this later if needed.
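[Editor's note] For reference, a minimal sketch of the pattern under discussion (the enclosing class here is hypothetical). Hadoop's Configuration is not java.io.Serializable, so it cannot be captured directly by task closures; wrapping it in Spark's SerializableWritable, which serializes via Hadoop's Writable machinery, and broadcasting it ships one copy per executor instead of one per task.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}

class WalRddSketch(sc: SparkContext, hadoopConfiguration: Configuration) {
  // Broadcast the wrapped Configuration once from the driver.
  val broadcastedHadoopConf =
    sc.broadcast(new SerializableWritable(hadoopConfiguration))

  // On the executor side, unwrap twice: the broadcast value, then the Writable.
  def hadoopConfOnExecutor: Configuration = broadcastedHadoopConf.value.value
}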
I left a pass of fairly shallow style comments; I'll loop back later to offer more substantive feedback and to actually check that I understand this logic.
Make sure getBlockLocations uses offset and length to find the blocks on...
Test build #22189 has started for PR 2931 at commit
Test build #22189 has finished for PR 2931 at commit
Test FAILed.
Shutdown spark context after tests. Formatting/minor fixes
Test build #22300 has started for PR 2931 at commit
Test build #22300 has finished for PR 2931 at commit
This looks good to me.
@transient override val blockIds: Array[BlockId],
@transient val segments: Array[WriteAheadLogFileSegment],
val storeInBlockManager: Boolean,
val storageLevel: StorageLevel
nitpick: the common style in Spark is
val storageLevel: StorageLevel)
  extends BlockRDD[T](sc, blockIds) {
@harishreedharan / @tdas I made a few more comments. Most are just nits that I've left earlier.
Thanks @rxin. Updates coming soon.
@rxin I updated. The only part I am not in agreement on is the preferred-location logic.
Test build #22521 has started for PR 2931 at commit
Apart from the readability, does one have a performance benefit over the other?
Test build #22521 has finished for PR 2931 at commit
Test PASSed.
@harishreedharan I don't think so. The block location lookup is called only once in both, and the HDFS location lookup is called only once and only if required. I don't think there is any performance difference between these two implementations.
def blockLocations = getBlockIdLocations().get(partition.blockId)
def segmentLocations = HdfsUtils.getFileSegmentLocations(
  partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
Yeah, it's not ideal; I think the easiest to understand is something like

if (blockLocations.isDefined) {
  blockLocations.get
} else if (segmentLocations.isDefined) {
  segmentLocations.get
} else {
  Seq.empty
}

but this isn't too bad if the above isn't possible.
Actually, this discussion is moot because we should just let getFileSegmentLocations return Seq[String] rather than Option[Seq[String]]; then this should only consist of two branches, accomplishable with a single getOrElse.
This is the final version I am going with, then:

val blockLocations = getBlockIdLocations().get(partition.blockId)
def segmentLocations = HdfsUtils.getFileSegmentLocations(...)
blockLocations.getOrElse(segmentLocations)
Correct. Once we make that change, I think both the getOrElse and the if..else solutions are equivalent - one is the Scala way of doing things, and the other is the "traditional" way. The one using def/lazy val is really the more Scala way of doing it.
I have no preference for any one method, but I would generally consider the overhead and performance incurred by each, and I am not enough of a Scala expert to know.
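[Editor's note] Putting the thread together, a sketch of the agreed-upon form, with identifiers taken from the snippets above and getFileSegmentLocations now returning Seq[String]. The def makes segmentLocations lazy, which is why the performance concern raised above does not apply: the HDFS call happens only when the fallback is actually taken.

override def getPreferredLocations(split: Partition): Seq[String] = {
  val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
  // Eager: a single BlockManager lookup, always needed.
  val blockLocations = getBlockIdLocations().get(partition.blockId)
  // Lazy: evaluated only if getOrElse needs the fallback, so the HDFS
  // round trip is skipped when the block is still in the BlockManager.
  def segmentLocations = HdfsUtils.getFileSegmentLocations(
    partition.segment.path, partition.segment.offset,
    partition.segment.length, hadoopConfig)
  blockLocations.getOrElse(segmentLocations)
}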
@tdas you also missed one other comment ...
@rxin, crap, I missed that. Personally I find the parenthesis ending on the next line more logical, as braces always end on the next line. But I will do it in the interest of consistency.
Test build #22537 has started for PR 2931 at commit
Test build #22537 has finished for PR 2931 at commit
Test PASSed.
Test build #22562 has started for PR 2931 at commit
Alright! I think we have converged on the best solution here. I am going to wait for the tests to pass and then merge. Thanks @rxin and @JoshRosen for all the feedback!
Test build #22562 has finished for PR 2931 at commit
Test PASSed.
As part of the initiative to prevent data loss on streaming driver failure, this sub-task implements a BlockRDD that is backed by HDFS. This BlockRDD can either read data from Spark's BlockManager, or read the data from file segments in the write-ahead log in HDFS.
Most of this code has been written by @harishreedharan.
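[Editor's note] A condensed sketch of that read path. Helper names such as WriteAheadLogRandomReader follow the Spark 1.x streaming internals, but the signatures here are approximations for illustration, not the merged code.

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
  val blockManager = SparkEnv.get.blockManager
  blockManager.get(partition.blockId) match {
    case Some(block) =>
      // Fast path: the receiver's block is still available in the BlockManager.
      block.data.asInstanceOf[Iterator[T]]
    case None =>
      // Fallback: re-read the serialized block from its WAL segment in HDFS.
      val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
      val bytes = try reader.read(partition.segment) finally reader.close()
      if (storeInBlockManager) {
        // Optionally cache the recovered block for later reads; duplicate the
        // buffer so deserialization below still sees the original position.
        blockManager.putBytes(partition.blockId, bytes.duplicate(), storageLevel)
      }
      blockManager.dataDeserialize(partition.blockId, bytes).asInstanceOf[Iterator[T]]
  }
}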