diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index af88d9591b56a..4b4ed82dc6520 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous
 
 import java.util.UUID
 
-import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
@@ -221,20 +221,15 @@ class ContinuousSuite extends ContinuousSuiteBase {
     testStream(df, useV2Sink = true)(
       StartStream(Trigger.Continuous(100)),
       Execute(waitForRateSourceTriggers(_, 2)),
-      Execute { query =>
+      Execute { _ =>
         // Wait until a task is started, then kill its first attempt.
         eventually(timeout(streamingTimeout)) {
           assert(taskId != -1)
         }
         spark.sparkContext.killTaskAttempt(taskId)
-        eventually(timeout(streamingTimeout)) {
-          assert(query.exception.isDefined)
-        }
-        assert(
-          query.exception.get.getCause != null &&
-            query.exception.get.getCause.getCause != null &&
-            query.exception.get.getCause.getCause.getCause
-              .isInstanceOf[ContinuousTaskRetryException])
+      },
+      ExpectFailure[SparkException] { e =>
+        e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
       })
     } finally {
       spark.sparkContext.removeSparkListener(listener)
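
For context, this change replaces an open-coded poll-and-assert on `query.exception` with StreamTest's `ExpectFailure` action, which waits for the query to fail and then hands the failure to the supplied check. Both the old and the new check boil down to walking the exception's cause chain looking for a `ContinuousTaskRetryException`. Below is a minimal, self-contained sketch of that cause-chain walk; `CauseChain` and `findCause` are hypothetical names for illustration, not part of Spark's test API:

import scala.annotation.tailrec
import scala.reflect.ClassTag

object CauseChain {
  // Hypothetical helper (not Spark API): follow e.getCause links until
  // we either hit an instance of T or run off the end of the chain.
  @tailrec
  def findCause[T <: Throwable : ClassTag](e: Throwable): Option[T] =
    e match {
      case null => None
      case t: T => Some(t)
      case other => findCause[T](other.getCause)
    }
}

// Usage: the fixed-depth checks in the diff (e.getCause.getCause...) could be
// expressed depth-independently as:
//   CauseChain.findCause[ContinuousTaskRetryException](e).isDefined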