Skip to content

Commit

Permalink
use addTimeoutMonitor for server-side query timeout checks
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Oct 18, 2023
1 parent c5854f7 commit 72479a7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,14 @@ object KyuubiConf {
.checkValue(_ >= 1000, "must >= 1s if set")
.createOptional

val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.operation.query.timeout.monitor.enabled")
.doc("Whether to monitor timeout query timeout check on server side.")
.version("1.8.0")
.internal
.booleanConf
.createWithDefault(true)

val OPERATION_RESULT_MAX_ROWS: ConfigEntry[Int] =
buildConf("kyuubi.operation.result.max.rows")
.doc("Max rows of Spark query results. Rows exceeding the limit would be ignored. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState._

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.log.OperationLog
Expand Down Expand Up @@ -58,6 +59,10 @@ class ExecuteStatement(
OperationLog.removeCurrentOperationLog()
}

private val isTimeoutMonitorEnabled: Boolean = confOverlay.getOrElse[String](
OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.key,
OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.defaultValStr).toBoolean

private def executeStatement(): Unit = {
try {
// We need to avoid executing query in sync mode, because there is no heartbeat mechanism
Expand All @@ -84,7 +89,7 @@ class ExecuteStatement(
var lastStateUpdateTime: Long = 0L
val stateUpdateInterval =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL)
while (!isComplete) {
while (!isComplete && !isTerminalState(state)) {
fetchQueryLog()
verifyTStatus(statusResp.getStatus)
if (statusResp.getProgressUpdateResponse != null) {
Expand Down Expand Up @@ -159,7 +164,15 @@ class ExecuteStatement(
override protected def runInternal(): Unit = {
executeStatement()
val sessionManager = session.sessionManager
val asyncOperation: Runnable = () => waitStatementComplete()
val asyncOperation: Runnable = () =>
try {
if (isTimeoutMonitorEnabled) {
addTimeoutMonitor(queryTimeout)
}
waitStatementComplete()
} finally {
shutdownTimeoutMonitor()
}
try {
val opHandle = sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(opHandle)
Expand Down

0 comments on commit 72479a7

Please sign in to comment.