Make sources report their partitioning to Spark #176
base: spark-3.2
Unit Test Results: 832 files (+26), 832 suites (+26), 34m 14s (+6m 8s). Results for commit 40599a5, compared against base commit 6c4b463. This pull request removes 88 and adds 181 tests. Note that renamed tests count towards both.
With Spark 3.3, using partitioning information reported by |
…es in one partition
Test Results: 992 files (+31), 992 suites (+31), 43m 30s (+12m 44s). Results for commit 9771957, compared against base commit 1429b4f. This pull request removes 88 and adds 181 tests. Note that renamed tests count towards both.
Having the sources report their partitioning to Spark allows Spark to exploit the existing partitioning and avoid shuffling all data for operations that require the existing partitioning.
For instance, reading triples with predicate partitioning produces a Dataset that is already partitioned by column `"predicate"`, so a `groupBy("predicate")` would not need to shuffle the data at all:
The `groupBy("predicate")` will not shuffle the graph data after reading from Dgraph.
The Spark plan for this Dataset is:
Without reporting the existing partitioning, the plan would look like:
By reporting the partitioning, Spark removes the `Exchange hashpartitioning(predicate#3100, 2), true, [id=#1300]` step, as it becomes redundant.
This refactors `SingletonPartitioner` to extend `PredicatePartitioner` but with a single partition (all predicates in one partition). This allows `NodeSource` in wide node mode to reject any predicate-partitioned partitioner while relying on `SingletonPartitioner` to provide the same behaviour as `PredicatePartitioner` with one partition did so far.
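
To illustrate why a `groupBy("predicate")` needs no shuffle on predicate-partitioned data, here is a minimal toy model in plain Python. It is not the connector's code and does not use Spark: the function names, the sample triples and the round-robin assignment of predicates to partitions are all assumptions made for this sketch. It only shows that when every predicate lives in exactly one partition, per-partition aggregation already yields the final groups, and that a single partition is the special case where all predicates share that partition.

```python
from collections import Counter


def partition_by_predicate(triples, num_partitions):
    """Toy predicate partitioning: each predicate is owned by exactly one partition.

    The round-robin assignment is an assumption of this sketch, not the
    connector's actual strategy.
    """
    predicates = sorted({predicate for _, predicate, _ in triples})
    owner = {p: i % num_partitions for i, p in enumerate(predicates)}
    partitions = [[] for _ in range(num_partitions)]
    for triple in triples:
        partitions[owner[triple[1]]].append(triple)
    return partitions


def count_per_partition(partitions):
    """Count triples per predicate inside each partition; no rows move between partitions."""
    return [Counter(predicate for _, predicate, _ in part) for part in partitions]


# Hypothetical sample data: (subject, predicate, object)
triples = [
    ("0x1", "name", "Alice"),
    ("0x2", "name", "Bob"),
    ("0x1", "friend", "0x2"),
    ("0x1", "age", "30"),
    ("0x2", "age", "28"),
]

partitions = partition_by_predicate(triples, 2)
local_counts = count_per_partition(partitions)

# No predicate appears in more than one partition, so the local counts
# are already the final per-predicate counts.
key_sets = [set(counts) for counts in local_counts]
disjoint = all(
    key_sets[i].isdisjoint(key_sets[j])
    for i in range(len(key_sets))
    for j in range(i + 1, len(key_sets))
)

# Merging is a plain union of the local results, not a re-aggregation.
merged = Counter()
for counts in local_counts:
    merged.update(counts)

# A single partition holds all predicates (the singleton case).
singleton = partition_by_predicate(triples, 1)
```

In Spark the same reasoning is what lets the planner drop the `Exchange hashpartitioning(...)` step once the source reports that its output is already clustered by the grouping column.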