[SPARK-7884] Move block deserialization from BlockStoreShuffleFetcher to ShuffleReader #6423
Conversation
@massie mind associating this with a JIRA? It's not a huge code change, but I wouldn't really classify it as trivial.
@@ -105,7 +105,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
   * when the writers are closed successfully
   */
  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
      writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
      writeMetrics: ShuffleWriteMetrics,
      getDiskWriter: (BlockId, File, SerializerInstance, Int, ShuffleWriteMetrics) => BlockObjectWriter = blockManager.getDiskWriter): ShuffleWriterGroup = {
This isn't a Java-friendly interface, which is going to be a problem for new shuffle code that I'm working on.
Thanks for pointing that out, @JoshRosen. I'll update this to be more Java-friendly tomorrow. In the meantime, feel free to make suggestions on how to customize the creation of a BlockObjectWriter when calling forMapTask().
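For illustration only, here is one direction the Java-friendliness fix could take: replace the Scala function-typed default parameter with a single-method factory. The trait name, method name, and parameters below are assumptions for the sketch, not code from this PR; a pure-abstract Scala trait compiles to a plain Java interface, so Java-based shuffle code could implement it directly.

import java.io.File
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.storage.{BlockId, BlockObjectWriter}

// Hypothetical single-abstract-method factory for creating disk writers.
trait DiskWriterFactory {
  def createWriter(
      blockId: BlockId,
      file: File,
      serializer: SerializerInstance,
      bufferSize: Int,
      metrics: ShuffleWriteMetrics): BlockObjectWriter
}

forMapTask(...) could then accept a DiskWriterFactory (with a default implementation backed by blockManager.getDiskWriter) instead of a Scala function type with a default argument.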
Test build #33549 has finished for PR 6423 at commit
This is the minimal set of changes to the internal APIs that I needed to make in order to completely abstract the Parquet Shuffle Manager.
@@ -29,6 +30,16 @@ import org.apache.spark.serializer.{SerializerInstance, Serializer}
 import org.apache.spark.util.{CompletionIterator, Utils}

 /**
  * Factory class that creates iterators to read records from fetched blocks
  */
 trait BlockRecordIteratorFactory {
Is this class intended to be public? It might be okay as a developer API but I don't think that we should commit to making any of the shuffle internals as stable public interfaces.
Also, I think that any new shuffle interfaces should be implemented as Java interfaces, not Scala traits.
This was meant to be a developer API. I can update this to be a Java interface.
Our shuffle code is extremely hard to understand in its current form, and I'm hesitant to introduce new interfaces / extension points until we clean up and document the existing code, so I'd like to see a better standalone description of the changes in this patch before I review it.
I agree that the shuffle code is extremely hard to understand. I certainly don't want to make matters worse, which is why I tried to make the minimal changes necessary. I'm not exactly clear on what you mean when you say that we need to clean up and document the existing shuffle code before this can be reviewed. Can you point me to the PR or branch where that work is being done?
@@ -17,22 +17,21 @@

 package org.apache.spark.shuffle.hash

 import org.apache.spark.storage._
Style nit: import ordering (all of the Spark imports should be grouped and alphabetized).
Force-pushed from becdc81 to 3b32099
I updated this pull request to only address the read path in order to make it easier to review. There are no new interfaces added and I believe this approach will be cleaner and easier to reason about for others working on the shuffle code.
Test build #33665 has finished for PR 6423 at commit
I'm fixing the Scala style checks now.
Force-pushed from 3b32099 to 87323dc
      readMetrics.incRecordsRead(1)
      delegate.next()
    }
  }.asInstanceOf[Iterator[Nothing]]
This asInstanceOf is necessary but ugly. The alternative, I believe, would be to move this into a method with a generic type, e.g.:

def newInterruptibleIterator[T](context: ..., completionIter: ...) = {
  new InterruptibleIterator[T](context, completionIter) {
    val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
    override def next(): T = {
      readMetrics.incRecordsRead(1)
      delegate.next()
    }
  }
}
Let me know which approach you prefer.
This version is nice and short, but it does make it a bit hard to follow the types. What do you think of more explicit casting in each branch, to make it clearer what is going on? E.g.:
// Update read metrics for each record materialized
val iter = new InterruptibleIterator[(Any, Any)](context, recordIterator) {
  val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
  override def next(): (Any, Any) = {
    readMetrics.incRecordsRead(1)
    delegate.next()
  }
}

val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
  if (dep.mapSideCombine) {
    // we are reading values that are already combined
    val combinedKeyValuesIterator = iter.asInstanceOf[Iterator[(K, C)]]
    new InterruptibleIterator(context,
      dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context))
  } else {
    // we don't know the value type, but also don't care -- the dependency *should*
    // have made sure its compatible w/ this aggregator, which will convert the value
    // type to the combined type C
    val keyValuesIterator = iter.asInstanceOf[Iterator[(K, Nothing)]]
    new InterruptibleIterator(context,
      dep.aggregator.get.combineValuesByKey(keyValuesIterator, context))
  }
  ...
This is just an idea... I'm not entirely convinced myself.
The old BlockStoreShuffleFetcher.fetch(...) returned Iterator[Nothing], which is why the current casting is necessary -- but it's ugly.
I like your idea of casting in the branches to help make the types more explicit. I'll make the change now unless you'd like more time to think about it.
Yes, I completely realize that this is not a new problem you are introducing. I think I've been confused by the types in the old code a couple of times in the past. I was just thinking: as long as you are touching this, rather than making the bad code slightly worse, maybe we can make it slightly better :P
@squito I like the cut of your jib. Pushing the update now.
Test build #33666 has finished for PR 6423 at commit
The tail of the Jenkins console was...
It looks like all the *ShuffleSuite tests passed but there was an unrelated YARN error?
Jenkins, test this please.
Test build #33676 has finished for PR 6423 at commit
@JoshRosen I hope that my changes address all your concerns. There are no new interfaces or extension points, so this change shouldn't complicate the shuffle code you're working on. The only change here is to move deserialization into the ShuffleReader.
    // so we don't release it again in cleanup.
    // Once the single-element (is0) iterator is exhausted, release the buffer so that we
    // don't release it again in cleanup.
    CompletionIterator[InputStream, Iterator[InputStream]](Iterator(is0), {
I'm actually a bit confused about the Try[Iterator[InputStream]] here. In the old code, we had next() return a (BlockId, Try[Iterator[Any]]) which, if the fetch was successful, would contain an iterator of the elements in that individual block. Here, it looks like we're now returning a single-element iterator that contains an InputStream. I think that this is confusing for consumers of this class, since the public Try[Iterator[InputStream]] signature might lead them to believe that they have to handle the possibility of multiple input streams being returned. In fact, this is inconsistent with the class-level Scaladoc above, which says that this returns an iterator of "(BlockID, InputStream)".
It sounds like the motivation for returning an iterator here was to try to ensure proper release of the buffer. I'd like to understand if there's a cleaner way to do this, though.
Just to explore options, what if we returned buf (which is a ManagedBuffer) instead of returning an iterator from it? This would push the cleanup obligations to the caller, who might be in a better position to handle them.
On the other hand, I guess there aren't really any useful methods to call on ManagedBuffer besides createInputStream() and release(). If we have a method that returns ManagedBuffer, then that sort of implicitly takes the ManagedBuffer interface and makes it subject to this method's API stability guarantees (of which there aren't any yet, because this is private[spark]...).

What if we returned an InputStream that was wrapped such that calling close() on it would release the underlying buffer? This avoids exposing ManagedBuffer to the higher-level code and makes cleanup easier to reason about. If we take this approach, it might be good to add a comment somewhere to mention that the caller should ensure that the input stream is eventually closed.
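A minimal sketch of the wrapper being suggested here, assuming ManagedBuffer's retain()/release() reference counting; the class name is hypothetical and this is not the code that was ultimately committed.

import java.io.{FilterInputStream, InputStream}
import org.apache.spark.network.buffer.ManagedBuffer

// Hypothetical wrapper: closing the stream also releases the underlying ManagedBuffer,
// so higher-level code never has to see ManagedBuffer at all.
private class BufferReleasingInputStream(delegate: InputStream, buf: ManagedBuffer)
  extends FilterInputStream(delegate) {

  private var closed = false

  override def close(): Unit = {
    if (!closed) {        // idempotent close, so callers and cleanup hooks can both call it
      closed = true
      delegate.close()
      buf.release()       // release the buffer exactly once
    }
  }
}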
I like the idea of returning an InputStream that requires callers to call close(), and dropping the use of the single-element CompletionIterator. I'll make that change now.
Sorry, jumping in a little late here -- but I think the caller doesn't have to call close(), because it's already getting called in a task completion listener here, which is more robust. It doesn't hurt for the user to call it anyway (another reason why close() needs to be idempotent), but otherwise I think it's really important that this close gets called by the framework, or else it's really easy to have a leak.
(All that said, it looks like it's still fine after the patch in its current form.)
Force-pushed from 9187e17 to b12f912
@@ -313,6 +303,33 @@ final class ShuffleBlockFetcherIterator(
    }
  }

  // Helper class that ensures a ManagedBuffer is released on InputStream.close()
  private class WrappedInputStream(
      delegate: InputStream, buf: ManagedBuffer, var currentResult: FetchResult)
Style nit: the parameters here need to be wrapped differently; see https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
Why do we need to pass currentResult here? It's not obvious on first read, and I'm worried that someone might remove it by accident unless we add a comment explaining why we need to keep it.
Also, note that the original code would end up mutating the currentResult field in ShuffleBlockFetcherIterator, whereas the currentResult = null here only affects the field in this WrappedInputStream class and leaves the ShuffleBlockFetcherIterator's field untouched.
I looked at the style guide and used the recommendation for def parameters (which are aligned with the definition). I didn't see that I should use the Scala Style Guide; I'll fix this. Thanks for pointing it out.
Test build #33777 has finished for PR 6423 at commit
      currentResult = null
      buf.release()
    })
    Try(buf.createInputStream()).map { inputStream =>
We might also treat this patch as an opportunity to revisit why we're using Try here. It might be fine to keep Try as the return type, but I'm not necessarily convinced that we should be calling Try.apply() here, since I think it obscures whether we'll need to perform any cleanup after errors (for instance, do we need to free buf? Is buf guaranteed to be non-null, or could this fail with an NPE on the buf.createInputStream() call?). I feel that the Try.apply() makes it easy to overlook these concerns.
The ShuffleBlockFetcherIterator has no information about server statuses from the map output tracker, shuffle IDs, etc. Using Try allows the BlockStoreShuffleFetcher to reformat exceptions as a FetchFailedException, which is the right exception to return to the scheduler.
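To make the point concrete, a rough sketch of the pattern described above: the caller that knows the shuffle topology unwraps the Try and converts failures. The helper name and the FetchFailedException constructor arguments are assumptions (and FetchFailedException is private[spark], so something like this only compiles inside Spark itself); this is not the actual BlockStoreShuffleFetcher code.

import java.io.InputStream
import scala.util.{Failure, Success, Try}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId}

// Hypothetical helper: unwrap the fetcher's Try, turning failures into the
// FetchFailedException that the scheduler uses to trigger a map-stage retry.
def unwrapFetchResult(
    blockId: ShuffleBlockId,
    address: BlockManagerId,
    result: Try[InputStream]): InputStream = result match {
  case Success(stream) => stream
  case Failure(e) =>
    throw new FetchFailedException(
      address, blockId.shuffleId, blockId.mapId, blockId.reduceId, e)
}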
This is technically not your change -- but do you know what happens if the stream is not consumed in full in a task? Does that lead to memory leaks because close on the stream is never called?
We don't need to worry about a memory leak when the task exits with success or failure since there is a cleanup method registered with the task context, e.g.
// Add a task completion callback (called in both success case and failure case) to cleanup.
context.addTaskCompletionListener(_ => cleanup())
However, you're correct that there would be a memory (and file handle) leak if the InputStream isn't closed in the ShuffleReader. This PR prevents that since serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator returns a NextIterator, which closes the stream when the last record is read.
To be more defensive and potentially simplify the code, it might make sense to have a call to ShuffleBlockFetcherIterator.next() not just return the next InputStream but also close() the last one. This would prevent callers from having more than one InputStream open at a time, but I don't think we want that anyway?
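For what that suggested follow-up might look like, a self-contained sketch of an iterator that closes the previously returned stream each time next() is called; the class name is hypothetical and this is not part of this PR.

import java.io.InputStream

// Hypothetical wrapper: at most one InputStream handed out by this iterator is ever
// open at a time, because next() closes the stream it returned previously.
class SingleOpenStreamIterator[K](underlying: Iterator[(K, InputStream)])
  extends Iterator[(K, InputStream)] {

  private var lastStream: InputStream = null

  override def hasNext: Boolean = underlying.hasNext

  override def next(): (K, InputStream) = {
    if (lastStream != null) {
      lastStream.close()   // close the previously returned stream before handing out a new one
    }
    val elem = underlying.next()
    lastStream = elem._2
    elem
  }
}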
@rxin Let me know if you'd like this change to be made to ShuffleBlockFetcherIterator.
We might consider deferring this change to a followup PR; want to file a JIRA issue so that we don't forget to eventually follow up?
Will do.
Test build #35095 has finished for PR 6423 at commit
@massie thanks for updating the tests. It's still a little concerning to me that we don't explicitly check that the iterator returned from HashShuffleReader releases all memory. In theory, someone could change HashShuffleReader to not use asKeyValueIterator (and not use a NextIterator) such that the stream didn't get closed, and we wouldn't have any unit tests for that (so we could end up with a memory leak). I realize it's annoying to add a test for that, though, and it sounds like @rxin and @JoshRosen are fine with this as-is; given that, I'm fine to sign off as well.
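One way such a check could be written: a sketch of a test double along the lines of the RecordingManagedBuffer mentioned later in the thread. The field names and the exact assertions here are assumptions, not the actual test code.

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import org.apache.spark.network.buffer.ManagedBuffer

// Hypothetical test double that counts retain()/release() calls; a test can drain the
// reader's iterator and then assert that release() was called on every buffer it
// handed to the fetcher.
class RecordingManagedBuffer(data: Array[Byte]) extends ManagedBuffer {
  var retainCount = 0
  var releaseCount = 0

  override def size(): Long = data.length
  override def nioByteBuffer(): ByteBuffer = ByteBuffer.wrap(data)
  override def createInputStream(): InputStream = new ByteArrayInputStream(data)
  override def retain(): ManagedBuffer = { retainCount += 1; this }
  override def release(): ManagedBuffer = { releaseCount += 1; this }
  override def convertToNetty(): AnyRef = ByteBuffer.wrap(data)
}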
@kayousterhout, thanks for reviewing this PR. I agree that we should be more defensive moving forward. I think the right next step (no pun intended) is to update ShuffleBlockFetcherIterator.next to close the previous stream when returning a new one. That would prevent any ShuffleReader from leaking memory or file handles. @rxin I just pushed an update that I believe addresses all your comments. Let me know if I missed anything.
Test build #35176 has finished for PR 6423 at commit
Jenkins, retest this please.
Test build #35185 has finished for PR 6423 at commit
I talked to @kayousterhout a bit more offline. I think it is actually pretty important to have the same level of test coverage, since this code is very important and it would be easy to introduce bugs in the future. I don't think it should be super hard -- just need some mocking.
I'll make the updates first thing next week.
@kayousterhout @rxin I just pushed an update which adds a test specifically to validate that the underlying buffers are released.
Test build #35492 has finished for PR 6423 at commit
@kayousterhout @rxin All the tests passed. Let me know if you'd like any more changes made and I'll get to them immediately.
This reverts commit f98a1b9.
Thanks. @kayousterhout is looking at this and will merge it.
Proposal for different unit test
Test build #35608 has finished for PR 6423 at commit
Derp, this is my fault... I just forgot to mark RecordingManagedBuffer as private[spark]!
No worries. I'll fix it.
Test build #35609 has finished for PR 6423 at commit
I will do a final pass on this and then merge tomorrow!
Thanks, @kayousterhout.
@kayousterhout Is this still on track for merging today? Let me know if you see anything else that needs to be done.
@kayousterhout I'm happy to squash the commits and rebase them on master.
Rebasing or squashing shouldn't be necessary. Sent from my phone.
Thanks for all of your work on this @massie! This is now merged into master.
This commit updates the shuffle read path to give ShuffleReader implementations more control over the deserialization process.
The BlockStoreShuffleFetcher.fetch() method has been renamed to BlockStoreShuffleFetcher.fetchBlockStreams(). Previously, this method returned a record iterator; now, it returns an iterator of (BlockId, InputStream). Deserialization of records is now handled in the ShuffleReader.read() method.
This change creates a cleaner separation of concerns and allows implementations of ShuffleReader more flexibility in how records are retrieved.
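As a rough illustration of the flow described above (the helper name and parameters are assumptions, not the committed code), a reader can now consume the (BlockId, InputStream) pairs and deserialize them itself:

import java.io.InputStream
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.storage.BlockId

// Hypothetical helper: deserialize the fetched streams lazily; the key/value iterator
// produced by asKeyValueIterator closes each stream once it is exhausted.
def deserializeBlocks(
    blockStreams: Iterator[(BlockId, InputStream)],
    serializerInstance: SerializerInstance): Iterator[(Any, Any)] = {
  blockStreams.flatMap { case (_, inputStream) =>
    serializerInstance.deserializeStream(inputStream).asKeyValueIterator
  }
}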