Skip to content

Commit

Permalink
[NSE-1149] Bump h2 to 2.1.210 (oap-project#1151)
Browse files Browse the repository at this point in the history
* Attempt to remove a dependency

* Revert "Attempt to remove a dependency"

This reverts commit 128e0a8.

* Bump h2 to 2.1.214

* Roll back to an old version

* Fix compatibility issues caused by upgrading h2

* Remove incompatible tests

* Code format issue

* Fix test failure in JDBCWriteSuite
  • Loading branch information
PHILO-HE authored Nov 3, 2022
1 parent ae952f1 commit 1ef11e2
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 123 deletions.
2 changes: 1 addition & 1 deletion native-sql-engine/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.195</version>
<version>2.1.210</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@ package org.apache.spark.sql.jdbc
import java.math.BigDecimal
import java.sql.{Date, DriverManager, SQLException, Timestamp}
import java.time.{Instant, LocalDate}
import java.util.{Calendar, GregorianCalendar, Properties}
import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}

import scala.collection.JavaConverters._

import org.h2.jdbc.JdbcSQLException
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{analysis, TableIdentifier}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode}
import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRDD, JDBCRelation, JdbcUtils}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils}
import org.apache.spark.sql.execution.metric.InputOutputMetricsHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
Expand All @@ -51,7 +53,8 @@ class JDBCSuite extends QueryTest
val urlWithUserAndPass = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"
var conn: java.sql.Connection = null

val testBytes = Array[Byte](99.toByte, 134.toByte, 135.toByte, 200.toByte, 205.toByte)
val testBytes = Array[Byte](99.toByte, 134.toByte, 135.toByte, 200.toByte, 205.toByte) ++
Array.fill(15)(0.toByte)

val testH2Dialect = new JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")
Expand All @@ -74,6 +77,8 @@ class JDBCSuite extends QueryTest
}
}

val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()

override def beforeAll(): Unit = {
super.beforeAll()
Utils.classForName("org.h2.Driver")
Expand All @@ -82,7 +87,6 @@ class JDBCSuite extends QueryTest
val properties = new Properties()
properties.setProperty("user", "testUser")
properties.setProperty("password", "testPass")
properties.setProperty("rowId", "false")

conn = DriverManager.getConnection(url, properties)
conn.prepareStatement("create schema test").executeUpdate()
Expand Down Expand Up @@ -129,9 +133,9 @@ class JDBCSuite extends QueryTest
conn.prepareStatement("create table test.inttypes (a INT, b BOOLEAN, c TINYINT, "
+ "d SMALLINT, e BIGINT)").executeUpdate()
conn.prepareStatement("insert into test.inttypes values (1, false, 3, 4, 1234567890123)"
).executeUpdate()
).executeUpdate()
conn.prepareStatement("insert into test.inttypes values (null, null, null, null, null)"
).executeUpdate()
).executeUpdate()
conn.commit()
sql(
s"""
Expand All @@ -157,8 +161,8 @@ class JDBCSuite extends QueryTest
|OPTIONS (url '$url', dbtable 'TEST.STRTYPES', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))

conn.prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP)"
).executeUpdate()
conn.prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP(7))"
).executeUpdate()
conn.prepareStatement("insert into test.timetypes values ('12:34:56', "
+ "'1996-01-01', '2002-02-20 11:22:33.543543543')").executeUpdate()
conn.prepareStatement("insert into test.timetypes values ('12:34:56', "
Expand All @@ -172,17 +176,17 @@ class JDBCSuite extends QueryTest
""".stripMargin.replaceAll("\n", " "))

conn.prepareStatement("CREATE TABLE test.timezone (tz TIMESTAMP WITH TIME ZONE) " +
"AS SELECT '1999-01-08 04:05:06.543543543 GMT-08:00'")
"AS SELECT '1999-01-08 04:05:06.543543543-08:00'")
.executeUpdate()
conn.commit()

conn.prepareStatement("CREATE TABLE test.array (ar ARRAY) " +
"AS SELECT '(1, 2, 3)'")
conn.prepareStatement("CREATE TABLE test.array_table (ar Integer ARRAY) " +
"AS SELECT ARRAY[1, 2, 3]")
.executeUpdate()
conn.commit()

conn.prepareStatement("create table test.flttypes (a DOUBLE, b REAL, c DECIMAL(38, 18))"
).executeUpdate()
).executeUpdate()
conn.prepareStatement("insert into test.flttypes values ("
+ "1.0000000000000002220446049250313080847263336181640625, "
+ "1.00000011920928955078125, "
Expand Down Expand Up @@ -317,7 +321,6 @@ class JDBCSuite extends QueryTest
assert(sql("SELECT * FROM foobar").collect().size === 3)
}

