From bdc28acf411e2cb48c1e5e7699f4aa2db286045a Mon Sep 17 00:00:00 2001 From: Bowen Liang Date: Wed, 18 Oct 2023 21:46:06 +0800 Subject: [PATCH] [KYUUBI #5392] Add query timeout monitor on server-side in ExecuteStatement ### _Why are the changes needed?_ As reported in #5392, currently the server is unable to guarantee that the statement timed-out when the engine may have no proper response for the server's request therefore the query timeout does not work. Introduce a server-side statement query timeout monitor, to ensure the time-out query statements are set to TIMEOUT state and help the JDBC client get out of the blocked status. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No. Closes #5398 from bowenliang123/stmt-timeout. Closes #5392 f5733b3f9 [Bowen Liang] use addTimeoutMonitor for server-side query timeout checks Authored-by: Bowen Liang Signed-off-by: liangbowen --- .../scala/org/apache/kyuubi/config/KyuubiConf.scala | 9 +++++++++ .../apache/kyuubi/operation/ExecuteStatement.scala | 13 ++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 50006b95ea1..d96f536a8c7 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1876,6 +1876,15 @@ object KyuubiConf { .checkValue(_ >= 1000, "must >= 1s if set") .createOptional + val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] = + buildConf("kyuubi.operation.query.timeout.monitor.enabled") + .doc("Whether to monitor timeout query timeout check on server side.") + .version("1.8.0") + .serverOnly + .internal + .booleanConf + .createWithDefault(true) + val OPERATION_RESULT_MAX_ROWS: ConfigEntry[Int] = buildConf("kyuubi.operation.result.max.rows") .doc("Max rows of Spark query results. Rows exceeding the limit would be ignored. " + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 4767cbf121b..86bd3f8c84c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -25,6 +25,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState._ import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT import org.apache.kyuubi.operation.log.OperationLog @@ -58,6 +59,10 @@ class ExecuteStatement( OperationLog.removeCurrentOperationLog() } + private val isTimeoutMonitorEnabled: Boolean = confOverlay.getOrElse[String]( + OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.key, + OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.defaultValStr).toBoolean + private def executeStatement(): Unit = { try { // We need to avoid executing query in sync mode, because there is no heartbeat mechanism @@ -84,7 +89,7 @@ class ExecuteStatement( var lastStateUpdateTime: Long = 0L val stateUpdateInterval = session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL) - while (!isComplete) { + while (!isComplete && !isTerminalState(state)) { fetchQueryLog() verifyTStatus(statusResp.getStatus) if (statusResp.getProgressUpdateResponse != null) { @@ -143,6 +148,9 @@ class ExecuteStatement( // see if anymore log could be fetched fetchQueryLog() } catch onError() + finally { + shutdownTimeoutMonitor() + } private def fetchQueryLog(): Unit = { getOperationLog.foreach { logger => @@ -157,6 +165,9 @@ class ExecuteStatement( } override protected def runInternal(): Unit = { + if (isTimeoutMonitorEnabled) { + addTimeoutMonitor(queryTimeout) + } executeStatement() val sessionManager = session.sessionManager val asyncOperation: Runnable = () => waitStatementComplete()