diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 50006b95ea1..d96f536a8c7 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1876,6 +1876,15 @@ object KyuubiConf { .checkValue(_ >= 1000, "must >= 1s if set") .createOptional + val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] = + buildConf("kyuubi.operation.query.timeout.monitor.enabled") + .doc("Whether to monitor timeout query timeout check on server side.") + .version("1.8.0") + .serverOnly + .internal + .booleanConf + .createWithDefault(true) + val OPERATION_RESULT_MAX_ROWS: ConfigEntry[Int] = buildConf("kyuubi.operation.result.max.rows") .doc("Max rows of Spark query results. Rows exceeding the limit would be ignored. " + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 4767cbf121b..86bd3f8c84c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -25,6 +25,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState._ import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT import org.apache.kyuubi.operation.log.OperationLog @@ -58,6 +59,10 @@ class ExecuteStatement( OperationLog.removeCurrentOperationLog() } + private val isTimeoutMonitorEnabled: Boolean = confOverlay.getOrElse[String]( + OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.key, + OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.defaultValStr).toBoolean + private def executeStatement(): Unit = { try { // We need to avoid executing query in sync mode, because there is no heartbeat mechanism @@ -84,7 +89,7 @@ class ExecuteStatement( var lastStateUpdateTime: Long = 0L val stateUpdateInterval = session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL) - while (!isComplete) { + while (!isComplete && !isTerminalState(state)) { fetchQueryLog() verifyTStatus(statusResp.getStatus) if (statusResp.getProgressUpdateResponse != null) { @@ -143,6 +148,9 @@ class ExecuteStatement( // see if anymore log could be fetched fetchQueryLog() } catch onError() + finally { + shutdownTimeoutMonitor() + } private def fetchQueryLog(): Unit = { getOperationLog.foreach { logger => @@ -157,6 +165,9 @@ class ExecuteStatement( } override protected def runInternal(): Unit = { + if (isTimeoutMonitorEnabled) { + addTimeoutMonitor(queryTimeout) + } executeStatement() val sessionManager = session.sessionManager val asyncOperation: Runnable = () => waitStatementComplete()