From 45136718122ac608008f2ce9b3c3670c054ae704 Mon Sep 17 00:00:00 2001 From: Bowen Liang Date: Wed, 18 Oct 2023 11:45:45 +0800 Subject: [PATCH] extract a scheduled executor service for timeout monitoring to prevent possible thread leaks --- .../apache/kyuubi/operation/AbstractOperation.scala | 10 ++++++---- .../apache/kyuubi/operation/ExecuteStatement.scala | 11 ++++++++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 0a185b94266..aeee3685819 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantLock import scala.collection.JavaConverters._ import org.apache.commons.lang3.StringUtils -import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, TStatusCode} +import org.apache.hive.service.rpc.thrift._ import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT @@ -57,10 +57,12 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin } } - protected def addTimeoutMonitor(queryTimeout: Long): Unit = { + protected def addTimeoutMonitor( + queryTimeout: Long, + scheduler: Option[ScheduledExecutorService] = None): Unit = { if (queryTimeout > 0) { - val timeoutExecutor = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false) + val timeoutExecutor = scheduler.getOrElse( + ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)) val action: Runnable = () => cleanup(OperationState.TIMEOUT) timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS) statementTimeoutCleaner = Some(timeoutExecutor) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 68270912776..04355a1dc5c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.operation +import java.util.concurrent.ScheduledExecutorService + import scala.collection.JavaConverters._ import com.codahale.metrics.MetricRegistry @@ -26,9 +28,11 @@ import org.apache.hive.service.rpc.thrift.TOperationState._ import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} +import org.apache.kyuubi.operation.ExecuteStatement.queryTimeoutMonitorPool import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session +import org.apache.kyuubi.util.ThreadUtils class ExecuteStatement( session: Session, @@ -84,7 +88,7 @@ class ExecuteStatement( var lastStateUpdateTime: Long = 0L val stateUpdateInterval = session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL) - addTimeoutMonitor(queryTimeout) + addTimeoutMonitor(queryTimeout, scheduler = Some(queryTimeoutMonitorPool)) while (!isComplete && !isTerminalState(state)) { fetchQueryLog() verifyTStatus(statusResp.getStatus) @@ -171,3 +175,8 @@ class ExecuteStatement( override protected def eventEnabled: Boolean = true } + +object ExecuteStatement { + private lazy val queryTimeoutMonitorPool: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("execute-statement-timeout-thread", false) +}