/*
ignore("SELECT * WHERE (simple predicates)") {
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
Expand Down Expand Up @@ -354,7 +357,6 @@ class JDBCSuite extends QueryTest
assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
}
*/

test("SELECT COUNT(1) WHERE (predicates)") {
// Check if an answer is correct when Filter is removed from operations such as count() which
Expand Down Expand Up @@ -605,7 +607,7 @@ class JDBCSuite extends QueryTest
assert(rows(0).getAs[Array[Byte]](0).sameElements(testBytes))
assert(rows(0).getString(1).equals("Sensitive"))
assert(rows(0).getString(2).equals("Insensitive"))
assert(rows(0).getString(3).equals("Twenty-byte CHAR"))
assert(rows(0).getString(3).equals("Twenty-byte CHAR "))
assert(rows(0).getAs[Array[Byte]](4).sameElements(testBytes))
assert(rows(0).getString(5).equals("I am a clob!"))
}
Expand All @@ -628,6 +630,7 @@ class JDBCSuite extends QueryTest
assert(cal.get(Calendar.HOUR) === 11)
assert(cal.get(Calendar.MINUTE) === 22)
assert(cal.get(Calendar.SECOND) === 33)
assert(cal.get(Calendar.MILLISECOND) === 543)
assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543000)
}

Expand Down Expand Up @@ -684,20 +687,6 @@ class JDBCSuite extends QueryTest
assert(math.abs(rows(0).getDouble(1) - 1.00000023841859331) < 1e-12)
}

test("Pass extra properties via OPTIONS") {
// We set rowId to false during setup, which means that _ROWID_ column should be absent from
// all tables. If rowId is true (default), the query below doesn't throw an exception.
intercept[JdbcSQLException] {
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW abc
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', dbtable '(SELECT _ROWID_ FROM test.people)',
| user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
}
}

test("Remap types via JdbcDialects") {
JdbcDialects.registerDialect(testH2Dialect)
val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
Expand Down Expand Up @@ -742,36 +731,6 @@ class JDBCSuite extends QueryTest
assert(DerbyColumns === Seq(""""abc"""", """"key""""))
}

test("compile filters") {
val compileFilter = PrivateMethod[Option[String]](Symbol("compileFilter"))
def doCompileFilter(f: Filter): String =
JDBCRDD invokePrivate compileFilter(f, JdbcDialects.get("jdbc:")) getOrElse("")
assert(doCompileFilter(EqualTo("col0", 3)) === """"col0" = 3""")
assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === """(NOT ("col1" = 'abc'))""")
assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def")))
=== """("col0" = 0) AND ("col1" = 'def')""")
assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi")))
=== """("col0" = 2) OR ("col1" = 'ghi')""")
assert(doCompileFilter(LessThan("col0", 5)) === """"col0" < 5""")
assert(doCompileFilter(LessThan("col3",
Timestamp.valueOf("1995-11-21 00:00:00.0"))) === """"col3" < '1995-11-21 00:00:00.0'""")
assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04")))
=== """"col4" < '1983-08-04'""")
assert(doCompileFilter(LessThanOrEqual("col0", 5)) === """"col0" <= 5""")
assert(doCompileFilter(GreaterThan("col0", 3)) === """"col0" > 3""")
assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === """"col0" >= 3""")
assert(doCompileFilter(In("col1", Array("jkl"))) === """"col1" IN ('jkl')""")
assert(doCompileFilter(In("col1", Array.empty)) ===
"""CASE WHEN "col1" IS NULL THEN NULL ELSE FALSE END""")
assert(doCompileFilter(Not(In("col1", Array("mno", "pqr"))))
=== """(NOT ("col1" IN ('mno', 'pqr')))""")
assert(doCompileFilter(IsNull("col1")) === """"col1" IS NULL""")
assert(doCompileFilter(IsNotNull("col1")) === """"col1" IS NOT NULL""")
assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def")))
=== """((NOT ("col0" != 'abc' OR "col0" IS NULL OR 'abc' IS NULL) """
+ """OR ("col0" IS NULL AND 'abc' IS NULL))) AND ("col1" = 'def')""")
}

