[SPARK-23033][SS] Don't use task level retry for continuous processing #20225

Closed · wants to merge 15 commits
use ExpectFailure
jose-torres committed Jan 11, 2018
commit b5d621b70416199311a69f4b3b9fbd6c03acf6df
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous

 import java.util.UUID

-import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
@@ -221,20 +221,15 @@ class ContinuousSuite extends ContinuousSuiteBase {
       testStream(df, useV2Sink = true)(
         StartStream(Trigger.Continuous(100)),
         Execute(waitForRateSourceTriggers(_, 2)),
-        Execute { query =>
+        Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
-          eventually(timeout(streamingTimeout)) {
-            assert(query.exception.isDefined)
-          }
-          assert(
-            query.exception.get.getCause != null &&
-              query.exception.get.getCause.getCause != null &&
-                query.exception.get.getCause.getCause.getCause
-                  .isInstanceOf[ContinuousTaskRetryException])
-        },
+        },
+        ExpectFailure[SparkException] { e =>
+          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
+        })
     } finally {
       spark.sparkContext.removeSparkListener(listener)
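For context, `ExpectFailure[T]` is an action in Spark's `StreamTest` harness that waits for the running query to fail and hands the failure to a user-supplied check, which is why the manual `eventually`/`query.exception` polling could be deleted above. The sketch below restates the cause-chain predicate as a standalone helper; the helper name `isContinuousRetryFailure` is hypothetical, and the exact shape of the cause chain is an assumption read off this diff.

import org.apache.spark.SparkException
// Import path assumed from where this PR defines the exception.
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException

// Hypothetical helper mirroring the predicate passed to ExpectFailure above.
// Assumed cause chain when a continuous-processing task is retried:
//   SparkException (the failed Spark job)
//     -> cause: the error recorded for the failed task
//       -> cause: ContinuousTaskRetryException
def isContinuousRetryFailure(e: Throwable): Boolean = e match {
  case se: SparkException =>
    // isInstanceOf is null-safe, so only the first level needs an explicit check.
    se.getCause != null &&
      se.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
  case _ => false
}

In the harness, `ExpectFailure[SparkException] { e => ... }` applies such a predicate once the query terminates, replacing the three nested `getCause` checks previously made against `query.exception` by hand.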