Skip to content

Commit

Permalink
[KYUUBI #5507] [FLINK] Support Initialize SQL in Flink Engine
Browse files Browse the repository at this point in the history
  • Loading branch information
hadoopkandy committed Nov 1, 2023
1 parent 591250c commit b37d62d
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.flink.table.gateway.service.session.{Session => FSession}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.udf.KDFRegistry
Expand Down Expand Up @@ -64,6 +65,24 @@ class FlinkSessionImpl(
override def open(): Unit = {
val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))

sessionManager.getConf.get(ENGINE_INITIALIZE_SQL).foreach { sql =>
try {
executor.executeStatement(OperationHandle.create, sql)
} catch {
case NonFatal(e) =>
throw KyuubiSQLException(s"execute ${ENGINE_INITIALIZE_SQL.key} $sql ", e)
}
}

sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql =>
try {
executor.executeStatement(OperationHandle.create, sql)
} catch {
case NonFatal(e) =>
throw KyuubiSQLException(s"execute ${ENGINE_SESSION_INITIALIZE_SQL.key} $sql ", e)
}
}

val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
Array(USE_CATALOG, USE_DATABASE).contains(k)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources

private var zkServer: EmbeddedZookeeper = _

protected val conf: KyuubiConf = FlinkSQLEngine.kyuubiConf
protected val conf: KyuubiConf = new KyuubiConf(false)

protected def engineRefId: String

Expand All @@ -60,7 +60,6 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
}
}
withKyuubiConf.foreach { case (k, v) =>
System.setProperty(k, v)
conf.set(k, v)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.operation

import java.util.UUID

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}

class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {

protected def jdbcUrl: String = getFlinkEngineServiceUrl

protected val ENGINE_INITIALIZE_SQL_VALUE: String =
s"""
create catalog cat_a with ('type'='generic_in_memory');
create table blackhole(i int) with ('connector'='blackhole')
"""
protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String =
s"""
create table datagen(i int) with (
'connector'='datagen',
'fields.i.kind'='sequence',
'fields.i.start'='1',
'fields.i.end'='10')
"""

override def withKyuubiConf: Map[String, String] = {
Map(
"flink.execution.target" -> "remote",
"flink.high-availability.cluster-id" -> "flink-mini-cluster",
"flink.app.name" -> "kyuubi_connection_flink_kandy",
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
ENGINE_SHARE_LEVEL.key -> shareLevel,
OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
ENGINE_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
ENGINE_SESSION_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE,
KYUUBI_SESSION_USER_KEY -> "kandy")
}

override protected def engineRefId: String = UUID.randomUUID().toString

def namespace: String = "/kyuubi/flink-local-engine-test"

def shareLevel: String = ShareLevel.USER.toString

def engineType: String = "flink"

test("execute statement - kyuubi engine initialize") {
withJdbcStatement() { statement =>
var resultSet = statement.executeQuery("show catalogs")
val expectedCatalogs = Set("default_catalog", "cat_a")
while (resultSet.next()) {
assert(expectedCatalogs.contains(resultSet.getString(1)))
}
assert(!resultSet.next())

resultSet = statement.executeQuery("show databases")
assert(resultSet.next())
assert(resultSet.getString(1) === "default_database")
assert(!resultSet.next())

val expectedTables = Set("blackhole", "datagen")
resultSet = statement.executeQuery("show tables")
while (resultSet.next()) {
assert(expectedTables.contains(resultSet.getString(1)))
}
assert(!resultSet.next())

var dropResult = statement.executeQuery("drop catalog cat_a")
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")

dropResult = statement.executeQuery("drop table blackhole")
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")

dropResult = statement.executeQuery("drop table datagen")
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")
}
}
}

0 comments on commit b37d62d

Please sign in to comment.