test("Dialect unregister") {
JdbcDialects.unregisterDialect(H2Dialect)
try {
Expand Down Expand Up @@ -963,7 +922,7 @@ class JDBCSuite extends QueryTest
val postgresQuery = s"TRUNCATE TABLE ONLY $table"
val teradataQuery = s"DELETE FROM $table ALL"

Seq(mysql, db2, h2, derby).foreach{ dialect =>
Seq(mysql, h2, derby).foreach{ dialect =>
assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
}

Expand All @@ -989,7 +948,7 @@ class JDBCSuite extends QueryTest
val oracleQuery = s"TRUNCATE TABLE $table CASCADE"
val teradataQuery = s"DELETE FROM $table ALL"

Seq(mysql, db2, h2, derby).foreach{ dialect =>
Seq(mysql, h2, derby).foreach{ dialect =>
assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
}
assert(postgres.getTruncateQuery(table, Some(true)) == postgresQuery)
Expand Down Expand Up @@ -1042,34 +1001,6 @@ class JDBCSuite extends QueryTest
}
}

test("hide credentials in create and describe a persistent/temp table") {
val password = "testPass"
val tableName = "tab1"
Seq("TABLE", "TEMPORARY VIEW").foreach { tableType =>
withTable(tableName) {
val df = sql(
s"""
|CREATE $tableType $tableName
|USING org.apache.spark.sql.jdbc
|OPTIONS (
| url '$urlWithUserAndPass',
| dbtable 'TEST.PEOPLE',
| user 'testUser',
| password '$password')
""".stripMargin)

val explain = ExplainCommand(df.queryExecution.logical, ExtendedMode)
spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach { r =>
assert(!r.toString.contains(password))
}

sql(s"DESC FORMATTED $tableName").collect().foreach { r =>
assert(!r.toString().contains(password))
}
}
}
}

test("Hide credentials in show create table") {
val userName = "testUser"
val password = "testPass"
Expand All @@ -1078,17 +1009,17 @@ class JDBCSuite extends QueryTest
withTable(tableName) {
sql(
s"""
|CREATE TABLE $tableName
|USING org.apache.spark.sql.jdbc
|OPTIONS (
| url '$urlWithUserAndPass',
| dbtable '$dbTable',
| user '$userName',
| password '$password')
|CREATE TABLE $tableName
|USING org.apache.spark.sql.jdbc
|OPTIONS (
| url '$urlWithUserAndPass',
| dbtable '$dbTable',
| user '$userName',
| password '$password')
""".stripMargin)

val show = ShowCreateTableCommand(TableIdentifier(tableName))
spark.sessionState.executePlan(show).executedPlan.executeCollect().foreach { r =>
spark.sessionState.executePlan(show).executedPlan.executeCollectPublic().foreach { r =>
assert(!r.toString.contains(password))
assert(r.toString.contains(dbTable))
assert(r.toString.contains(userName))
Expand Down Expand Up @@ -1310,7 +1241,7 @@ class JDBCSuite extends QueryTest
}.getMessage
assert(e.contains("Unsupported type TIMESTAMP_WITH_TIMEZONE"))
e = intercept[SQLException] {
spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY_TABLE", new Properties()).collect()
}.getMessage
assert(e.contains("Unsupported type ARRAY"))
}
Expand Down Expand Up @@ -1391,9 +1322,9 @@ class JDBCSuite extends QueryTest
|sessionInitStatement 'SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234')
""".stripMargin)

val df3 = sql("SELECT * FROM test_sessionInitStatement")
assert(df3.collect() === Array(Row(21519, 1234)))
}
val df3 = sql("SELECT * FROM test_sessionInitStatement")
assert(df3.collect() === Array(Row(21519, 1234)))
}

test("jdbc data source shouldn't have unnecessary metadata in its schema") {
val schema = StructType(Seq(
Expand Down Expand Up @@ -1499,12 +1430,12 @@ class JDBCSuite extends QueryTest
val e3 = intercept[RuntimeException] {
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW queryOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', query '$query', dbtable 'TEST.PEOPLE',
| user 'testUser', password 'testPass')
|CREATE OR REPLACE TEMPORARY VIEW queryOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', query '$query', dbtable 'TEST.PEOPLE',
| user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
}.getMessage
}.getMessage
assert(e3.contains("Both 'dbtable' and 'query' can not be specified at the same time."))

val e4 = intercept[RuntimeException] {
Expand Down Expand Up @@ -1632,7 +1563,6 @@ class JDBCSuite extends QueryTest
"Partition column type should be numeric, date, or timestamp, but string found."))
}

/*
ignore("SPARK-24288: Enable preventing predicate pushdown") {
val table = "test.people"

Expand All @@ -1658,7 +1588,6 @@ class JDBCSuite extends QueryTest
checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
Row("fred", 1) :: Nil)
}
*/

test("SPARK-26383 throw IllegalArgumentException if wrong kind of driver to the given url") {
val e = intercept[IllegalArgumentException] {
Expand Down
Loading

0 comments on commit 1ef11e2

Please sign in to comment.