Skip to content

Commit

Permalink
[KYUUBI #5464] JDBC Engine supports MySQL
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

To close #5464.
To support JDBC engine use MySQL Dialect (kyuubi.engine.jdbc.type=mysql).

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5588 from Kwafoor/kyuubi_5464.

Closes #5464

1019a61 [wangjunbo] [KYUUBI #5464]rename function name `getProviderClass` to `getDriverClass`
9901bba [wangjunbo] [KYUUBI #5464]handle properly to keep compatiblity
b33d79e [wangjunbo] [KYUUBI #5464]handle properly to keep compatiblity
86e6ee2 [wangjunbo] [KYUUBI #5464]handle properly to keep compatiblity
d76cb32 [wangjunbo] [KYUUBI #5464]update the docs
4a1acff [wangjunbo] [KYUUBI #5464]update the docs
1aff55e [wangjunbo] [KYUUBI #5464]update the docs of kyuubi.engine.type
84202ea [wangjunbo] [KYUUBI #5464] update the docs of kyuubi.engine.type
e3c1e94 [wangjunbo] [KYUUBI #5464] fix check
cdf820d [wangjunbo] [KYUUBI #5464] fix check
ff0f30a [wangjunbo] [KYUUBI #5464] fix check
01321dc [wangjunbo] [KYUUBI #5464] JDBC Engine supports MySQL
756f530 [wangjunbo] [KYUUBI #5464] JDBC Engine supports MySQL

Authored-by: wangjunbo <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
wangjunbo authored and pan3793 committed Nov 24, 2023
1 parent e22a7f7 commit 5481bf5
Show file tree
Hide file tree
Showing 20 changed files with 905 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.jdbc.connection.password | &lt;undefined&gt; | The password is used for connecting to server | string | 1.6.0 |
| kyuubi.engine.jdbc.connection.propagateCredential | false | Whether to use the session's user and password to connect to database | boolean | 1.8.0 |
| kyuubi.engine.jdbc.connection.properties || The additional properties are used for connecting to server | seq | 1.6.0 |
| kyuubi.engine.jdbc.connection.provider | &lt;undefined&gt; | The connection provider is used for getting a connection from the server | string | 1.6.0 |
| kyuubi.engine.jdbc.connection.provider | &lt;undefined&gt; | A JDBC connection provider plugin for the Kyuubi Server to establish a connection to the JDBC URL. The configuration value should be a subclass of `org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider`. Kyuubi provides the following built-in implementations: <li>doris: For establishing Doris connections.</li> <li>mysql: For establishing MySQL connections.</li> <li>phoenix: For establishing Phoenix connections.</li> <li>postgresql: For establishing PostgreSQL connections.</li> | string | 1.6.0 |
| kyuubi.engine.jdbc.connection.url | &lt;undefined&gt; | The server url that engine will connect to | string | 1.6.0 |
| kyuubi.engine.jdbc.connection.user | &lt;undefined&gt; | The user is used for connecting to server | string | 1.6.0 |
| kyuubi.engine.jdbc.driver.class | &lt;undefined&gt; | The driver class for JDBC engine connection | string | 1.6.0 |
Expand Down
6 changes: 6 additions & 0 deletions externals/kyuubi-jdbc-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-mysql_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>${hive.jdbc.artifact}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
#

org.apache.kyuubi.engine.jdbc.doris.DorisConnectionProvider
org.apache.kyuubi.engine.jdbc.mysql.MySQLConnectionProvider
org.apache.kyuubi.engine.jdbc.phoenix.PhoenixConnectionProvider
org.apache.kyuubi.engine.jdbc.postgresql.PostgreSQLConnectionProvider
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
#

org.apache.kyuubi.engine.jdbc.dialect.DorisDialect
org.apache.kyuubi.engine.jdbc.dialect.MySQLDialect
org.apache.kyuubi.engine.jdbc.dialect.PhoenixDialect
org.apache.kyuubi.engine.jdbc.dialect.PostgreSQLDialect
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._
abstract class AbstractConnectionProvider extends Logging {
protected val providers = loadProviders()

def getProviderClass(kyuubiConf: KyuubiConf): String = {
def getDriverClass(kyuubiConf: KyuubiConf): String = {
val driverClass: Class[_ <: Driver] = Option(
DynClasses.builder().impl(kyuubiConf.get(ENGINE_JDBC_DRIVER_CLASS).get)
.orNull().build[Driver]()).getOrElse {
Expand All @@ -38,7 +38,7 @@ abstract class AbstractConnectionProvider extends Logging {
}

def create(kyuubiConf: KyuubiConf): Connection = {
val filteredProviders = providers.filter(_.canHandle(getProviderClass(kyuubiConf)))
val filteredProviders = providers.filter(_.canHandle(getDriverClass(kyuubiConf)))
if (filteredProviders.isEmpty) {
throw new IllegalArgumentException(
"Empty list of JDBC connection providers for the specified driver and options")
Expand All @@ -57,10 +57,9 @@ abstract class AbstractConnectionProvider extends Logging {
case None =>
// TODO
if (filteredProviders.size != 1) {
throw new IllegalArgumentException(
"JDBC connection initiated but more than one connection provider was found. Use " +
s"${ENGINE_JDBC_CONNECTION_PROVIDER.key} option to select a specific provider. " +
s"Found active providers ${filteredProviders.mkString("[", ", ", "]")}")
warn("JDBC connection initiated but more than one connection provider was found. Use " +
s"${ENGINE_JDBC_CONNECTION_PROVIDER.key} option to select a specific provider. " +
s"Found active providers ${filteredProviders.mkString("[", ", ", "]")}")
}
filteredProviders.head
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect
import java.sql.{Connection, ResultSet, Statement}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.engine.jdbc.mysql.{MySQLRowSetHelper, MySQLSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

class MySQLDialect extends JdbcDialect {
override def createStatement(connection: Connection, fetchSize: Int): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)
statement
}

override def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String = {
val tTypes =
if (tableTypes == null || tableTypes.isEmpty) {
Set("BASE TABLE", "SYSTEM VIEW", "VIEW")
} else {
tableTypes.asScala.toSet
}
val query = new StringBuilder(
s"""
|SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, ENGINE,
|TABLE_ROWS, AVG_ROW_LENGTH, DATA_LENGTH,
|CREATE_TIME, UPDATE_TIME, TABLE_COLLATION, TABLE_COMMENT
|FROM INFORMATION_SCHEMA.TABLES
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotBlank(catalog)) {
filters += s"$TABLE_CATALOG = '$catalog'"
}

if (StringUtils.isNotBlank(schema)) {
filters += s"$TABLE_SCHEMA LIKE '$schema'"
}

if (StringUtils.isNotBlank(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}

if (tTypes.nonEmpty) {
filters += s"(${
tTypes.map { tableType => s"$TABLE_TYPE = '$tableType'" }
.mkString(" OR ")
})"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): String = {
val query = new StringBuilder(
"""
|SELECT
|`TABLE_CATALOG`,`TABLE_SCHEMA`,`TABLE_NAME`, `COLUMN_NAME`,`ORDINAL_POSITION`,
|`COLUMN_DEFAULT`,`IS_NULLABLE`,`DATA_TYPE`,`CHARACTER_MAXIMUM_LENGTH`,
|`CHARACTER_OCTET_LENGTH`,`NUMERIC_PRECISION`,`NUMERIC_SCALE`,`DATETIME_PRECISION`,
|`CHARACTER_SET_NAME`,`COLLATION_NAME`,`COLUMN_TYPE`,`COLUMN_KEY`,`EXTRA`,`PRIVILEGES`,
|`COLUMN_COMMENT`,`GENERATION_EXPRESSION`
|FROM information_schema.columns
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotEmpty(catalogName)) {
filters += s"$TABLE_CATALOG = '$catalogName'"
}
if (StringUtils.isNotEmpty(schemaName)) {
filters += s"$TABLE_SCHEMA LIKE '$schemaName'"
}
if (StringUtils.isNotEmpty(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}
if (StringUtils.isNotEmpty(columnName)) {
filters += s"$COLUMN_NAME LIKE '$columnName'"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}

override def getRowSetHelper(): RowSetHelper = {
new MySQLRowSetHelper
}

override def getSchemaHelper(): SchemaHelper = {
new MySQLSchemaHelper
}

override def name(): String = {
"mysql"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.mysql

class MySQLConnectionProvider extends Mysql8ConnectionProvider {

override val name: String = classOf[MySQLConnectionProvider].getSimpleName
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.mysql

import java.sql.Types

import org.apache.hive.service.rpc.thrift.{TColumn, TColumnValue}

import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper

class MySQLRowSetHelper extends RowSetHelper {

override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)

override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)

override protected def toIntegerTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
case v: Integer => super.toIntegerTColumn(rows, ordinal)
case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
case _ => super.toDefaultTColumn(rows, ordinal, Types.INTEGER)
}
}

override protected def toIntegerTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
row(ordinal) match {
case v: Integer => super.toIntegerTColumnValue(row, ordinal)
case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.INTEGER)
}
}

override protected def toBigIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
case _ => super.toDefaultTColumn(rows, ordinal, Types.BIGINT)
}
}

override protected def toBigIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
row(ordinal) match {
case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.BIGINT)
}
}
Loading

0 comments on commit 5481bf5

Please sign in to comment.