
[SPARK-8137][core] Improve treeAggregate to combine all data on each executor #7461

Closed
wants to merge 1 commit
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1080,6 +1080,21 @@ abstract class RDD[T: ClassTag](
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)

// Do one level of aggregation based on executorId before starting the tree
// NOTE: exclude the driver from the list of executors
val numExecutors = math.max(context.getExecutorStorageStatus.length - 1, 1)
Contributor:

It might be good to explicitly filter out the driver (SparkContext.DRIVER_IDENTIFIER) so we don't silently end up with a wrong number if this changes in the future.
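
A minimal sketch of that suggestion, assuming the Spark 1.x StorageStatus API (each status exposes blockManagerId.executorId):

// Sketch only: count executors while explicitly excluding the driver,
// instead of assuming it always occupies exactly one slot.
val numExecutors = math.max(
  context.getExecutorStorageStatus.count { status =>
    status.blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER
  }, 1)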

Contributor:

This actually won't work in local mode, since we only have one "executor / driver".

Author:

In that case numExecutors will become 1 and everything will be aggregated in a single pass, similar to rdd.aggregate().

Contributor:

Oh I see, there's a math.max(..., 1). Never mind.

partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { case (idx, iter) =>
  def isAllDigits(x: String) = x forall Character.isDigit
  val execId = SparkEnv.get.executorId
  if (isAllDigits(execId)) {
    iter.map((execId.toInt, _))
  } else {
    iter.map((execId.hashCode, _))
Contributor:

Why not just use the hash code all the time? It reduces the complexity.

Author:

Will do.

Contributor:

Hmm, it looks like the reason I added this in an earlier version was to avoid some hash collisions [1]. I'm not sure what exactly the problem was, as I can't reproduce any collisions with all-digit strings, so going with hashCode for now seems fine.

[1] amplab/ml-matrix@5d7e00e

Contributor:

Ah, I can actually reproduce it. The problem is that the key space gets unbalanced when we use hashCode on Strings. For example, if you start with 100 executor ids and want to get 50 partitions out, some partitions will get 3 ids while some will get 1:

(0 until 100).map(_.toString).map(_.hashCode % 50).groupBy(identity).map(_._2.size)

while with integers you get a uniform distribution:

(0 until 100).map(_.hashCode % 50).groupBy(identity).map(_._2.size)

So you could end up with unbalanced trees, which is not desirable.

@andrewor14 Can we guarantee that execId will be an integer? Is there any other integer we can get a handle on otherwise?
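
For reference, a self-contained way to see the imbalance (plain Scala, no Spark needed; the exact bucket sizes depend on String.hashCode):

// Bucket 100 ids into 50 partitions, once via String hash codes, once via Int ids.
val stringBuckets = (0 until 100).map(_.toString.hashCode % 50).groupBy(identity).map(_._2.size)
val intBuckets = (0 until 100).map(_.hashCode % 50).groupBy(identity).map(_._2.size)
println(s"string ids: min=${stringBuckets.min}, max=${stringBuckets.max}")  // uneven occupancy
println(s"int ids: min=${intBuckets.min}, max=${intBuckets.max}")  // always 2: an Int is its own hashCode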

Contributor:

I think executor ID is always an integer, except maybe during tests, but given that it's a string everywhere that assumption may not be correct, and we should certainly not fail when it doesn't hold.

Actually, why do we need to convert it to an int? As long as the keys are distinct, reduceByKey should do the right thing, no?

Contributor:

Yeah, we don't necessarily need ints, but it's easy to reason about how hash codes get balanced for ints (since an integer's value is its own hashCode). If we leave it as a String we will still have the same issue I pointed out above with uneven tree structure.

Contributor:

Oh I see, it's because HashPartitioner would hash that string anyway. One thing we could do is try to cast it to an int, and if that doesn't work, fall back to the old approach. Unfortunately it's a little more code.
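
A minimal sketch of that fallback (not part of this commit; Try-based parsing is just one way to write it):

import scala.util.Try

// Use the numeric executor id as the key when it parses cleanly,
// otherwise fall back to hashing the raw string as before.
val key = Try(execId.toInt).getOrElse(execId.hashCode)
iter.map((key, _))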

  }
}.reduceByKey(new HashPartitioner(numExecutors), combOp).values
numPartitions = numExecutors

// If creating an extra level doesn't help reduce
// the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {