diff --git a/native-sql-engine/core/pom.xml b/native-sql-engine/core/pom.xml
index 182310337..ebab7d397 100644
--- a/native-sql-engine/core/pom.xml
+++ b/native-sql-engine/core/pom.xml
@@ -255,7 +255,7 @@
     <dependency>
       <groupId>com.h2database</groupId>
       <artifactId>h2</artifactId>
-      <version>1.4.195</version>
+      <version>2.1.210</version>
       <scope>test</scope>
     </dependency>
diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index ae43cbe30..e52c64002 100644
--- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -20,22 +20,24 @@ package org.apache.spark.sql.jdbc
 import java.math.BigDecimal
 import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.time.{Instant, LocalDate}
-import java.util.{Calendar, GregorianCalendar, Properties}
+import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}
 
 import scala.collection.JavaConverters._
 
-import org.h2.jdbc.JdbcSQLException
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{analysis, TableIdentifier}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
 import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode}
 import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRDD, JDBCRelation, JdbcUtils}
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils}
 import org.apache.spark.sql.execution.metric.InputOutputMetricsHelper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -51,7 +53,8 @@ class JDBCSuite extends QueryTest
   val urlWithUserAndPass = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"
   var conn: java.sql.Connection = null
 
-  val testBytes = Array[Byte](99.toByte, 134.toByte, 135.toByte, 200.toByte, 205.toByte)
+  val testBytes = Array[Byte](99.toByte, 134.toByte, 135.toByte, 200.toByte, 205.toByte) ++
+    Array.fill(15)(0.toByte)
 
   val testH2Dialect = new JdbcDialect {
     override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")
@@ -74,6 +77,8 @@ class JDBCSuite extends QueryTest
     }
   }
 
+  val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     Utils.classForName("org.h2.Driver")
@@ -82,7 +87,6 @@ class JDBCSuite extends QueryTest
     val properties = new Properties()
     properties.setProperty("user", "testUser")
     properties.setProperty("password", "testPass")
-    properties.setProperty("rowId", "false")
     conn = DriverManager.getConnection(url, properties)
     conn.prepareStatement("create schema test").executeUpdate()
 
