
[SPARK-8137][core] Improve treeAggregate to combine all data on each executor #7461

Closed
wants to merge 1 commit

Conversation

kmadhugit

No description provided.

@AmplabJenkins

Can one of the admins verify this patch?

@JoshRosen
Contributor

@shivaram, if I recall correctly, you worked on something similar to this? I'll see if I can pull up the link to your code to compare approaches.

@shivaram
Contributor

Yep - I can take a look as well. @kmadhugit, it would be great if we could have a test case for this as well.

@sryza
Contributor

sryza commented Jul 17, 2015

Small thing, but can the title of this PR be changed to "...combine all data on each executor..." to clarify that all the data isn't going to a single executor?

@kmadhugit
Author

@shivaram, we do have a few test cases that exercise treeAggregate with different depths. Since we haven't changed the external behavior of the API, could you please tell me what kind of additional test you would like me to add?

@shivaram
Contributor

I was thinking of test cases that actually make sure treeAggregate aggregates all the local data on one executor, as this change is supposed to do. For example, in the DAGScheduler suite [1] we have tests that check whether tasks run at the expected location.

[1]

test("reduce task locality preferences should only include machines with largest map outputs") {

@kmadhugit
Author

@shivaram, I've created a test case using a different approach. The test case checks whether treeAggregate combines all local partitions together in the first reduction operation. The test case is given below:

https://github.com/kmadhugit/spark-testcase/blob/master/src/main/scala/org/madhu/App.scala

But the test case fails, because the DAGScheduler doesn't seem to schedule the reduce tasks based on the locality of the map-side partitions.

Let's take the example of an RDD with 6 partitions, 2 executors, and depth 3. Executor A has partitions 1, 3, 5 and executor B has partitions 2, 4, 6. The default tree aggregation merges partitions as (6 partitions) -> (3) -> (1), but we would like it to aggregate as (6) -> (2) -> (1). By using the executor id as the key in reduceByKey we were able to trigger the merging correctly, i.e. 1, 3, 5 into one partition and 2, 4, 6 into another. However, the actual merge of these partitions ((1,3,5) => X and (2,4,6) => Y) happens on executor A itself, so the data from partitions 2, 4, and 6 is transferred from executor B to A. In effect we couldn't avoid the shuffle even after selecting the correct partitions to merge.

I used two executors on the same node in standalone mode. If that is the reason for this behavior, then we may need to relax the requirement to the same worker node rather than the same executor for the local partition merge.
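
To make the approach concrete, here is a minimal Scala sketch of the executor-keyed first level described above (an illustration only, not the PR's actual diff; zeroValue, seqOp and combOp stand in for treeAggregate's parameters):

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkEnv
import org.apache.spark.rdd.RDD

// Hypothetical helper: fold each partition down to one partial result, then
// key the partials by the executor id they were computed on, so reduceByKey
// merges exactly the partitions that are co-located on one executor
// (e.g. 1, 3, 5 on A and 2, 4, 6 on B in the example above).
def executorLevelAggregate[T, U: ClassTag](rdd: RDD[T], zeroValue: U)
    (seqOp: (U, T) => U, combOp: (U, U) => U): RDD[U] = {
  rdd
    .mapPartitions(it => Iterator(it.foldLeft(zeroValue)(seqOp)))
    .map(partial => (SparkEnv.get.executorId, partial)) // evaluated on the executor
    .reduceByKey(combOp)
    .values
}
```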

@shivaram
Contributor

Thanks @kmadhugit, the test case sounds good to me. Could you try this on a multi-node cluster? We just set the locality preference for the scheduler, and I guess in the case of shuffle data the default is NODE_LOCAL, as the data is on shared disk (and not in memory?).

@kmadhugit
Author

I see that the tasks are tagged as PROCESS_LOCAL:
15/08/17 20:31:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.3.126.251, PROCESS_LOCAL, 1487 bytes)

I tried it on a multi-node cluster and the results are the same; the test case still fails.

@shivaram
Contributor

Hmm, I see. We'll need to instrument the DAGScheduler to see what is going on here -- for example, printing the location preferences set for the reduce tasks might be helpful (we can do that with, say, logDebug?) [1]

cc @mateiz who is looking at related issues

[1]

val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
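
As a sketch of that instrumentation (assuming it is added right after the quoted line in DAGScheduler.scala, where topLocsForReducer and the shuffle dependency s are in scope, and logDebug comes from the Logging trait the scheduler already mixes in):

```scala
// Log the preferred locations computed for this stage's reduce tasks;
// getLocationsWithLargestOutputs returns an Option[Array[BlockManagerId]].
logDebug(s"Preferred locations for reducers of shuffle ${s.shuffleId}: " +
  topLocsForReducer.map(_.mkString(", ")).getOrElse("none"))
```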

@kmadhugit
Author

@shivaram , @mateiz ,

Found the root cause: it's not a bug in the DAGScheduler's location preference logic. The executors are registered with SparkDeploySchedulerBackend lazily, only when the first task needs to be deployed on them. Block manager registration is part of executor initialization, so the executors' block managers also get registered with BlockManagerMasterEndpoint during the deployment of the first task. The driver's block manager is registered as part of SparkContext creation. So before the DAGScheduler submits its first job, there is only one block manager (i.e. the driver's) registered with BlockManagerMasterEndpoint.

In treeAggregate(), at the time sc.getExecutorStorageStatus.length is called, the BlockManagerMaster returns 1 because no prior job has been submitted by the DAGScheduler. So we end up doing something similar to rdd.aggregate(), which causes the failure.

I could think of a couple of solutions:

  1. Call some simple action on the RDD (maybe count) before calling sc.getExecutorStorageStatus.length, so that the executors have registered by then.
  2. Try to get the number of executors instead of the number of block managers. This may cause problems if some executors didn't process any of the data.

Let me know your comments.


// Do one level of aggregation based on executorId before starting the tree
// NOTE: exclude the driver from list of executors
val numExecutors = math.max(context.getExecutorStorageStatus.length - 1, 1)
Contributor


It might be good to explicitly filter out the driver (SparkContext.DRIVER_IDENTIFIER) so we don't silently get a wrong number if this changes in the future.
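
A sketch of what that could look like (an assumption, not the PR's diff; context is the RDD's SparkContext as in the snippet above):

```scala
import org.apache.spark.SparkContext

// Count only block managers whose executor id is not the driver's, rather
// than subtracting 1, so the count stays right if the listing ever changes.
val numExecutors = math.max(
  context.getExecutorStorageStatus.count { status =>
    status.blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER
  },
  1)
```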

Contributor


this actually won't work in local mode since we only have 1 "executor / driver"

Author


In that case numExecutors becomes 1 and the data is aggregated in a single pass, similar to rdd.aggregate().

Contributor


oh I see there's a math.max(..., 1), never mind

@andrewor14
Contributor

@kmadhugit can you update the title per @sryza's suggestion and add a few unit tests?

@kmadhugit changed the title from "[SPARK-8137][core] Improve treeAggregate to combine all data on one m…" to "[SPARK-8137][core] Improve treeAggregate to combine all data on each executor" on Sep 2, 2015
@kmadhugit
Author

I've modified the title.

I'm facing a problem during unit testing. When I call context.getExecutorStorageStatus.length it returns 1 instead of numExecutors + 1 (for the driver). This happens because no prior job has been submitted to the executors by the DAGScheduler, so the only registered block manager is the driver's. We may need to find an alternative way to know the number of executors (with the assumption that all executors will store some blocks of the RDD). Without that change this fix won't work.

@shivaram
Contributor

shivaram commented Sep 2, 2015

btw @kmadhugit -- your point about numExecutors is very true. I thought about this a bit more: in some cases we could have a very large cluster where, for instance, not all executors have this RDD. In that case we should only use the executors which have this RDD.

Unfortunately right now this either requires two passes over the RDD or some techniques to inspect the output of the map stage and then size the number of reducers based on that. cc @mateiz who has been working on the latter and might have something to add.

@andrewor14
Contributor

Yeah, the problem is that there's no reliable way to tell in advance how many executors your job will run with. New executors can register within that small window and tasks can be scheduled on them.

@kmadhugit
Author

@andrewor14 , @shivaram , @mateiz

I debugged a little more and found that it's a timing issue that causes context.getExecutorStorageStatus.length to return 1 in my test case. The executors are registered asynchronously, so when treeAggregate calls context.getExecutorStorageStatus, the executor registration process was still under way and none of them had completed. If I add a Thread.sleep(5000) before calling treeAggregate, the test case passes, i.e. it gives sufficient time for the executors to register with the application's SparkContext.

This scenario would rarely occur in an actual customer application, and we can always skip this optimization if the number of executors is less than a threshold value. Now, the question is: should we go ahead with the fix by adding a test case with the delay, plus a sanity check for this specific case in treeAggregate?

Though there is no way to predict the exact number of executors, there should not be a drastic change in the number of executors between the point where we call context.getExecutorStorageStatus.length and the reduceByKey() inside treeAggregate. An approximate number of executors should be sufficient for this optimization, since a minor change in the number of executors shouldn't cause any functional or performance issues.

Let me know.
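
One option for the test, instead of a blind sleep, might be to poll until the expected block managers have registered; a rough sketch (the expectedExecutors count is an assumption the test would have to supply):

```scala
import org.apache.spark.SparkContext

// Wait until the expected number of executor block managers (plus the
// driver's) have registered, or give up after the timeout.
def waitForExecutors(sc: SparkContext, expectedExecutors: Int,
    timeoutMs: Long = 30000L): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  // getExecutorStorageStatus also includes the driver, hence the "+ 1".
  while (sc.getExecutorStorageStatus.length < expectedExecutors + 1 &&
      System.currentTimeMillis() < deadline) {
    Thread.sleep(100)
  }
  sc.getExecutorStorageStatus.length >= expectedExecutors + 1
}
```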

@rxin
Contributor

rxin commented Jun 15, 2016

Thanks for the pull request. I'm going through a list of pull requests to cut them down since the sheer number is breaking some of the tooling we have. Due to lack of activity on this pull request, I'm going to push a commit to close it. Feel free to reopen it or create a new one.

@asfgit closed this in 1a33f2e on Jun 15, 2016