[SPARK-16968][SQL][BACKPORT-2.0] Add additional options in jdbc when creating a new table

### What changes were proposed in this pull request?
This PR backports PRs #14559 and #14683.

---
In the PR, we allow the user to add additional options when creating a new table in the JDBC writer.
The options can be table_options or partition_options.
E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"

Here is the usage example:
```
df.write.option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8").jdbc(...)
```
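
For a fuller picture, here is a hedged end-to-end sketch (the MySQL URL, credentials, and table name are illustrative placeholders, not part of this patch):
```
import java.util.Properties

val props = new Properties()
props.put("user", "spark")       // placeholder credentials
props.put("password", "secret")

// The option value is appended verbatim to the CREATE TABLE statement the
// writer generates, e.g. CREATE TABLE people (name TEXT) ENGINE=InnoDB DEFAULT CHARSET=utf8
df.write
  .mode("overwrite")
  .option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8")
  .jdbc("jdbc:mysql://localhost:3306/test", "people", props)
```
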
### How was this patch tested?
Added a test case.

Author: gatorsmile <[email protected]>

Closes #16634 from gatorsmile/backportSPARK-16968.
GraceH authored and gatorsmile committed Jan 19, 2017
1 parent ee4e8fa commit 9fc053c
Showing 4 changed files with 50 additions and 8 deletions.
7 changes: 7 additions & 0 deletions docs/sql-programming-guide.md
@@ -1023,6 +1023,13 @@ the Data Sources API. The following options are supported:
The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).
</td>
</tr>

<tr>
<td><code>createTableOptions</code></td>
<td>
This is a JDBC writer related option. If specified, this option allows setting database-specific table and partition options when creating a table. For example: <code>CREATE TABLE t (name string) ENGINE=InnoDB</code>.
</td>
</tr>
</table>

<div class="codetabs">
24 changes: 17 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}

/**
* Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -399,6 +399,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotPartitioned("jdbc")
assertNotBucketed("jdbc")

// add the required options, such as the URL and dbtable
val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table)
val jdbcOptions = new JDBCOptions(params)
val jdbcUrl = jdbcOptions.url
val jdbcTable = jdbcOptions.table

val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
@@ -408,25 +414,29 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val conn = JdbcUtils.createConnectionFactory(url, props)()

try {
var tableExists = JdbcUtils.tableExists(conn, url, table)
var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable)

if (mode == SaveMode.Ignore && tableExists) {
return
}

if (mode == SaveMode.ErrorIfExists && tableExists) {
sys.error(s"Table $table already exists.")
sys.error(s"Table $jdbcTable already exists.")
}

if (mode == SaveMode.Overwrite && tableExists) {
JdbcUtils.dropTable(conn, table)
JdbcUtils.dropTable(conn, jdbcTable)
tableExists = false
}

// Create the table if the table didn't exist.
if (!tableExists) {
val schema = JdbcUtils.schemaString(df, url)
val sql = s"CREATE TABLE $table ($schema)"
val schema = JdbcUtils.schemaString(df, jdbcUrl)
// Allow appending extra options when creating a new table; these can be
// table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
val createtblOptions = jdbcOptions.createTableOptions
val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions"
val statement = conn.createStatement
try {
statement.executeUpdate(sql)
@@ -438,7 +448,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
conn.close()
}

JdbcUtils.saveTable(df, url, table, props)
JdbcUtils.saveTable(df, jdbcUrl, jdbcTable, props)
}
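
To make the new code path above concrete, here is a minimal standalone sketch of the option merge and DDL composition (a sketch only; the URL, table name, and schema string are illustrative stand-ins):
```
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// Merge writer options with the required url/dbtable entries, as jdbc() now does.
val extraOptions = Map("createTableOptions" -> "ENGINE=InnoDB DEFAULT CHARSET=utf8")
val params = extraOptions ++ Map("url" -> "jdbc:mysql://host/db", "dbtable" -> "t")
val jdbcOptions = new JDBCOptions(params)

// Stand-in for JdbcUtils.schemaString(df, jdbcOptions.url).
val schema = "name TEXT"
val sql = s"CREATE TABLE ${jdbcOptions.table} ($schema) ${jdbcOptions.createTableOptions}"
// => CREATE TABLE t (name TEXT) ENGINE=InnoDB DEFAULT CHARSET=utf8
```
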

/**
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -20,14 +20,21 @@ package org.apache.spark.sql.execution.datasources.jdbc
/**
* Options for the JDBC data source.
*/
private[jdbc] class JDBCOptions(
class JDBCOptions(
@transient private val parameters: Map[String, String])
extends Serializable {

// ------------------------------------------------------------
// Required parameters
// ------------------------------------------------------------
// a JDBC URL
val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
// name of table
val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))

// ------------------------------------------------------------
// Optional parameter list
// ------------------------------------------------------------
// the column used to partition
val partitionColumn = parameters.getOrElse("partitionColumn", null)
// the lower bound of partition column
@@ -36,4 +43,12 @@ private[jdbc] class JDBCOptions(
val upperBound = parameters.getOrElse("upperBound", null)
// the number of partitions
val numPartitions = parameters.getOrElse("numPartitions", null)

// ------------------------------------------------------------
// The options for DataFrameWriter
// ------------------------------------------------------------
// the create table options, which can be table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
// TODO: reuse the existing partition parameters for those partition-specific options
val createTableOptions = parameters.getOrElse("createTableOptions", "")
}
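
As a quick illustration of the class as it now stands (placeholder values; only `url` and `dbtable` are required, and omitting either triggers the `sys.error` calls above):
```
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

val opts = new JDBCOptions(Map(
  "url" -> "jdbc:h2:mem:testdb",          // required
  "dbtable" -> "TEST.PEOPLE",             // required
  "createTableOptions" -> "ENGINE=InnoDB" // optional, defaults to ""
))

opts.createTableOptions   // "ENGINE=InnoDB"
opts.numPartitions        // null, because it was not specified
```
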
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -155,6 +155,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
}

test("createTableOptions") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)

val m = intercept[org.h2.jdbc.JdbcSQLException] {
df.write.option("createTableOptions", "ENGINE tableEngineName")
.jdbc(url1, "TEST.CREATETBLOPTS", properties)
}.getMessage
assert(m.contains("Class \"TABLEENGINENAME\" not found"))
}

test("Incompatible INSERT to append") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
