Skip to content

Commit

Permalink
use ExpectFailure
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-torres committed Jan 11, 2018
1 parent b40c8f0 commit b5d621b
Showing 1 changed file with 5 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous

import java.util.UUID

import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
Expand Down Expand Up @@ -221,20 +221,15 @@ class ContinuousSuite extends ContinuousSuiteBase {
testStream(df, useV2Sink = true)(
StartStream(Trigger.Continuous(100)),
Execute(waitForRateSourceTriggers(_, 2)),
Execute { query =>
Execute { _ =>
// Wait until a task is started, then kill its first attempt.
eventually(timeout(streamingTimeout)) {
assert(taskId != -1)
}
spark.sparkContext.killTaskAttempt(taskId)
eventually(timeout(streamingTimeout)) {
assert(query.exception.isDefined)
}
assert(
query.exception.get.getCause != null &&
query.exception.get.getCause.getCause != null &&
query.exception.get.getCause.getCause.getCause
.isInstanceOf[ContinuousTaskRetryException])
},
ExpectFailure[SparkException] { e =>
e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
})
} finally {
spark.sparkContext.removeSparkListener(listener)
Expand Down

0 comments on commit b5d621b

Please sign in to comment.