Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage #6291

Closed
wants to merge 10 commits into from

Conversation

markhamstra
Copy link
Contributor

This issue was addressed in #5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug. The intent of submitMissingTasks should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId". Because of a long-standing bug, the jobId parameter was never being used. Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks.

This fix should be applied to all maintenance branches, since it has existed since 1.0.

@kayousterhout @pankajarora12

@SparkQA
Copy link

SparkQA commented May 20, 2015

Test build #33165 has finished for PR 6291 at commit 71ea2a6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MultilabelMetrics(JavaModelWrapper):
    • class SimpleFunctionRegistry(val conf: CatalystConf) extends FunctionRegistry
    • class GroupedData protected[sql](

mbautin pushed a commit to mbautin/spark that referenced this pull request May 21, 2015
Author: Mark Hamstra
Apache Spark master PR: apache#6291

This issue was addressed in apache#5494, but the fix in that PR, while safe in the
sense that it will prevent the SparkContext from shutting down, misses the
actual bug. The intent of submitMissingTasks should be understood as "submit
the Tasks that are missing for the Stage, and run them as part of the ActiveJob
identified by jobId". Because of a long-standing bug, the jobId parameter was
never being used. Instead, we were trying to use the jobId with which the Stage
was created -- which may no longer exist as an ActiveJob, hence the crash
reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId
parameter, which is guaranteed to exist at the call sites of
submitMissingTasks.

This fix should be applied to all maintenance branches, since it has existed
since 1.0.
mbautin added a commit to mbautin/spark that referenced this pull request May 21, 2015
@squito
Copy link
Contributor

squito commented May 22, 2015

can you add a test case? Seems especially important given that a previous fix didn't actually catch the bug. I don't understand what's going on well enough to know what that test is [actually that is part of the reason I'd like to see a reproduction :) ] but maybe this is a case where it doesn't really make sense to make a very narrow unit case; but instead we need something which just stresses the DAGScheduler with a bunch of jobs that have a high likelihood of triggering this, so we just run it a lot. (Which might mean it needs to be run outside of the PR builder ...)

@markhamstra
Copy link
Contributor Author

@squito I'm not sure what you want to test. The change is actually very straightforward. In essence, we started with what should have been obviously broken code:

getMissingTasks(aStage, theActiveJobIdForTheStage)
...
def getMissingTasks(stage, jobId) = {
  // ignore jobId
  ...
  val properties = jobIdToActiveJob(wrongAndSometimesNotThereJobId).properties
  ...
}

...to code that covers up the problem:

  val properties = jobIdToActiveJob.get(wrongAndSometimesNotThereJobId).map(_.properties).orNull

...to code that just uses theActiveJobIdForTheStage as it should have all along.

@squito
Copy link
Contributor

squito commented May 26, 2015

@markhamstra oh I'm not saying that your change is bad or questionable at all. But I am wondering, what actually went wrong before this change? Are we sure this change fixes it? Can we protect against future regression? My point is that given that the previous attempted fix #5494 didn't solve it, and nobody has noticed the problem in all this time, it seems worth putting in a test which reproduces the exception without your change, and passes after your change.

The jira doesn't have enough info for me to suggest what that reproduction would be, but it seems like you understand it better than me.

@kayousterhout
Copy link
Contributor

The change looks good but +1 to @squito's request for a test. I'm still a little confused about how we could get into a situation where the Stage's jobId doesn't exist anymore, and a test would help clarify that and make sure the bug doesn't resurface in the future.

Is there a bigger issue lurking here, that the jobId associated with a Stage object is not generally safe for use?

@markhamstra
Copy link
Contributor Author

Ok, so what was happening is that a stage would get created as part of a particular job. When we'd getMissingTasks for that stage, we were always using the jobId under which the stage was created. That's the right thing to do as long as that job is an ActiveJob, but if that job completes and a subsequent job needs to recalculate the stage's results, trying to get the ActiveJob for the already completed job is going to fail.

We're trying to get an ActiveJob for the needed stage because we want to use that job's properties. In the sense that no exception will be thrown and the TaskSet will be submitted with default properties, it will work to use null properties when the lookup of the no-longer-ActiveJob fails, but that's not really what we want.

Before calling submitMissingTasks, we always get the ActiveJob under which we want those tasks to run, so we should just use that ActiveJob instead of ignoring it. In fact, we may also want to change this part of submitMissingTasks:

taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))

What that is saying now is to use the jobId under which the stage was created as the priority, which will be a more urgent FIFO priority than the ActiveJob under which the stage is actually being run if the ActiveJob for the stage is no longer that at-stage-creation job. To be consistent with using the properties from the ActiveJob, we should also be using the jobId here instead of stage.jobId.

@kayousterhout
Copy link
Contributor

Ok thanks for the explanation, and it sounds like writing a unit test for that case should be straightforward?

I was going to suggest that we just pass the properties into this method to sidestep the problem altogether, but I agree with your assessment that we should also change the submitTasks() call to use the newer jobId!

@squito
Copy link
Contributor

squito commented May 26, 2015

thanks mark. If I understand correctly, the earlier PR did "fix" the NoSuchElementException as reported in the jira. So its not like you can write a test case which creates an exception before your fix here. But, the earlier PR didn't fix the real mistake in the code, so the remaining issue is things like wrong priority, jobGroup, etc.

@markhamstra
Copy link
Contributor Author

@kayousterhout In looking through our other uses of stage.jobId in the DAGScheduler, I didn't see anything that jumped out as an obvious problem except for the already mentioned submitTasks call. It's going too far to say that the jobId associated with a Stage object is not generally safe for use, but we do need to keep in mind that that jobId doesn't necessarily map to a still ActiveJob.

