Skip to content

Commit

Permalink
extract a scheduled executor service for timeout monitoring to preven…
Browse files Browse the repository at this point in the history
…t possible thread leaks
  • Loading branch information
bowenliang123 committed Oct 18, 2023
1 parent 5913698 commit 4513671
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._

import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, TStatusCode}
import org.apache.hive.service.rpc.thrift._

import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
Expand Down Expand Up @@ -57,10 +57,12 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
}
}

protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
protected def addTimeoutMonitor(
queryTimeout: Long,
scheduler: Option[ScheduledExecutorService] = None): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
val timeoutExecutor = scheduler.getOrElse(
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false))
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kyuubi.operation

import java.util.concurrent.ScheduledExecutorService

import scala.collection.JavaConverters._

import com.codahale.metrics.MetricRegistry
Expand All @@ -26,9 +28,11 @@ import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.ExecuteStatement.queryTimeoutMonitorPool
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils

class ExecuteStatement(
session: Session,
Expand Down Expand Up @@ -84,7 +88,7 @@ class ExecuteStatement(
var lastStateUpdateTime: Long = 0L
val stateUpdateInterval =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL)
addTimeoutMonitor(queryTimeout)
addTimeoutMonitor(queryTimeout, scheduler = Some(queryTimeoutMonitorPool))
while (!isComplete && !isTerminalState(state)) {
fetchQueryLog()
verifyTStatus(statusResp.getStatus)
Expand Down Expand Up @@ -171,3 +175,8 @@ class ExecuteStatement(

override protected def eventEnabled: Boolean = true
}

object ExecuteStatement {
private lazy val queryTimeoutMonitorPool: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("execute-statement-timeout-thread", false)
}

0 comments on commit 4513671

Please sign in to comment.