@@ -129,9 +133,9 @@ class JDBCSuite extends QueryTest
     conn.prepareStatement("create table test.inttypes (a INT, b BOOLEAN, c TINYINT, "
       + "d SMALLINT, e BIGINT)").executeUpdate()
     conn.prepareStatement("insert into test.inttypes values (1, false, 3, 4, 1234567890123)"
-      ).executeUpdate()
+    ).executeUpdate()
conn.prepareStatement("insert into test.inttypes values (null, null, null, null, null)" - ).executeUpdate() + ).executeUpdate() conn.commit() sql( s""" @@ -157,8 +161,8 @@ class JDBCSuite extends QueryTest |OPTIONS (url '$url', dbtable 'TEST.STRTYPES', user 'testUser', password 'testPass') """.stripMargin.replaceAll("\n", " ")) - conn.prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP)" - ).executeUpdate() + conn.prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP(7))" + ).executeUpdate() conn.prepareStatement("insert into test.timetypes values ('12:34:56', " + "'1996-01-01', '2002-02-20 11:22:33.543543543')").executeUpdate() conn.prepareStatement("insert into test.timetypes values ('12:34:56', " @@ -172,17 +176,17 @@ class JDBCSuite extends QueryTest """.stripMargin.replaceAll("\n", " ")) conn.prepareStatement("CREATE TABLE test.timezone (tz TIMESTAMP WITH TIME ZONE) " + - "AS SELECT '1999-01-08 04:05:06.543543543 GMT-08:00'") + "AS SELECT '1999-01-08 04:05:06.543543543-08:00'") .executeUpdate() conn.commit() - conn.prepareStatement("CREATE TABLE test.array (ar ARRAY) " + - "AS SELECT '(1, 2, 3)'") + conn.prepareStatement("CREATE TABLE test.array_table (ar Integer ARRAY) " + + "AS SELECT ARRAY[1, 2, 3]") .executeUpdate() conn.commit() conn.prepareStatement("create table test.flttypes (a DOUBLE, b REAL, c DECIMAL(38, 18))" - ).executeUpdate() + ).executeUpdate() conn.prepareStatement("insert into test.flttypes values (" + "1.0000000000000002220446049250313080847263336181640625, " + "1.00000011920928955078125, " @@ -317,7 +321,6 @@ class JDBCSuite extends QueryTest assert(sql("SELECT * FROM foobar").collect().size === 3) } - /* ignore("SELECT * WHERE (simple predicates)") { assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2) @@ -354,7 +357,6 @@ class JDBCSuite extends QueryTest assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0) assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2) } - */ test("SELECT COUNT(1) WHERE (predicates)") { // Check if an answer is correct when Filter is removed from operations such as count() which @@ -605,7 +607,7 @@ class JDBCSuite extends QueryTest assert(rows(0).getAs[Array[Byte]](0).sameElements(testBytes)) assert(rows(0).getString(1).equals("Sensitive")) assert(rows(0).getString(2).equals("Insensitive")) - assert(rows(0).getString(3).equals("Twenty-byte CHAR")) + assert(rows(0).getString(3).equals("Twenty-byte CHAR ")) assert(rows(0).getAs[Array[Byte]](4).sameElements(testBytes)) assert(rows(0).getString(5).equals("I am a clob!")) } @@ -628,6 +630,7 @@ class JDBCSuite extends QueryTest assert(cal.get(Calendar.HOUR) === 11) assert(cal.get(Calendar.MINUTE) === 22) assert(cal.get(Calendar.SECOND) === 33) + assert(cal.get(Calendar.MILLISECOND) === 543) assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543000) } @@ -684,20 +687,6 @@ class JDBCSuite extends QueryTest assert(math.abs(rows(0).getDouble(1) - 1.00000023841859331) < 1e-12) } - test("Pass extra properties via OPTIONS") { - // We set rowId to false during setup, which means that _ROWID_ column should be absent from - // all tables. If rowId is true (default), the query below doesn't throw an exception. 
-    intercept[JdbcSQLException] {
-      sql(
-        s"""
-          |CREATE OR REPLACE TEMPORARY VIEW abc
-          |USING org.apache.spark.sql.jdbc
-          |OPTIONS (url '$url', dbtable '(SELECT _ROWID_ FROM test.people)',
-          |         user 'testUser', password 'testPass')
-        """.stripMargin.replaceAll("\n", " "))
-    }
-  }
-
   test("Remap types via JdbcDialects") {
     JdbcDialects.registerDialect(testH2Dialect)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
@@ -742,36 +731,6 @@ class JDBCSuite extends QueryTest
     assert(DerbyColumns === Seq(""""abc"""", """"key""""))
   }
 
-  test("compile filters") {
-    val compileFilter = PrivateMethod[Option[String]](Symbol("compileFilter"))
-    def doCompileFilter(f: Filter): String =
-      JDBCRDD invokePrivate compileFilter(f, JdbcDialects.get("jdbc:")) getOrElse("")
-    assert(doCompileFilter(EqualTo("col0", 3)) === """"col0" = 3""")
-    assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === """(NOT ("col1" = 'abc'))""")
-    assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def")))
-      === """("col0" = 0) AND ("col1" = 'def')""")
-    assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi")))
-      === """("col0" = 2) OR ("col1" = 'ghi')""")
-    assert(doCompileFilter(LessThan("col0", 5)) === """"col0" < 5""")
-    assert(doCompileFilter(LessThan("col3",
-      Timestamp.valueOf("1995-11-21 00:00:00.0"))) === """"col3" < '1995-11-21 00:00:00.0'""")
-    assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04")))
-      === """"col4" < '1983-08-04'""")
-    assert(doCompileFilter(LessThanOrEqual("col0", 5)) === """"col0" <= 5""")
-    assert(doCompileFilter(GreaterThan("col0", 3)) === """"col0" > 3""")
-    assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === """"col0" >= 3""")
-    assert(doCompileFilter(In("col1", Array("jkl"))) === """"col1" IN ('jkl')""")
-    assert(doCompileFilter(In("col1", Array.empty)) ===
-      """CASE WHEN "col1" IS NULL THEN NULL ELSE FALSE END""")
-    assert(doCompileFilter(Not(In("col1", Array("mno", "pqr"))))
-      === """(NOT ("col1" IN ('mno', 'pqr')))""")
-    assert(doCompileFilter(IsNull("col1")) === """"col1" IS NULL""")
-    assert(doCompileFilter(IsNotNull("col1")) === """"col1" IS NOT NULL""")
-    assert(doCompileFilter(And(EqualNullSafe("col0", "abc"), EqualTo("col1", "def")))
-      === """((NOT ("col0" != 'abc' OR "col0" IS NULL OR 'abc' IS NULL) """
-      + """OR ("col0" IS NULL AND 'abc' IS NULL))) AND ("col1" = 'def')""")
-  }
-
   test("Dialect unregister") {
     JdbcDialects.unregisterDialect(H2Dialect)
     try {
@@ -963,7 +922,7 @@ class JDBCSuite extends QueryTest
     val postgresQuery = s"TRUNCATE TABLE ONLY $table"
     val teradataQuery = s"DELETE FROM $table ALL"
 
-    Seq(mysql, db2, h2, derby).foreach{ dialect =>
+    Seq(mysql, h2, derby).foreach{ dialect =>
       assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
     }
 
@@ -989,7 +948,7 @@ class JDBCSuite extends QueryTest
     val oracleQuery = s"TRUNCATE TABLE $table CASCADE"
     val teradataQuery = s"DELETE FROM $table ALL"
 
-    Seq(mysql, db2, h2, derby).foreach{ dialect =>
+    Seq(mysql, h2, derby).foreach{ dialect =>
       assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
     }
     assert(postgres.getTruncateQuery(table, Some(true)) == postgresQuery)
@@ -1042,34 +1001,6 @@ class JDBCSuite extends QueryTest
     }
   }
 
-  test("hide credentials in create and describe a persistent/temp table") {
-    val password = "testPass"
-    val tableName = "tab1"
-    Seq("TABLE", "TEMPORARY VIEW").foreach { tableType =>
-      withTable(tableName) {
-        val df = sql(
-          s"""
-             |CREATE $tableType $tableName
-             |USING org.apache.spark.sql.jdbc
-             |OPTIONS (
-             | url '$urlWithUserAndPass',
-             | dbtable 'TEST.PEOPLE',
-             | user 'testUser',
-             | password '$password')
-           """.stripMargin)
-
-        val explain = ExplainCommand(df.queryExecution.logical, ExtendedMode)
-        spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach { r =>
-          assert(!r.toString.contains(password))
-        }
-
-        sql(s"DESC FORMATTED $tableName").collect().foreach { r =>
-          assert(!r.toString().contains(password))
-        }
-      }
-    }
-  }
-
   test("Hide credentials in show create table") {
     val userName = "testUser"
     val password = "testPass"
@@ -1078,17 +1009,17 @@ class JDBCSuite extends QueryTest
     withTable(tableName) {
       sql(
         s"""
-           |CREATE TABLE $tableName
-           |USING org.apache.spark.sql.jdbc
-           |OPTIONS (
-           | url '$urlWithUserAndPass',
-           | dbtable '$dbTable',
-           | user '$userName',
-           | password '$password')
+          |CREATE TABLE $tableName
+          |USING org.apache.spark.sql.jdbc
+          |OPTIONS (
+          | url '$urlWithUserAndPass',
+          | dbtable '$dbTable',
+          | user '$userName',
+          | password '$password')
         """.stripMargin)
 
       val show = ShowCreateTableCommand(TableIdentifier(tableName))
-      spark.sessionState.executePlan(show).executedPlan.executeCollect().foreach { r =>
+      spark.sessionState.executePlan(show).executedPlan.executeCollectPublic().foreach { r =>
         assert(!r.toString.contains(password))
         assert(r.toString.contains(dbTable))
         assert(r.toString.contains(userName))
@@ -1310,7 +1241,7 @@ class JDBCSuite extends QueryTest
     }.getMessage
     assert(e.contains("Unsupported type TIMESTAMP_WITH_TIMEZONE"))
     e = intercept[SQLException] {
-      spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
+      spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY_TABLE", new Properties()).collect()
     }.getMessage
     assert(e.contains("Unsupported type ARRAY"))
   }
@@ -1391,9 +1322,9 @@ class JDBCSuite extends QueryTest
         |sessionInitStatement 'SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234')
       """.stripMargin)
 
