Skip to content

Commit

Permalink
[KYUUBI #5392] Add query timeout monitor on server-side in ExecuteSta…
Browse files Browse the repository at this point in the history
…tement

### _Why are the changes needed?_

As reported in #5392, currently the server is unable to guarantee that the statement timed-out when the engine may have no proper response for the server's request therefore the query timeout does not work.

Introduce a server-side statement query timeout monitor, to ensure the time-out query statements are set to TIMEOUT state and help the JDBC client get out of the blocked status.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5398 from bowenliang123/stmt-timeout.

Closes #5392

f5733b3 [Bowen Liang] use addTimeoutMonitor for server-side query timeout checks

Authored-by: Bowen Liang <[email protected]>
Signed-off-by: liangbowen <[email protected]>
  • Loading branch information
bowenliang123 committed Oct 18, 2023
1 parent 6126379 commit bdc28ac
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,15 @@ object KyuubiConf {
.checkValue(_ >= 1000, "must >= 1s if set")
.createOptional

val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.operation.query.timeout.monitor.enabled")
.doc("Whether to monitor timeout query timeout check on server side.")
.version("1.8.0")
.serverOnly
.internal
.booleanConf
.createWithDefault(true)

val OPERATION_RESULT_MAX_ROWS: ConfigEntry[Int] =
buildConf("kyuubi.operation.result.max.rows")
.doc("Max rows of Spark query results. Rows exceeding the limit would be ignored. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState._

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.log.OperationLog
Expand Down Expand Up @@ -58,6 +59,10 @@ class ExecuteStatement(
OperationLog.removeCurrentOperationLog()
}

private val isTimeoutMonitorEnabled: Boolean = confOverlay.getOrElse[String](
OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.key,
OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.defaultValStr).toBoolean

private def executeStatement(): Unit = {
try {
// We need to avoid executing query in sync mode, because there is no heartbeat mechanism
Expand All @@ -84,7 +89,7 @@ class ExecuteStatement(
var lastStateUpdateTime: Long = 0L
val stateUpdateInterval =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL)
while (!isComplete) {
while (!isComplete && !isTerminalState(state)) {
fetchQueryLog()
verifyTStatus(statusResp.getStatus)
if (statusResp.getProgressUpdateResponse != null) {
Expand Down Expand Up @@ -143,6 +148,9 @@ class ExecuteStatement(
// see if anymore log could be fetched
fetchQueryLog()
} catch onError()
finally {
shutdownTimeoutMonitor()
}

private def fetchQueryLog(): Unit = {
getOperationLog.foreach { logger =>
Expand All @@ -157,6 +165,9 @@ class ExecuteStatement(
}

override protected def runInternal(): Unit = {
if (isTimeoutMonitorEnabled) {
addTimeoutMonitor(queryTimeout)
}
executeStatement()
val sessionManager = session.sessionManager
val asyncOperation: Runnable = () => waitStatementComplete()
Expand Down

0 comments on commit bdc28ac

Please sign in to comment.