@kayousterhout
Copy link
Contributor

@markhamstra I just submitted #6418 to try to improve the naming around this a bit to avoid these issues in the future.

@markhamstra
Copy link
Contributor Author

Added the proposed change in submitTasks.

@SparkQA
Copy link

SparkQA commented May 26, 2015

Test build #33538 has finished for PR 6291 at commit 5570e8f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markhamstra markhamstra force-pushed the SPARK-6880 branch 4 times, most recently from 26abe08 to c41f894 Compare May 27, 2015 16:42
@SparkQA
Copy link

SparkQA commented May 27, 2015

Test build #33595 has finished for PR 6291 at commit 26abe08.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 27, 2015

Test build #33598 has finished for PR 6291 at commit c41f894.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor

Should we aim to get this in for Spark 1.5? Looks like it's conflicted. @kayousterhout @squito, will you be able to sign off after this is brought up-to-date?

@markhamstra
Copy link
Contributor Author

I just brought it up-to-date. Putting together a test or two is something I'm going to try to get to today or tomorrow.

@SparkQA
Copy link

SparkQA commented Aug 1, 2015

Test build #39369 has finished for PR 6291 at commit af6b628.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markhamstra
Copy link
Contributor Author

ping("test added")

@squito
Copy link
Contributor

squito commented Aug 4, 2015

thanks mark, I should be able to look at this later tonight

@markhamstra
Copy link
Contributor Author

Yeah, don't bother quite yet -- the test isn't actually exercising the code path that it needs to. :(

@markhamstra
Copy link
Contributor Author

Nothing actually new -- I still think this one is ready to go after a little review. I am noting, however, that the prior discussion misses one element that raises the significance of retaining the correct job properties a little more: the executionId in SQLExecution is also a local job property, so we really don't want to be losing that unnecessarily. @marmbrus


// remove job1 as an ActiveJob
cancel(jobId1)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly for my own understanding -- I don't actually think this is necessary, right? At first I was thinking we need to put this in a bunch more places, as I know some other tests recently had some flakiness fixed by adding this in. But looking closer, this is only needed when are checking anything stored in our spark listener, not in the scheduler itself, right?

That said, if there is any doubt, better to leave it in place.

@squito
Copy link
Contributor

squito commented Nov 6, 2015

sorry for the delayed review @markhamstra. just some minor comments, otherwise lgtm.

As far as the sql executionId, I assume this change is also the right fix there as well -- it would be pretty weird if the executionId was supposed to retain the old value, even after the original job had been cancelled? Or are you just saying this fix is more important than we thought? Of course, the executionId suffers from the same problem we've already noted with properties & priority, that a taskSet which was already started is still "stuck" with the executionId it started with.

@marmbrus
Copy link
Contributor

marmbrus commented Nov 6, 2015

@zsxwing can you look at the execution id stuff here?

@markhamstra
Copy link
Contributor Author

@squito Yes, I'm pretty sure that this is the right fix for executionId as well. I'm just saying that previously we were talking about losing the job's properties only really affecting scheduling priority and job description, so there wasn't much impact to using the prior safe-but-not-quite-correct fix. With Spark SQL also relying upon the job properties, it's a little more important to make sure they are correct.

@zsxwing
Copy link
Member

zsxwing commented Nov 6, 2015

This fix looks good to me.

However, both SQL and Streaming use only SparkListenerJobStart.properties. So actually, they are not affected by this bug.

@markhamstra
Copy link
Contributor Author

Ok, how about the setting of JobGroup and the "spark.scheduler.pool" property in thriftserver/SparkExecuteStatementOperation.scala? Again, I don't see any reason why the fix would be any different; I'm just noting places where the bug can potentially have an effect.

@markhamstra
Copy link
Contributor Author

Once more, Imran.

@squito
Copy link
Contributor

squito commented Nov 13, 2015

lgtm pending tests!

@SparkQA
Copy link

SparkQA commented Nov 14, 2015

Test build #45892 has finished for PR 6291 at commit 2ff80ca.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markhamstra
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Nov 14, 2015

Test build #45934 has finished for PR 6291 at commit 2ff80ca.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 16, 2015

Test build #45987 has finished for PR 6291 at commit 31ba5de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 0a5aef7 Nov 25, 2015
asfgit pushed a commit that referenced this pull request Nov 25, 2015
…ted with a Stage

This issue was addressed in #5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug.  The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId".  Because of a long-standing bug, the `jobId` parameter was never being used.  Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks.

This fix should be applied to all maintenance branches, since it has existed since 1.0.

kayousterhout pankajarora12

Author: Mark Hamstra <[email protected]>
Author: Imran Rashid <[email protected]>

Closes #6291 from markhamstra/SPARK-6880.

(cherry picked from commit 0a5aef7)
Signed-off-by: Imran Rashid <[email protected]>
asfgit pushed a commit that referenced this pull request Nov 25, 2015
…ted with a Stage

This issue was addressed in #5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug.  The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId".  Because of a long-standing bug, the `jobId` parameter was never being used.  Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks.

This fix should be applied to all maintenance branches, since it has existed since 1.0.

kayousterhout pankajarora12

Author: Mark Hamstra <[email protected]>
Author: Imran Rashid <[email protected]>

Closes #6291 from markhamstra/SPARK-6880.

(cherry picked from commit 0a5aef7)
Signed-off-by: Imran Rashid <[email protected]>
@squito
Copy link
Contributor

squito commented Nov 25, 2015

thanks, merged to master / 1.6 / 1.5

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants