-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-9023] [SQL] Efficiency improvements for UnsafeRows in Exchange #7456
Conversation
Test build #37562 has finished for PR 7456 at commit
|
import org.apache.spark.unsafe.PlatformDependent | ||
|
||
|
||
class UnsafeRowSerializer(numFields: Int) extends Serializer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a file that deserves a little bit more comment to explain what this is for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I'm going to add comments now.
The current code looks pretty good to me. |
I'm going to rebase this on top of #7482 to make things easier to test; will rebase again once that patch is merged. |
Alright, I've updated this and it should be ready for another look. Added a very trivial test to trigger the new shuffle path and caught a bug related to UnsafeRowSerializer not being Serializable. |
Jenkins, retest this please. |
Test build #37736 has finished for PR 7456 at commit
|
Test build #37738 has finished for PR 7456 at commit
|
Can you also update the pull request description? |
Done; I removed the "work-in-progress" part. |
Huh, looks like a legitimate test failure in SparkSqlSerializer2SortMergeShuffleSuite:
|
Test build #37744 has finished for PR 7456 at commit
|
this | ||
} | ||
override def writeKey[T: ClassTag](key: T): SerializationStream = { | ||
assert(key.isInstanceOf[Int]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you need to add some comment explaining why we are not doing anything when writing keys.
var dataRemaining: Int = row.getSizeInBytes | ||
val baseObject = row.getBaseObject | ||
var rowReadPosition: Long = row.getBaseOffset | ||
while (dataRemaining > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably doesn't matter in the MVP, but if we know the UnsafeRow is backed by a byte array, we don't need to do this copying, do we?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A nice way to address this would be to add a writeTo
method to UnsafeRow
itself. That method could contain a special case to handle the case where the row is backed by an on-heap byte array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Looks pretty good. I'm going to merge it. Please submit a followup pr to address some of the comments on documentation and choice of buffer size. |
…safeRows in Exchange) This patch addresses code review feedback from #7456. Author: Josh Rosen <[email protected]> Closes #7551 from JoshRosen/unsafe-exchange-followup and squashes the following commits: 76dbdf8 [Josh Rosen] Add comments + more methods to UnsafeRowSerializer 3d7a1f2 [Josh Rosen] Add writeToStream() method to UnsafeRow
This pull request aims to improve the performance of SQL's Exchange operator when shuffling UnsafeRows. It also makes several general efficiency improvements to Exchange.
Key changes:
(partitioningColumRow: InternalRow, row: InternalRow)
pair into the shuffle. This is very inefficient because it ends up redundantly serializing the partitioning columns only to immediately discard them after the shuffle. After this patch's changes, Exchange now shuffles(partitionId: Int, row: InternalRow)
pairs. This still isn't optimal, since we're still shuffling extra data that we don't need, but it's significantly more efficient than the old implementation; in the future, we may be able to further optimize this once we implement a new shuffle write interface that accepts non-key-value-pair inputs.compute()
method has been significantly simplified; the new code has less duplication and thus is easier to understand.UnsafeRowSerializer
to serialize these rows. This serializer is significantly more efficient since it simply copies the UnsafeRow's underlying bytes. Note that this approach does not work for UnsafeRows that use the ObjectPool mechanism; I did not add support for this because we are planning to remove ObjectPool in the next few weeks.