diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d38aed1db719f..274310fc4c5d4 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1023,6 +1023,13 @@ the Data Sources API. The following options are supported:
The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).
+
+  <tr>
+    <td><code>createTableOptions</code></td>
+    <td>
+      This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table. For example: <code>CREATE TABLE t (name string) ENGINE=InnoDB</code>.
+    </td>
+  </tr>
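A minimal usage sketch of the new option from the writer side, assuming a reachable MySQL endpoint (the URL, table name, and credentials below are placeholders):

```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("createTableOptions example").getOrCreate()
val df = spark.range(10).selectExpr("CAST(id AS STRING) AS name")

// Placeholder connection details; substitute a real JDBC endpoint.
val props = new Properties()
props.put("user", "username")
props.put("password", "password")

// The option value is appended verbatim to the generated CREATE TABLE statement.
df.write
  .option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8")
  .jdbc("jdbc:mysql://localhost:3306/test", "t", props)
```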
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a4c4a5defa1b3..3cad7df447627 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
/**
* Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -399,6 +399,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotPartitioned("jdbc")
assertNotBucketed("jdbc")
+ // Merge the required url and dbtable options with any user-supplied extra options.
+ val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table)
+ val jdbcOptions = new JDBCOptions(params)
+ val jdbcUrl = jdbcOptions.url
+ val jdbcTable = jdbcOptions.table
+
val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
@@ -408,25 +414,29 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val conn = JdbcUtils.createConnectionFactory(url, props)()
try {
- var tableExists = JdbcUtils.tableExists(conn, url, table)
+ var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable)
if (mode == SaveMode.Ignore && tableExists) {
return
}
if (mode == SaveMode.ErrorIfExists && tableExists) {
- sys.error(s"Table $table already exists.")
+ sys.error(s"Table $jdbcTable already exists.")
}
if (mode == SaveMode.Overwrite && tableExists) {
- JdbcUtils.dropTable(conn, table)
+ JdbcUtils.dropTable(conn, jdbcTable)
tableExists = false
}
// Create the table if the table didn't exist.
if (!tableExists) {
- val schema = JdbcUtils.schemaString(df, url)
- val sql = s"CREATE TABLE $table ($schema)"
+ val schema = JdbcUtils.schemaString(df, jdbcUrl)
+ // Append the user-specified create-table options (table_options or
+ // partition_options) to the generated statement.
+ // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+ val createtblOptions = jdbcOptions.createTableOptions
+ val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions"
val statement = conn.createStatement
try {
statement.executeUpdate(sql)
@@ -438,7 +448,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
conn.close()
}
- JdbcUtils.saveTable(df, url, table, props)
+ JdbcUtils.saveTable(df, jdbcUrl, jdbcTable, props)
}
/**
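For clarity, a rough sketch of how the statement text is assembled once the option is present; the schema string here is a stand-in for the real output of `JdbcUtils.schemaString`:

```scala
// Illustrative only: mirrors the string interpolation in DataFrameWriter.jdbc.
val jdbcTable = "t"
val schema = "name TEXT"                                      // stand-in for JdbcUtils.schemaString(df, jdbcUrl)
val createtblOptions = "ENGINE=InnoDB DEFAULT CHARSET=utf8"   // from JDBCOptions.createTableOptions ("" when unset)
val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions"
// sql == "CREATE TABLE t (name TEXT) ENGINE=InnoDB DEFAULT CHARSET=utf8"
```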
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 6c6ec89746ee1..a9f156d51f964 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -20,14 +20,21 @@ package org.apache.spark.sql.execution.datasources.jdbc
/**
* Options for the JDBC data source.
*/
-private[jdbc] class JDBCOptions(
+class JDBCOptions(
@transient private val parameters: Map[String, String])
extends Serializable {
+ // ------------------------------------------------------------
+ // Required parameters
+ // ------------------------------------------------------------
// a JDBC URL
val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
// name of table
val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+
+ // ------------------------------------------------------------
+ // Optional parameters
+ // ------------------------------------------------------------
// the column used to partition
val partitionColumn = parameters.getOrElse("partitionColumn", null)
// the lower bound of partition column
@@ -36,4 +43,12 @@ private[jdbc] class JDBCOptions(
val upperBound = parameters.getOrElse("upperBound", null)
// the number of partitions
val numPartitions = parameters.getOrElse("numPartitions", null)
+
+ // ------------------------------------------------------------
+ // The options for DataFrameWriter
+ // ------------------------------------------------------------
+ // the create table option, which can be table_options or partition_options.
+ // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+ // TODO: reuse the existing partition parameters for the partition-specific options
+ val createTableOptions = parameters.getOrElse("createTableOptions", "")
}
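A small sketch of how the class resolves required and optional keys; the parameter map is made up for illustration:

```scala
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// "url" and "dbtable" are required; everything else falls back to a default.
val opts = new JDBCOptions(Map(
  "url" -> "jdbc:h2:mem:testdb0",
  "dbtable" -> "TEST.PEOPLE",
  "createTableOptions" -> "ENGINE=InnoDB"
))

assert(opts.url == "jdbc:h2:mem:testdb0")
assert(opts.table == "TEST.PEOPLE")
assert(opts.createTableOptions == "ENGINE=InnoDB")
assert(opts.numPartitions == null)  // optional parameters left unset default to null
```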
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 2c6449fa6870b..a6e63e826d92a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -155,6 +155,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
}
+ test("createTableOptions") {
+ val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+ val m = intercept[org.h2.jdbc.JdbcSQLException] {
+ df.write.option("createTableOptions", "ENGINE tableEngineName")
+ .jdbc(url1, "TEST.CREATETBLOPTS", properties)
+ }.getMessage
+ assert(m.contains("Class \"TABLEENGINENAME\" not found"))
+ }
+
test("Incompatible INSERT to append") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)