Skip to content

Commit

Permalink
in memory session
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Dec 1, 2024
1 parent 510a30b commit 95a8de6
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 23 deletions.
1 change: 0 additions & 1 deletion docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.batch.application.starvation.timeout | PT3M | Threshold above which to warn batch application may be starved. | duration | 1.7.0 |
| kyuubi.batch.conf.ignore.list || A comma-separated list of ignored keys for batch conf. If the batch conf contains any of them, the key and the corresponding value will be removed silently during batch job submission. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering. You can also pre-define some config for batch job submission with the prefix: kyuubi.batchConf.[batchType]. For example, you can pre-define `spark.master` for the Spark batch job with key `kyuubi.batchConf.spark.spark.master`. | set | 1.6.0 |
| kyuubi.batch.extra.resource.file.max.size | 0 | The maximum size in bytes of each uploaded extra resource file when creating batch. 0 or negative value means no limit. | long | 1.10.0 |
| kyuubi.batch.pending.check.window | PT24H | The time window to check the batch pending max elapse time from metadata store. | duration | 1.10.1 |
| kyuubi.batch.resource.file.max.size | 0 | The maximum size in bytes of the uploaded resource file when creating batch. 0 or negative value means no limit. | long | 1.10.0 |
| kyuubi.batch.search.window | <undefined> | The time window to search the batch from metadata store. | duration | 1.10.1 |
| kyuubi.batch.session.idle.timeout | PT6H | Batch session idle timeout, it will be closed when it's not accessed for this duration | duration | 1.6.2 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1930,14 +1930,6 @@ object KyuubiConf {
.stringConf
.createWithDefault("1")

val BATCH_PENDING_CHECK_WINDOW: ConfigEntry[Long] =
buildConf("kyuubi.batch.pending.check.window")
.doc("The time window to check the batch pending max elapse time from metadata store.")
.version("1.10.1")
.timeConf
.checkValue(_ > 0, "must be positive number")
.createWithDefault(Duration.ofDays(1).toMillis)

val BATCH_SEARCH_WINDOW: OptionalConfigEntry[Long] =
buildConf("kyuubi.batch.search.window")
.doc("The time window to search the batch from metadata store.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ class BatchJobSubmission(
Utils.deleteDirectoryRecursively(session.resourceUploadFolderPath.toFile)
}
}

def getPendingElapsedTime: Long = {
if (state == OperationState.PENDING) {
System.currentTimeMillis() - createTime
} else {
0L
}
}
}

object BatchJobSubmission {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ import org.eclipse.jetty.servlet.{ErrorPageErrorHandler, FilterHolder}
import org.apache.kyuubi.{KyuubiException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.MetricsConstants
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.api.v1.ApiRootResource
import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
import org.apache.kyuubi.server.metadata.api.MetadataFilter
import org.apache.kyuubi.server.ui.{JettyServer, JettyUtils}
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
import org.apache.kyuubi.service.authentication.{AuthTypes, AuthUtils}
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.util.{JavaUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

Expand Down Expand Up @@ -207,12 +205,11 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
}

private def getBatchPendingMaxElapse(): Long = {
val filter = MetadataFilter(
state = OperationState.PENDING.toString,
kyuubiInstance = connectionUrl,
createTime = System.currentTimeMillis() - conf.get(BATCH_PENDING_CHECK_WINDOW))
sessionManager.getBatchesFromMetadataStore(filter, 0, 1, desc = false, orderByKeyId = false)
.headOption.map { batch => System.currentTimeMillis() - batch.getCreateTime }.getOrElse(0L)
val batchPendingElapseTimes = sessionManager.allSessions().map {
case session: KyuubiBatchSession => session.batchJobSubmissionOp.getPendingElapsedTime
case _ => 0L
}
if (batchPendingElapseTimes.isEmpty) 0L else batchPendingElapseTimes.max
}

def waitForServerStarted(): Unit = {
Expand All @@ -234,10 +231,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
startBatchChecker()
recoverBatchSessions()
MetricsSystem.tracing { ms =>
ms.registerGauge(
MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE,
getBatchPendingMaxElapse,
0)
ms.registerGauge(OPERATION_BATCH_PENDING_MAX_ELAPSE, getBatchPendingMaxElapse, 0)
}
} catch {
case e: Exception => throw new KyuubiException(s"Cannot start $getName", e)
Expand Down

0 comments on commit 95a8de6

Please sign in to comment.