
[SPARK-10797] RDD's coalesce should not write out the temporary key #8979

Closed
wants to merge 4 commits

Conversation

@zzvara
Contributor

zzvara commented Oct 5, 2015

I think we have the following options to solve this problem:

  1. [General, but too complex] Create a new ShuffledRDD, add support to shuffle system to read values instead of key-value pairs.
  2. [General, too complex] Create a new CoalescedRDD, that bypasses ShuffledRDD and reads from shuffle-layer directly - still need to add support to read values.
  3. [Minimal impact on code] Add option for shuffle-layer to write out values only and cast the key-value iterator at read-site to Iterator[T].

I've implemented the 3rd option and I'm using it. The speed-up you see depends on the size of the current payload (the values).
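For context, here is a rough sketch of what the coalesce-with-shuffle path does today (simplified from RDD.coalesce in Spark 1.x and expressed with the public API) and, in the trailing comment, what option 3 amounts to. The setWriteValuesOnly call is hypothetical and only illustrates the idea; it is not the API this patch adds.

```scala
import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Roughly what rdd.coalesce(numPartitions, shuffle = true) does today:
// wrap every element with a temporary Int key so a HashPartitioner can
// spread it, shuffle the (Int, T) pairs, then throw the key away again.
def coalesceWithShuffle[T: ClassTag](rdd: RDD[T], numPartitions: Int): RDD[T] = {
  val keyed = rdd.mapPartitionsWithIndex { (index, items) =>
    var position = new Random(index).nextInt(numPartitions)
    items.map { t =>
      position += 1
      (position, t)                                    // temporary key, needed only for routing
    }
  }
  keyed
    .partitionBy(new HashPartitioner(numPartitions))   // key is serialized and shuffled here...
    .values                                            // ...and immediately discarded here
}

// Option 3 (illustrative only): let the shuffle layer skip writing the key and
// cast the read-side iterator to Iterator[T], e.g. via a hypothetical flag:
//   new ShuffledRDD[Int, T, T](keyed, partitioner).setWriteValuesOnly(true)
```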

@zzvara
Contributor Author

zzvara commented Oct 7, 2015

Can I have a test build here?

@SparkQA

SparkQA commented Oct 7, 2015

Test build #1850 has finished for PR 8979 at commit 2b5f77b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class ShuffleHandle(val shuffleId: Int) extends Serializable

@zzvara
Contributor Author

zzvara commented Oct 7, 2015

Can I have a test build here?

@JoshRosen
Contributor

Hey @ehnalis, I like the core idea behind this patch. However, I'm a bit concerned about how many files it needs to touch and the level of complexity here.

I think that your approach 1, of creating a new ShuffledRDD variant to do value-only shuffle, actually won't be too much work. I did something similar, but optimized only for Spark SQL's UnsafeRow format, in #7456. Could you take a look at the implementation of ShuffledRowRDD in that patch to assess whether you could do something similar (but more generic) here?
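For readers following along: ShuffledRowRDD in #7456 reads a shuffle of (partition id, InternalRow) pairs and only hands the rows to downstream operators. A generic analogue might look roughly like the sketch below; the name ShuffledValuesRDD and the details are illustrative only, not code from this patch or from #7456, and the signatures assume the Spark 1.x shuffle API.

```scala
import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.rdd.RDD

private class ValuesRDDPartition(override val index: Int) extends Partition

// Hypothetical generic analogue of ShuffledRowRDD: the dependency still shuffles
// (Int, T) pairs, but compute() only exposes the values to downstream operators.
class ShuffledValuesRDD[T: ClassTag](var dep: ShuffleDependency[Int, T, T])
  extends RDD[T](dep.rdd.context, Seq(dep)) {

  override def getPartitions: Array[Partition] =
    Array.tabulate[Partition](dep.partitioner.numPartitions)(i => new ValuesRDDPartition(i))

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    SparkEnv.get.shuffleManager
      .getReader[Int, T](dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .map(_._2) // the keys still travel through the shuffle files; a value-only writer would avoid that too
}
```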

@JoshRosen
Contributor

Actually, let me think about this a bit more. The approach here might be easier, but I'll need to spend a bit more time to think through this in the context of some other planned shuffle refactorings.

@zzvara
Contributor Author

zzvara commented Oct 20, 2015

It does touch a few files, but I think the complexity it introduces is minimal. It might be convenient for now to apply this patch and revisit it as part of a step-by-step shuffle refactoring. The downside is that the other approaches would heavily touch ExternalSorter as well as other classes.

@zzvara
Contributor Author

zzvara commented Oct 20, 2015

I'm going to look over your previous PR tomorrow and think through a more generic way of solving this, but I suspect it would mean a large change to the shuffle codebase.

Let me know if you would like me to rebase this PR.

@zzvara
Contributor Author

zzvara commented Oct 21, 2015

I've looked into your implementation of ShuffledRowRDD. It is along the lines of what I had in mind in the first place. The reason I stuck with the simpler case is that we only use this for repartitioning, and the more generic approach would mean some rather ugly code duplication. What do you think?

@zzvara
Contributor Author

zzvara commented Dec 5, 2015

@JoshRosen, how is your refactoring going? Should I rebase the PR again?

@andrewor14
Contributor

@ehnalis have you done any benchmarking on how much this actually saves us, both in terms of time and shuffle size?

@zzvara
Contributor Author

zzvara commented Dec 15, 2015

@andrewor14 On a 4-node YARN cluster I measured a 38% shuffle-size reduction when the payload was an (Int, String) pair (JavaSerializer), where the average String length was 5.1 characters (English words). I did not benchmark time.

In practice you save an Int key on each record.
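A rough, purely illustrative sizing of that claim; the byte counts below are assumptions, not measurements.

```scala
// Back-of-envelope only: per shuffled record, coalesce currently writes the
// temporary Int key plus the (Int, String) value.
val keyBytes   = 4        // raw Int key (JavaSerializer boxes and frames it, so the real cost is higher)
val valueBytes = 4 + 5    // Int + ~5 ASCII characters, ignoring all object/serializer overhead
val naiveShare = keyBytes.toDouble / (keyBytes + valueBytes)   // ≈ 0.31
// Dropping the key removes a fixed cost per record, which is why small payloads
// benefit most; the reported 38% is higher than this naive estimate presumably
// because JavaSerializer adds its own per-record overhead for the boxed key.
```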

@rxin
Contributor

rxin commented Jun 15, 2016

@zzvara this is a pretty cool idea -- we probably want to generalize this a little bit more in Spark 2.1 or Spark 2.2.

I'm going through a list of pull requests to cut them down, since the sheer number is breaking some of the tooling we have. Due to the lack of activity on this pull request, I'm going to push a commit to close it. Let's keep discussing it in the JIRA ticket and revisit.

@asfgit asfgit closed this in 1a33f2e Jun 15, 2016