Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-8103][core] DAGScheduler should not submit multiple concurrent attempts for a stage #6750

Closed
wants to merge 38 commits into from

Conversation

squito
Copy link
Contributor

@squito squito commented Jun 10, 2015

@squito
Copy link
Contributor Author

squito commented Jun 10, 2015

(separate comment just to avoid putting this in the pr description for eventual merging)

@kayousterhout I agree that your test is much clearer than running jobs with various pauses. I had tried to come up with something clearer but your version is much better.

I am a little torn, however -- this is certainly better for understanding this narrow case, but I feel it also gives a false sense of confidence b/c you only cover a narrow set of cases. Eg., your new case covers the extra failure happening after the reduce stage is retried, but not if the failure happens before the reduce stage is retried, or after the reduce stage retry has completely finished. To address that, I've been running a a variety of tests like this https://github.com/squito/spark/blob/combined_multi_attempt_fixes/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala#L39
with some randomization in the parameters. FWIW, with these changes and #6648, those tests pass consistently for me. Of course, that is still getting limited coverage, and I'd like to expand it even more. I'd like to find a better home for running tests like that (not just running them periodically on my laptop), but I guess that is separate issue. So I think this can be merged as is, w/ the tests you wrote.

@squito squito changed the title [SPARK-8103][core] DAGScheduler should now submit multiple concurrent attempts for a stage [SPARK-8103][core] DAGScheduler should not submit multiple concurrent attempts for a stage Jun 10, 2015
@squito
Copy link
Contributor Author

squito commented Jun 11, 2015

Jenkins, retest this please

@SparkQA
Copy link

SparkQA commented Jun 11, 2015

Test build #34655 has finished for PR 6750 at commit b6bc248.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkIllegalStateException(message: String, cause: Throwable)

@SparkQA
Copy link

SparkQA commented Jun 11, 2015

Test build #34665 has finished for PR 6750 at commit ecb4e7d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkIllegalStateException(message: String, cause: Throwable)

@SparkQA
Copy link

SparkQA commented Jun 11, 2015

Test build #34689 has finished for PR 6750 at commit 9601b47.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkIllegalStateException(message: String, cause: Throwable)

@SparkQA
Copy link

SparkQA commented Jun 11, 2015

Test build #34692 has finished for PR 6750 at commit 89a59b6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkIllegalStateException(message: String, cause: Throwable)

@SparkQA
Copy link

SparkQA commented Jun 11, 2015

Test build #34695 has finished for PR 6750 at commit 46bc26a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkIllegalStateException(message: String, cause: Throwable)

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@SparkQA
Copy link

SparkQA commented Jun 29, 2015

Test build #36042 has finished for PR 6750 at commit d8eb202.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkIllegalStateException(message: String, cause: Throwable)

@squito
Copy link
Contributor Author

squito commented Jun 30, 2015

Jenkins, retest this please

@SparkQA
Copy link

SparkQA commented Jun 30, 2015

Test build #36118 has finished for PR 6750 at commit d8eb202.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkIllegalStateException(message: String, cause: Throwable)

@squito
Copy link
Contributor Author

squito commented Jun 30, 2015

ping @kayousterhout @markhamstra @mateiz

if (taskSets.size > 1) {
throw new SparkIllegalStateException("more than one active taskSet for stage " + stage)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kayousterhout How much of a concern should the extra overhead be here? Just wondering whether this (let's hope rare) condition might better be handled only in a non-production environment and behind an if(debug) kind of flag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps the code could just look for an existing task set that matches the stage ID of the task set being added? That should be a little better than the filter / groupBy. Something like:

activeTaskSets.exists { case (_, ts) => ts.stageId == taskSet.stageId }

}
if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
s" ${task.stageAttemptId}, which has already failed")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change this to s" ${task.stageAttemptId}, and there is a more recent attempt for that stage (attempt ID ${failedStage.latestInfo.attemptId}) running"? It's a little misleading as-is, because we actually do still do something with the fetch failure if the stage has already been marked as failed, as long as there's not already a newer version of the stage running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, good idea. done

