Skip to content

Commit

Permalink
[KYUUBI apache#5331] Spark engine should throw an exception when it f…
Browse files Browse the repository at this point in the history
…ails to start

### _Why are the changes needed?_

Close apache#5331

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes apache#5332 from ASiegeLion/master.

Closes apache#5331

21342f5 [sychen] wrap InterruptedException
1f2542c [sychen] fix UT
e433b54 [liupeiyue] [KYUUBI apache#5331]Spark Engine should throw an exception to let K8s know when the engine fails to start

Lead-authored-by: liupeiyue <[email protected]>
Co-authored-by: sychen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
2 people authored and pan3793 committed Nov 13, 2023
1 parent c2e90a9 commit 7c5f583
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ object SparkSQLEngine extends Logging {
// blocking main thread
countDownLatch.await()
} catch {
case e: KyuubiException => currentEngine match {
case e: KyuubiException =>
currentEngine match {
case Some(engine) =>
engine.stop()
val event = EngineEvent(engine)
Expand All @@ -370,16 +371,21 @@ object SparkSQLEngine extends Logging {
error(event, e)
case _ => error("Current SparkSQLEngine is not created.")
}
throw e

}
} catch {
case i: InterruptedException if !sparkSessionCreated.get =>
error(
val msg =
s"The Engine main thread was interrupted, possibly due to `createSpark` timeout." +
s" The `${ENGINE_INIT_TIMEOUT.key}` is ($initTimeout ms) " +
s" and submitted at $submitTime.",
i)
case t: Throwable => error(s"Failed to instantiate SparkSession: ${t.getMessage}", t)
s" and submitted at $submitTime."
error(msg, i)
throw new InterruptedException(msg)
case e: KyuubiException => throw e
case t: Throwable =>
error(s"Failed to instantiate SparkSession: ${t.getMessage}", t)
throw t
} finally {
if (spark != null) {
spark.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ class SparkEngineSuites extends KyuubiFunSuite {
SparkSQLEngine.currentEngine = None
val logAppender = new LogAppender("test createSpark timeout")
withLogAppender(logAppender) {
SparkSQLEngine.main(Array.empty)
try {
SparkSQLEngine.main(Array.empty)
} catch {
case e: Exception => error("", e)
}
}
assert(SparkSQLEngine.currentEngine.isEmpty)
val errorMsg = s"The Engine main thread was interrupted, possibly due to `createSpark`" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
}
val elapsedTime = System.currentTimeMillis() - startTime
assert(elapsedTime < 60 * 1000)
assert(exception.getMessage contains "The engine application has been terminated.")
assert(exception.getMessage contains "Could not open client transport with JDBC Uri")
}
}
}

0 comments on commit 7c5f583

Please sign in to comment.