Skip to content

Commit

Permalink
[KYUUBI #5507][FLINK] Support Initialize SQL in Flink Engine
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_
We shoud support initialize SQL to init session context the  in Flink SQL  (e.g. setting up catalogs).

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No.

Closes #5518 from hadoopkandy/KYUUBI-5507.

Closes #5507

b1720ec [Cheng Pan] Update externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
0782738 [Paul Lin] [KYUUBI-5507] Improve codestyle
13035f3 [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
2ce8031 [Paul Lin] [KYUUBI-5507] Improve tests
a29ac38 [Paul Lin] [KYUUBI-5507] Run engine initial SQL at Engine start
b864af5 [wangkang] Merge branch 'apache:master' into KYUUBI-5507
b37d62d [kandy01.wang] [KYUUBI #5507] [FLINK] Support Initialize SQL in Flink Engine

Lead-authored-by: Kang Wang <[email protected]>
Co-authored-by: Paul Lin <[email protected]>
Co-authored-by: kandy01.wang <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: wangkang <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
4 people committed Nov 24, 2023
1 parent 10b13f5 commit 902815d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_INITIALIZE_SQL
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.service.Serverable
Expand Down Expand Up @@ -100,9 +101,7 @@ object FlinkSQLEngine extends Logging {
startEngine(engineContext)
info("Flink engine started")

if ("yarn-application".equalsIgnoreCase(executionTarget)) {
bootstrapFlinkApplicationExecutor()
}
bootstrap(executionTarget)

// blocking main thread
countDownLatch.await()
Expand All @@ -127,15 +126,22 @@ object FlinkSQLEngine extends Logging {
}
}

private def bootstrapFlinkApplicationExecutor() = {
// trigger an execution to initiate EmbeddedExecutor with the default flink conf
private def bootstrap(executionTarget: String) = {
val flinkConf = new Configuration()
flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql")
debug(s"Running bootstrap Flink SQL in application mode with flink conf: $flinkConf.")
val tableEnv = TableEnvironment.create(flinkConf)
val res = tableEnv.executeSql("select 'kyuubi'")
res.await()
info("Bootstrap Flink SQL finished.")

if ("yarn-application".equalsIgnoreCase(executionTarget)) {
// trigger an execution to initiate EmbeddedExecutor with the default flink conf
flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql")
debug(s"Running bootstrap Flink SQL in application mode with flink conf: $flinkConf.")
tableEnv.executeSql("select 'kyuubi'").await()
}

kyuubiConf.get(ENGINE_INITIALIZE_SQL).foreach { stmt =>
tableEnv.executeSql(stmt).await()
}

info("Bootstrap SQL finished.")
}

private def setDeploymentConf(executionTarget: String, flinkConf: Configuration): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.flink.table.gateway.service.session.{Session => FSession}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.udf.KDFRegistry
Expand Down Expand Up @@ -64,6 +65,15 @@ class FlinkSessionImpl(
override def open(): Unit = {
val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))

sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql =>
try {
executor.executeStatement(OperationHandle.create, sql)
} catch {
case NonFatal(e) =>
throw KyuubiSQLException(s"execute ${ENGINE_SESSION_INITIALIZE_SQL.key} $sql ", e)
}
}

val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
Array(USE_CATALOG, USE_DATABASE).contains(k)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources

private var zkServer: EmbeddedZookeeper = _

protected val conf: KyuubiConf = FlinkSQLEngine.kyuubiConf
protected val conf: KyuubiConf = new KyuubiConf(false)

protected def engineRefId: String

Expand All @@ -60,7 +60,6 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
}
}
withKyuubiConf.foreach { case (k, v) =>
System.setProperty(k, v)
conf.set(k, v)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.operation

import java.util.UUID

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}

class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {

protected def jdbcUrl: String = getFlinkEngineServiceUrl

protected val ENGINE_INITIALIZE_SQL_VALUE: String =
"show databases;"

protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String =
"""create catalog cat_b with ('type'='generic_in_memory');
|create table blackhole(i int) with ('connector'='blackhole');
|create table datagen(i int) with (
|'connector'='datagen',
|'fields.i.kind'='sequence',
|'fields.i.start'='1',
|'fields.i.end'='10')""".stripMargin

override def withKyuubiConf: Map[String, String] = {
Map(
"flink.execution.target" -> "remote",
"flink.high-availability.cluster-id" -> "flink-mini-cluster",
"flink.app.name" -> "kyuubi_connection_flink_kandy",
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
ENGINE_SHARE_LEVEL.key -> shareLevel,
OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
ENGINE_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
ENGINE_SESSION_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE,
KYUUBI_SESSION_USER_KEY -> "kandy")
}

override protected def engineRefId: String = UUID.randomUUID().toString

def namespace: String = "/kyuubi/flink-local-engine-test"

def shareLevel: String = ShareLevel.USER.toString

def engineType: String = "flink"

test("execute statement - kyuubi engine initialize") {
withJdbcStatement() { statement =>
var resultSet = statement.executeQuery("show catalogs")
val expectedCatalogs = Set("default_catalog", "cat_b")
var actualCatalogs = Set[String]()
while (resultSet.next()) {
actualCatalogs += resultSet.getString(1)
}
assert(expectedCatalogs.subsetOf(actualCatalogs))

resultSet = statement.executeQuery("show databases")
assert(resultSet.next())
assert(resultSet.getString(1) === "default_database")
assert(!resultSet.next())

val expectedTables = Set("blackhole", "datagen")
resultSet = statement.executeQuery("show tables")
while (resultSet.next()) {
assert(expectedTables.contains(resultSet.getString(1)))
}
assert(!resultSet.next())

var dropResult = statement.executeQuery("drop catalog cat_b")
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")

dropResult = statement.executeQuery("drop table blackhole")
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")

dropResult = statement.executeQuery("drop table datagen")
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")
}
}
}

0 comments on commit 902815d

Please sign in to comment.