diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index b8d1f85692b..50096793651 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.gateway.service.session.{Session => FSession}
 import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
 
 import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.flink.FlinkEngineUtils
 import org.apache.kyuubi.engine.flink.udf.KDFRegistry
@@ -64,6 +65,24 @@ class FlinkSessionImpl(
 
   override def open(): Unit = {
     val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))
+    sessionManager.getConf.get(ENGINE_INITIALIZE_SQL).foreach { sql =>
+      try {
+        executor.executeStatement(OperationHandle.create, sql)
+      } catch {
+        case NonFatal(e) =>
+          throw KyuubiSQLException(s"Failed to execute ${ENGINE_INITIALIZE_SQL.key}: $sql", e)
+      }
+    }
+
+    sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql =>
+      try {
+        executor.executeStatement(OperationHandle.create, sql)
+      } catch {
+        case NonFatal(e) =>
+          throw KyuubiSQLException(s"Failed to execute ${ENGINE_SESSION_INITIALIZE_SQL.key}: $sql", e)
+      }
+    }
+
     val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
       Array(USE_CATALOG, USE_DATABASE).contains(k)
     }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
index 92c1bcd83fc..ccaefb496b0 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
@@ -45,7 +45,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
 
   private var zkServer: EmbeddedZookeeper = _
 
-  protected val conf: KyuubiConf = FlinkSQLEngine.kyuubiConf
+  protected val conf: KyuubiConf = new KyuubiConf(false)
 
   protected def engineRefId: String
 
@@ -60,7 +60,6 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
       }
     }
     withKyuubiConf.foreach { case (k, v) =>
-      System.setProperty(k, v)
       conf.set(k, v)
     }
 
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
new file mode 100644
index 00000000000..c5d23bd7902
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util.UUID
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
+import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}
+
+class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
+  with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {
+
+  protected def jdbcUrl: String = getFlinkEngineServiceUrl
+
+  protected val ENGINE_INITIALIZE_SQL_VALUE: String =
+    """
+    create catalog cat_a with ('type'='generic_in_memory');
+    create table blackhole(i int) with ('connector'='blackhole')
+    """
+  protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String =
+    """
+    create table datagen(i int) with (
+      'connector'='datagen',
+      'fields.i.kind'='sequence',
+      'fields.i.start'='1',
+      'fields.i.end'='10')
+    """
+
+  override def withKyuubiConf: Map[String, String] = {
+    Map(
+      "flink.execution.target" -> "remote",
+      "flink.high-availability.cluster-id" -> "flink-mini-cluster",
+      "flink.app.name" -> "kyuubi_connection_flink_kandy",
+      HA_NAMESPACE.key -> namespace,
+      HA_ENGINE_REF_ID.key -> engineRefId,
+      ENGINE_TYPE.key -> "FLINK_SQL",
+      ENGINE_SHARE_LEVEL.key -> shareLevel,
+      OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
+      ENGINE_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
+      ENGINE_SESSION_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE,
+      KYUUBI_SESSION_USER_KEY -> "kandy")
+  }
+
+  override protected def engineRefId: String = UUID.randomUUID().toString
+
+  def namespace: String = "/kyuubi/flink-local-engine-test"
+
+  def shareLevel: String = ShareLevel.USER.toString
+
+  def engineType: String = "flink"
+
+  test("execute statement - kyuubi engine initialize") {
+    withJdbcStatement() { statement =>
+      var resultSet = statement.executeQuery("show catalogs")
+      val expectedCatalogs = Set("default_catalog", "cat_a")
+      while (resultSet.next()) {
+        assert(expectedCatalogs.contains(resultSet.getString(1)))
+      }
+      assert(!resultSet.next())
+
+      resultSet = statement.executeQuery("show databases")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === "default_database")
+      assert(!resultSet.next())
+
+      val expectedTables = Set("blackhole", "datagen")
+      resultSet = statement.executeQuery("show tables")
+      while (resultSet.next()) {
+        assert(expectedTables.contains(resultSet.getString(1)))
+      }
+      assert(!resultSet.next())
+
+      var dropResult = statement.executeQuery("drop catalog cat_a")
+      assert(dropResult.next())
+      assert(dropResult.getString(1) === "OK")
+
+      dropResult = statement.executeQuery("drop table blackhole")
+      assert(dropResult.next())
+      assert(dropResult.getString(1) === "OK")
+
+      dropResult = statement.executeQuery("drop table datagen")
+      assert(dropResult.next())
+      assert(dropResult.getString(1) === "OK")
+    }
+  }
+}