-      val df3 = sql("SELECT * FROM test_sessionInitStatement")
-      assert(df3.collect() === Array(Row(21519, 1234)))
-    }
+    val df3 = sql("SELECT * FROM test_sessionInitStatement")
+    assert(df3.collect() === Array(Row(21519, 1234)))
+  }
 
   test("jdbc data source shouldn't have unnecessary metadata in its schema") {
     val schema = StructType(Seq(
@@ -1499,12 +1430,12 @@ class JDBCSuite extends QueryTest
     val e3 = intercept[RuntimeException] {
      sql(
        s"""
-           |CREATE OR REPLACE TEMPORARY VIEW queryOption
-           |USING org.apache.spark.sql.jdbc
-           |OPTIONS (url '$url', query '$query', dbtable 'TEST.PEOPLE',
-           |         user 'testUser', password 'testPass')
+          |CREATE OR REPLACE TEMPORARY VIEW queryOption
+          |USING org.apache.spark.sql.jdbc
+          |OPTIONS (url '$url', query '$query', dbtable 'TEST.PEOPLE',
+          |         user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
-      }.getMessage
+    }.getMessage
     assert(e3.contains("Both 'dbtable' and 'query' can not be specified at the same time."))
 
     val e4 = intercept[RuntimeException] {
@@ -1632,7 +1563,6 @@ class JDBCSuite extends QueryTest
       "Partition column type should be numeric, date, or timestamp, but string found."))
   }
 
-  /*
   ignore("SPARK-24288: Enable preventing predicate pushdown") {
     val table = "test.people"
 
@@ -1658,7 +1588,6 @@ class JDBCSuite extends QueryTest
       checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
       Row("fred", 1) :: Nil)
   }
-  */
 
   test("SPARK-26383 throw IllegalArgumentException if wrong kind of driver to the given url") {
     val e = intercept[IllegalArgumentException] {
diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index efa2773bf..c5f6f7dfe 100644
--- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -227,7 +227,7 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     JdbcDialects.registerDialect(testH2Dialect)
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
 
-    val m = intercept[org.h2.jdbc.JdbcSQLException] {
+    val m = intercept[org.h2.jdbc.JdbcSQLSyntaxErrorException] {
       df.write.option("createTableOptions", "ENGINE tableEngineName")
         .jdbc(url1, "TEST.CREATETBLOPTS", properties)
     }.getMessage
@@ -326,7 +326,7 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
   test("save errors if wrong user/password combination") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
 
-    val e = intercept[org.h2.jdbc.JdbcSQLException] {
+    val e = intercept[org.h2.jdbc.JdbcSQLInvalidAuthorizationSpecException] {
       df.write.format("jdbc")
         .option("dbtable", "TEST.SAVETEST")
         .option("url", url1)
@@ -416,9 +416,9 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
 
   test("SPARK-10849: create table using user specified column type and verify on target table") {
     def testUserSpecifiedColTypes(
-      df: DataFrame,
-      createTableColTypes: String,
-      expectedTypes: Map[String, String]): Unit = {
+        df: DataFrame,
+        createTableColTypes: String,
+        expectedTypes: Map[String, String]): Unit = {
       df.write
         .mode(SaveMode.Overwrite)
         .option("createTableColumnTypes", createTableColTypes)
@@ -427,16 +427,16 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
       // verify the data types of the created table by reading the database catalog of H2
       val query =
         """
-          |(SELECT column_name, type_name, character_maximum_length
+          |(SELECT column_name, data_type, character_maximum_length
           | FROM information_schema.columns WHERE table_name = 'DBCOLTYPETEST')
-          """.stripMargin
+        """.stripMargin
       val rows = spark.read.jdbc(url1, query, properties).collect()
 
       rows.foreach { row =>
        val typeName = row.getString(1)
        // For CHAR and VARCHAR, we also compare the max length
        if (typeName.contains("CHAR")) {
-          val charMaxLength = row.getInt(2)
+          val charMaxLength = row.getLong(2)
          assert(expectedTypes(row.getString(0)) == s"$typeName($charMaxLength)")
        } else {
          assert(expectedTypes(row.getString(0)) == typeName)
@@ -452,15 +452,18 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
 
     // out-of-order
-    val expected1 = Map("id" -> "BIGINT", "first#name" -> "VARCHAR(123)", "city" -> "CHAR(20)")
+    val expected1 =
+      Map("id" -> "BIGINT", "first#name" -> "CHARACTER VARYING(123)", "city" -> "CHARACTER(20)")
     testUserSpecifiedColTypes(df, "`first#name` VARCHAR(123), id BIGINT, city CHAR(20)", expected1)
     // partial schema
-    val expected2 = Map("id" -> "INTEGER", "first#name" -> "VARCHAR(123)", "city" -> "CHAR(20)")
+    val expected2 =
+      Map("id" -> "INTEGER", "first#name" -> "CHARACTER VARYING(123)", "city" -> "CHARACTER(20)")
     testUserSpecifiedColTypes(df, "`first#name` VARCHAR(123), city CHAR(20)", expected2)
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       // should still respect the original column names
-      val expected = Map("id" -> "INTEGER", "first#name" -> "VARCHAR(123)", "city" -> "CLOB")
+      val expected = Map("id" -> "INTEGER", "first#name" -> "CHARACTER VARYING(123)",
+        "city" -> "CHARACTER LARGE OBJECT(9223372036854775807)")
       testUserSpecifiedColTypes(df, "`FiRsT#NaMe` VARCHAR(123)", expected)
     }
 
@@ -470,7 +473,9 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
       StructField("First#Name", StringType) ::
       StructField("city", StringType) :: Nil)
     val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-    val expected = Map("id" -> "INTEGER", "First#Name" -> "VARCHAR(123)", "city" -> "CLOB")
+    val expected =
+      Map("id" -> "INTEGER", "First#Name" -> "CHARACTER VARYING(123)",
+        "city" -> "CHARACTER LARGE OBJECT(9223372036854775807)")
     testUserSpecifiedColTypes(df, "`First#Name` VARCHAR(123)", expected)
   }
 }