[SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc #1903
Create verticesDeduplicate with reduceByKey using mergeFunc, then proceed with verticesDedup.
Can one of the admins verify this patch?
This isn't quite the right approach, since the call to reduceByKey will result in two rounds of communication and hash aggregations (in reduceByKey and copartitionWithVertices) when only one is necessary. It would be better to add a ShippableVertexPartition constructor that takes a mergeFunc, then just pass the mergeFunc from here into that constructor. Also, the capitalization in I can make these changes this weekend if you like. |
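To illustrate the suggestion above, here is a minimal sketch in plain Scala (no Spark dependency) of merging duplicates while a partition's vertex map is being built, so that no separate reduceByKey pass is needed. The object name `MergeOnBuild`, the helper `buildPartition`, and the use of a mutable HashMap are assumptions made for illustration; the real ShippableVertexPartition uses its own data structures.

```scala
import scala.collection.mutable

object MergeOnBuild {
  type VertexId = Long

  // Hypothetical helper: builds one partition's vertex map in a single pass,
  // combining the attributes of duplicate vertex ids with mergeFunc as they arrive.
  def buildPartition[VD](
      iter: Iterator[(VertexId, VD)],
      mergeFunc: (VD, VD) => VD): Map[VertexId, VD] = {
    val acc = mutable.HashMap.empty[VertexId, VD]
    iter.foreach { case (vid, attr) =>
      acc(vid) = acc.get(vid) match {
        case Some(existing) => mergeFunc(existing, attr) // duplicate id: merge
        case None => attr                                // first occurrence: store
      }
    }
    acc.toMap
  }
}
```

With `(a: Int, b: Int) => a + b` the input `(0L, 1), (0L, 2), (1L, 3)` gives vertex 0 the value 3 and vertex 1 the value 3. With `(a: Int, _: Int) => a` the first attribute seen for each id is kept.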
Thanks
Is it OK now? About testing: how do I test it? I think a test should be added in GraphSuite. Is it necessary? Thank you!
This looks good! A test would be good too. VertexRDDSuite seems like the right place, since nothing else actually calls this variant of VertexRDD.apply. It should be OK to create an empty EdgeRDD in VertexRDDSuite for testing purposes. Here's a simple test:
val verts = sc.parallelize(List((0L, 1), (0L, 2), (1L, 3)))
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
assert(rdd.collect.toSet == Set((0L, 3), (1L, 3)))
Thank you Ankur! I'll add a test for it.
ok to test
QA tests have started for PR 1903 at commit
QA tests have finished for PR 1903 at commit
* and merging duplicate vertex attribute with mergeFunc.
*/
def apply[VD: ClassTag](
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD, mergeFunc: (VD, VD) => VD)
Looks like this line is too long - it would be great if you could wrap it. Also, I think the Spark style is for parameter lists to be indented 4 spaces instead of 2.
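As a sketch of what the wrapped form could look like, the signature below puts one parameter per line with a 4-space indent. To keep it self-contained, `RoutingTablePartition` is replaced by an empty stand-in class, and the `Int` return type and body are placeholders rather than the method's real implementation.

```scala
import scala.reflect.ClassTag

object StyleSketch {
  type VertexId = Long

  // Stand-in for the real RoutingTablePartition, present only so the sketch compiles.
  final class RoutingTablePartition

  // One parameter per line, each indented 4 spaces relative to `def`.
  def apply[VD: ClassTag](
      iter: Iterator[(VertexId, VD)],
      routingTable: RoutingTablePartition,
      defaultVal: VD,
      mergeFunc: (VD, VD) => VD): Int = {
    // Placeholder body (an assumption): just counts the input pairs.
    iter.size
  }
}
```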
Can one of the admins verify this patch?
As described in the commit message:
So there's a hidden rule that the default mergeFunc should be (a, b) => a.
ok to test
Yeah, a note about that default would be great.
This looks good! I'll merge it pending the doc update.
Your doc update made me realize that now that we're taking a mergeFunc in ShippableVertexPartition.initFrom, it's not ideal to use the iterator concatenation approach for setting the default values anymore, because the mergeFunc will get run on the default value, which might surprise users. I submitted a PR (larryxiao#1) to avoid this by doing the merge first, then populating the default values.
ShippableVertexPartition.initFrom: Don't run mergeFunc on default values
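The difference between the two approaches can be sketched in plain Scala, without Spark. All names here (`DefaultFill`, `mergeAll`, `concatThenMerge`, `mergeThenFill`, and the `referenced` set standing in for the vertex ids known to the routing table) are hypothetical and only model the behavior described above; they are not the actual initFrom code.

```scala
object DefaultFill {
  type VertexId = Long

  // Merges attributes of duplicate ids in one pass.
  private def mergeAll[VD](
      pairs: Iterator[(VertexId, VD)],
      mergeFunc: (VD, VD) => VD): Map[VertexId, VD] =
    pairs.foldLeft(Map.empty[VertexId, VD]) { case (acc, (vid, attr)) =>
      acc.updated(vid, acc.get(vid).map(mergeFunc(_, attr)).getOrElse(attr))
    }

  // Concatenation approach: defaults enter the same stream as real attributes,
  // so mergeFunc also runs on (real attribute, defaultVal) pairs.
  def concatThenMerge[VD](
      verts: Iterator[(VertexId, VD)],
      referenced: Set[VertexId],
      defaultVal: VD,
      mergeFunc: (VD, VD) => VD): Map[VertexId, VD] =
    mergeAll(verts ++ referenced.iterator.map(vid => (vid, defaultVal)), mergeFunc)

  // Merge-first approach: mergeFunc only sees real attributes; defaults are
  // filled in afterwards for ids that are still missing.
  def mergeThenFill[VD](
      verts: Iterator[(VertexId, VD)],
      referenced: Set[VertexId],
      defaultVal: VD,
      mergeFunc: (VD, VD) => VD): Map[VertexId, VD] = {
    val merged = mergeAll(verts, mergeFunc)
    referenced.foldLeft(merged) { (acc, vid) =>
      if (acc.contains(vid)) acc else acc.updated(vid, defaultVal)
    }
  }
}
```

For a single vertex `(0L, 1)`, referenced ids `Set(0L, 1L)`, default value 10, and addition as the merge function, the concatenation approach gives vertex 0 the value 11 (the default leaks into the merge), while the merge-first approach leaves it at 1. Both give vertex 1 the default 10.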
Thanks Ankur!
Jenkins, test this please.
Unrelated failure in Streaming. Jenkins, retest this please.
Thanks! Merged into master and branch-1.1.
VertexRDD.apply had a bug where it ignored the merge function for duplicate vertices and instead used whichever vertex attribute occurred first. This commit fixes the bug by passing the merge function through to ShippableVertexPartition.apply, which merges any duplicates using the merge function and then fills in missing vertices using the specified default vertex attribute. This commit also adds a unit test for VertexRDD.apply.

Author: Larry Xiao <[email protected]>
Author: Blie Arkansol <[email protected]>
Author: Ankur Dave <[email protected]>

Closes #1903 from larryxiao/2062 and squashes the following commits:

625aa9d [Blie Arkansol] Merge pull request #1 from ankurdave/SPARK-2062
476770b [Ankur Dave] ShippableVertexPartition.initFrom: Don't run mergeFunc on default values
614059f [Larry Xiao] doc update: note about the default null value vertices construction
dfdb3c9 [Larry Xiao] minor fix
1c70366 [Larry Xiao] scalastyle check: wrap line, parameter list indent 4 spaces
e4ca697 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
6a35ea8 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
4fbc29c [Blie Arkansol] undo unnecessary change
efae765 [Larry Xiao] fix mistakes: should be able to call with or without mergeFunc
b2422f9 [Larry Xiao] Merge branch '2062' of github.com:larryxiao/spark into 2062
52dc7f7 [Larry Xiao] pass mergeFunc to VertexPartitionBase, where merge is handled
581e9ee [Larry Xiao] TODO: VertexRDDSuite
20d80a3 [Larry Xiao] [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc

(cherry picked from commit 3bbbdd8)
Signed-off-by: Ankur Dave <[email protected]>
Create verticesDeduplicate with reduceByKey using mergeFunc,
then proceed with verticesDedup.
This is not tested yet, and I want to add a test for VertexRDD.apply.
Because it needs edges, should I place the test in VertexRDDSuite or somewhere else?