@kayousterhout
Copy link
Contributor

I had one last idea for how to simplify this: squito#4, that I made sure doesn't break any tests this time! Let me know what your thoughts are.

@SparkQA
Copy link

SparkQA commented Jul 17, 2015

Test build #37558 has finished for PR 6750 at commit e43ac25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute
    • abstract class Star extends LeafExpression with NamedExpression
    • case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression
    • case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression
    • trait AggregateExpression extends Expression
    • trait PartialAggregate extends AggregateExpression
    • case class Min(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Max(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Count(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Average(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Sum(child: Expression) extends UnaryExpression with PartialAggregate
    • case class SumDistinct(child: Expression) extends UnaryExpression with PartialAggregate
    • case class First(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Last(child: Expression) extends UnaryExpression with PartialAggregate
    • trait Generator extends Expression
    • case class Explode(child: Expression) extends UnaryExpression with Generator
    • trait NamedExpression extends Expression
    • abstract class Attribute extends LeafExpression with NamedExpression
    • case class PrettyAttribute(name: String) extends Attribute
    • abstract class LeafNode extends LogicalPlan
    • abstract class UnaryNode extends LogicalPlan

.flatMap(activeTaskSets.get)
.map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
for {
(stageId, stageAttemptId) <- taskIdToStageIdAndAttempt.get(id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use the taskSetManagerForTask(id) method here?

@kayousterhout
Copy link
Contributor

Ok I did a last pass over this...just some last style comments, and it sounds like you're also going to do the cleanup of making the hashmap indexed on task id store the task set manager as a value? Then this looks good.

@squito
Copy link
Contributor Author

squito commented Jul 17, 2015

thanks for the last set of comments @kayousterhout . I've updated based on your suggestions

@kayousterhout
Copy link
Contributor

There's just one lingering erroneous log message, and then this LGTM.

@kayousterhout
Copy link
Contributor

Thanks for all the iterations on this...I'm hopeful that if we made the scheduler code a little better / easier to read with each PR, these will become less painful over time.

@squito
Copy link
Contributor Author

squito commented Jul 17, 2015

Jenkins, retest this please

@SparkQA
Copy link

SparkQA commented Jul 17, 2015

Test build #37655 has finished for PR 6750 at commit e01b7aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 17, 2015

Test build #37663 has finished for PR 6750 at commit fb3acfc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Copy link
Contributor Author

squito commented Jul 17, 2015

Jenkins, retest this please

@SparkQA
Copy link

SparkQA commented Jul 17, 2015

Test build #32 has finished for PR 6750 at commit fb3acfc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
    • class RFormula(override val uid: String)
    • case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute
    • abstract class Star extends LeafExpression with NamedExpression
    • case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression
    • abstract class LeafExpression extends Expression
    • abstract class UnaryExpression extends Expression
    • abstract class BinaryExpression extends Expression
    • case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression
    • trait AggregateExpression extends Expression
    • trait PartialAggregate extends AggregateExpression
    • case class Min(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Max(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Count(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Average(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Sum(child: Expression) extends UnaryExpression with PartialAggregate
    • case class SumDistinct(child: Expression) extends UnaryExpression with PartialAggregate
    • case class First(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Last(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Pmod(left: Expression, right: Expression) extends BinaryArithmetic
    • final class SpecificRow extends $
    • trait Generator extends Expression
    • case class Explode(child: Expression) extends UnaryExpression with Generator
    • trait NamedExpression extends Expression
    • abstract class Attribute extends LeafExpression with NamedExpression
    • case class PrettyAttribute(name: String) extends Attribute
    • case class Length(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class FormatNumber(x: Expression, d: Expression)
    • abstract class LeafNode extends LogicalPlan
    • abstract class UnaryNode extends LogicalPlan
    • abstract class BinaryNode extends LogicalPlan

@SparkQA
Copy link

SparkQA commented Jul 17, 2015

Test build #33 has finished for PR 6750 at commit fb3acfc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
    • class RFormula(override val uid: String)
    • case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute
    • abstract class Star extends LeafExpression with NamedExpression
    • case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression
    • abstract class LeafExpression extends Expression
    • abstract class UnaryExpression extends Expression
    • abstract class BinaryExpression extends Expression
    • case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression
    • trait AggregateExpression extends Expression
    • trait PartialAggregate extends AggregateExpression
    • case class Min(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Max(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Count(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Average(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Sum(child: Expression) extends UnaryExpression with PartialAggregate
    • case class SumDistinct(child: Expression) extends UnaryExpression with PartialAggregate
    • case class First(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Last(child: Expression) extends UnaryExpression with PartialAggregate
    • case class Pmod(left: Expression, right: Expression) extends BinaryArithmetic
    • final class SpecificRow extends $
    • trait Generator extends Expression
    • case class Explode(child: Expression) extends UnaryExpression with Generator
    • trait NamedExpression extends Expression
    • abstract class Attribute extends LeafExpression with NamedExpression
    • case class PrettyAttribute(name: String) extends Attribute
    • case class Length(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class FormatNumber(x: Expression, d: Expression)
    • abstract class LeafNode extends LogicalPlan
    • abstract class UnaryNode extends LogicalPlan
    • abstract class BinaryNode extends LogicalPlan

@SparkQA
Copy link

SparkQA commented Jul 17, 2015

Test build #37668 has finished for PR 6750 at commit fb3acfc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 80e2568 Jul 20, 2015
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Jul 21, 2015
… attempts for a stage

https://issues.apache.org/jira/browse/SPARK-8103

cc kayousterhout (thanks for the extra test case)

Author: Imran Rashid <[email protected]>
Author: Kay Ousterhout <[email protected]>
Author: Imran Rashid <[email protected]>

Closes apache#6750 from squito/SPARK-8103 and squashes the following commits:

fb3acfc [Imran Rashid] fix log msg
e01b7aa [Imran Rashid] fix some comments, style
584acd4 [Imran Rashid] simplify going from taskId to taskSetMgr
e43ac25 [Imran Rashid] Merge branch 'master' into SPARK-8103
6bc23af [Imran Rashid] update log msg
4470fa1 [Imran Rashid] rename
c04707e [Imran Rashid] style
88b61cc [Imran Rashid] add tests to make sure that TaskSchedulerImpl schedules correctly with zombie attempts
d7f1ef2 [Imran Rashid] get rid of activeTaskSets
a21c8b5 [Imran Rashid] Merge branch 'master' into SPARK-8103
906d626 [Imran Rashid] fix merge
109900e [Imran Rashid] Merge branch 'master' into SPARK-8103
c0d4d90 [Imran Rashid] Revert "Index active task sets by stage Id rather than by task set id"
f025154 [Imran Rashid] Merge pull request #2 from kayousterhout/imran_SPARK-8103
baf46e1 [Kay Ousterhout] Index active task sets by stage Id rather than by task set id
19685bb [Imran Rashid] switch to using latestInfo.attemptId, and add comments
a5f7c8c [Imran Rashid] remove comment for reviewers
227b40d [Imran Rashid] style
517b6e5 [Imran Rashid] get rid of SparkIllegalStateException
b2faef5 [Imran Rashid] faster check for conflicting task sets
6542b42 [Imran Rashid] remove extra stageAttemptId
ada7726 [Imran Rashid] reviewer feedback
d8eb202 [Imran Rashid] Merge branch 'master' into SPARK-8103
46bc26a [Imran Rashid] more cleanup of debug garbage
cb245da [Imran Rashid] finally found the issue ... clean up debug stuff
8c29707 [Imran Rashid] Merge branch 'master' into SPARK-8103
89a59b6 [Imran Rashid] more printlns ...
9601b47 [Imran Rashid] more debug printlns
ecb4e7d [Imran Rashid] debugging printlns
b6bc248 [Imran Rashid] style
55f4a94 [Imran Rashid] get rid of more random test case since kays tests are clearer
7021d28 [Imran Rashid] update test since listenerBus.waitUntilEmpty now throws an exception instead of returning a boolean
883fe49 [Kay Ousterhout] Unit tests for concurrent stages issue
6e14683 [Imran Rashid] unit test just to make sure we fail fast on concurrent attempts
06a0af6 [Imran Rashid] ignore for jenkins
c443def [Imran Rashid] better fix and simpler test case
28d70aa [Imran Rashid] wip on getting a better test case ...
a9bf31f [Imran Rashid] wip
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Jul 22, 2015
… attempts for a stage

https://issues.apache.org/jira/browse/SPARK-8103

cc kayousterhout (thanks for the extra test case)

Author: Imran Rashid <[email protected]>
Author: Kay Ousterhout <[email protected]>
Author: Imran Rashid <[email protected]>

Closes apache#6750 from squito/SPARK-8103 and squashes the following commits:

fb3acfc [Imran Rashid] fix log msg
e01b7aa [Imran Rashid] fix some comments, style
584acd4 [Imran Rashid] simplify going from taskId to taskSetMgr
e43ac25 [Imran Rashid] Merge branch 'master' into SPARK-8103
6bc23af [Imran Rashid] update log msg
4470fa1 [Imran Rashid] rename
c04707e [Imran Rashid] style
88b61cc [Imran Rashid] add tests to make sure that TaskSchedulerImpl schedules correctly with zombie attempts
d7f1ef2 [Imran Rashid] get rid of activeTaskSets
a21c8b5 [Imran Rashid] Merge branch 'master' into SPARK-8103
906d626 [Imran Rashid] fix merge
109900e [Imran Rashid] Merge branch 'master' into SPARK-8103
c0d4d90 [Imran Rashid] Revert "Index active task sets by stage Id rather than by task set id"
f025154 [Imran Rashid] Merge pull request #2 from kayousterhout/imran_SPARK-8103
baf46e1 [Kay Ousterhout] Index active task sets by stage Id rather than by task set id
19685bb [Imran Rashid] switch to using latestInfo.attemptId, and add comments
a5f7c8c [Imran Rashid] remove comment for reviewers
227b40d [Imran Rashid] style
517b6e5 [Imran Rashid] get rid of SparkIllegalStateException
b2faef5 [Imran Rashid] faster check for conflicting task sets
6542b42 [Imran Rashid] remove extra stageAttemptId
ada7726 [Imran Rashid] reviewer feedback
d8eb202 [Imran Rashid] Merge branch 'master' into SPARK-8103
46bc26a [Imran Rashid] more cleanup of debug garbage
cb245da [Imran Rashid] finally found the issue ... clean up debug stuff
8c29707 [Imran Rashid] Merge branch 'master' into SPARK-8103
89a59b6 [Imran Rashid] more printlns ...
9601b47 [Imran Rashid] more debug printlns
ecb4e7d [Imran Rashid] debugging printlns
b6bc248 [Imran Rashid] style
55f4a94 [Imran Rashid] get rid of more random test case since kays tests are clearer
7021d28 [Imran Rashid] update test since listenerBus.waitUntilEmpty now throws an exception instead of returning a boolean
883fe49 [Kay Ousterhout] Unit tests for concurrent stages issue
6e14683 [Imran Rashid] unit test just to make sure we fail fast on concurrent attempts
06a0af6 [Imran Rashid] ignore for jenkins
c443def [Imran Rashid] better fix and simpler test case
28d70aa [Imran Rashid] wip on getting a better test case ...
a9bf31f [Imran Rashid] wip
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants