[SPARK-18885][SQL] unify CREATE TABLE syntax for data source and hive serde tables

## What changes were proposed in this pull request?

Today we have different syntaxes for creating data source tables and Hive serde tables. We should unify them so users are not confused, and as a step toward making Hive just another data source.

Please read https://issues.apache.org/jira/secure/attachment/12843835/CREATE-TABLE.pdf for details.
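To make the unification concrete, here is a rough sketch of what the shared syntax looks like after this change (illustrative only; assumes a Hive-enabled `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("unified-create-table")
  .enableHiveSupport()
  .getOrCreate()

// Data source table: the provider is given via USING.
spark.sql("CREATE TABLE ds_tbl(id INT, name STRING) USING parquet")

// Hive serde table: same statement shape, with `hive` as the provider and
// the serde details passed through OPTIONS.
spark.sql("CREATE TABLE hive_tbl(id INT, name STRING) USING hive OPTIONS(fileFormat 'orc')")
```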

TODO(for follow-up PRs):
1. TBLPROPERTIES is not added to the new syntax; we should decide whether to add it later.
2. `SHOW CREATE TABLE` should be updated to use the new syntax.
3. We should decide whether to change the behavior of `SET LOCATION`.

## How was this patch tested?

new tests

Author: Wenchen Fan <[email protected]>

Closes #16296 from cloud-fan/create-table.
cloud-fan authored and yhuai committed Jan 6, 2017
1 parent f5d18af commit cca945b
Showing 24 changed files with 526 additions and 143 deletions.
60 changes: 52 additions & 8 deletions docs/sql-programming-guide.md
@@ -522,14 +522,11 @@ Hive metastore. Persistent tables will still exist even after your Spark program
long as you maintain your connection to the same metastore. A DataFrame for a persistent table can
be created by calling the `table` method on a `SparkSession` with the name of the table.

By default `saveAsTable` will create a "managed table", meaning that the location of the data will
be controlled by the metastore. Managed tables will also have their data deleted automatically
when a table is dropped.

Currently, `saveAsTable` does not expose an API supporting the creation of an "external table" from a `DataFrame`.
However, this functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
and location of the external table as its value (a string) when saving the table with `saveAsTable`. When an External table
is dropped only its metadata is removed.
For file-based data sources, e.g. text, parquet, json, etc., you can specify a custom table path via the
`path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. When the table is dropped,
the custom table path will not be removed and the table data will still be there. If no custom table path is
specified, Spark will write data to a default table path under the warehouse directory. When the table is
dropped, the default table path will be removed too.
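A short sketch of both cases (assumes a `SparkSession` named `spark`; the path and table names are made up for illustration):

```scala
val df = spark.range(10).toDF("id")

// Custom path: dropping the table later keeps the files at /some/path.
df.write.option("path", "/some/path").saveAsTable("t_custom_path")

// No path: data lands under the warehouse directory and is removed on DROP TABLE.
df.write.saveAsTable("t_default_path")
```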

Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore. This brings several benefits:

@@ -954,6 +951,53 @@ adds support for finding tables in the MetaStore and writing queries using HiveQ
</div>
</div>

### Specifying storage format for Hive tables

When you create a Hive table, you need to define how this table should read/write data from/to the file system,
i.e. the "input format" and "output format". You also need to define how this table should deserialize the data
to rows, or serialize rows to data, i.e. the "serde". The following options can be used to specify the storage
format ("serde", "input format", "output format"), e.g. `CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`.
By default, we will read the table files as plain text. Note that the Hive storage handler is not yet supported when
creating a table; you can create a table using a storage handler on the Hive side, and then use Spark SQL to read it.

<table class="table">
<tr><th>Property Name</th><th>Meaning</th></tr>
<tr>
<td><code>fileFormat</code></td>
<td>
    A fileFormat is a package of storage format specifications, including "serde", "input format" and
    "output format". Currently we support 6 fileFormats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
</td>
</tr>

<tr>
<td><code>inputFormat, outputFormat</code></td>
<td>
    These 2 options specify the name of a corresponding `InputFormat` and `OutputFormat` class as a string literal,
    e.g. `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These 2 options must be specified as a pair, and you cannot
    specify them if you have already specified the `fileFormat` option.
</td>
</tr>

<tr>
<td><code>serde</code></td>
<td>
    This option specifies the name of a serde class. When the `fileFormat` option is specified, do not specify this option
    if the given `fileFormat` already includes the serde information. Currently "sequencefile", "textfile" and "rcfile"
    don't include the serde information, so you can use this option with these 3 fileFormats.
</td>
</tr>

<tr>
<td><code>fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim</code></td>
<td>
    These options can only be used with the "textfile" fileFormat. They define how to read delimited files into rows.
</td>
</tr>
</table>

All other properties defined with `OPTIONS` will be regarded as Hive serde properties.
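For illustration, a few sketches of how these options might be combined (table names are placeholders; a Hive-enabled session named `spark` is assumed):

```scala
// fileFormat bundles the serde, input format and output format.
spark.sql("CREATE TABLE hive_parquet(id INT) USING hive OPTIONS(fileFormat 'parquet')")

// Explicit inputFormat/outputFormat pair plus a serde (cannot be mixed with fileFormat).
spark.sql(
  """CREATE TABLE hive_orc(id INT) USING hive OPTIONS(
    |  inputFormat 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat',
    |  outputFormat 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat',
    |  serde 'org.apache.hadoop.hive.ql.io.orc.OrcSerde')""".stripMargin)

// Delimiter options only apply to the 'textfile' fileFormat.
spark.sql(
  """CREATE TABLE hive_csv(id INT) USING hive
    |OPTIONS(fileFormat 'textfile', fieldDelim ',')""".stripMargin)
```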

### Interacting with Different Versions of Hive Metastore

One of the most important pieces of Spark SQL's Hive support is interaction with Hive metastore,
@@ -64,7 +64,7 @@ public static void main(String[] args) {
.enableHiveSupport()
.getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
2 changes: 1 addition & 1 deletion examples/src/main/python/sql/hive.py
@@ -44,7 +44,7 @@
.getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
2 changes: 1 addition & 1 deletion examples/src/main/r/RSparkSQLExample.R
@@ -195,7 +195,7 @@ head(teenagers)
# $example on:spark_hive$
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
@@ -50,7 +50,7 @@ object SparkHiveExample {
import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
@@ -69,16 +69,18 @@ statement
| ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase
| createTableHeader ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)?
(OPTIONS options=tablePropertyList)?
(PARTITIONED BY partitionColumnNames=identifierList)?
bucketSpec? (AS? query)? #createTableUsing
bucketSpec? locationSpec?
(COMMENT comment=STRING)?
(AS? query)? #createTable
| createTableHeader ('(' columns=colTypeList ')')?
(COMMENT STRING)?
(COMMENT comment=STRING)?
(PARTITIONED BY '(' partitionColumns=colTypeList ')')?
bucketSpec? skewSpec?
rowFormat? createFileFormat? locationSpec?
(TBLPROPERTIES tablePropertyList)?
(AS? query)? #createTable
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
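As a rough illustration of the split above (assuming a Hive-enabled session named `spark`), the first statement below would be matched by the `#createTable` rule and the second by `#createHiveTable`:

```scala
// New unified syntax: USING, with optional OPTIONS / PARTITIONED BY / LOCATION / COMMENT.
spark.sql(
  """CREATE TABLE users(name STRING, age INT)
    |USING parquet
    |PARTITIONED BY (age)
    |LOCATION '/warehouse/users'
    |COMMENT 'registered users'""".stripMargin)

// Legacy Hive DDL: ROW FORMAT / STORED AS, etc.
spark.sql(
  """CREATE TABLE logs(msg STRING)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    |STORED AS TEXTFILE""".stripMargin)
```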
@@ -27,6 +27,8 @@ class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]

override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)

override def contains(k: String): Boolean = baseMap.contains(k.toLowerCase)

override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
baseMap + kv.copy(_1 = kv._1.toLowerCase)

@@ -342,42 +342,48 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}

/**
* Create a data source table, returning a [[CreateTable]] logical plan.
* Create a table, returning a [[CreateTable]] logical plan.
*
* Expected format:
* {{{
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
* USING table_provider
* [OPTIONS table_property_list]
* [PARTITIONED BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
* ]
* [LOCATION path]
* [COMMENT table_comment]
* [AS select_statement];
* }}}
*/
override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
if (external) {
operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
}
val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING")
}
val schema = Option(ctx.colTypeList()).map(createSchema)
val partitionColumnNames =
Option(ctx.partitionColumnNames)
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

// TODO: this may be wrong for non file-based data sources like JDBC, which should be external
// even if there is no `path` in options. We should consider allowing the EXTERNAL keyword.
val location = Option(ctx.locationSpec).map(visitLocationSpec)
val storage = DataSource.buildStorageFormatFromOptions(options)
val tableType = if (storage.locationUri.isDefined) {

if (location.isDefined && storage.locationUri.isDefined) {
throw new ParseException(
"LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
"you can only specify one of them.", ctx)
}
val customLocation = storage.locationUri.orElse(location)

val tableType = if (customLocation.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
@@ -386,12 +392,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val tableDesc = CatalogTable(
identifier = table,
tableType = tableType,
storage = storage,
storage = storage.copy(locationUri = customLocation),
schema = schema.getOrElse(new StructType),
provider = Some(provider),
partitionColumnNames = partitionColumnNames,
bucketSpec = bucketSpec
)
bucketSpec = bucketSpec,
comment = Option(ctx.comment).map(string))

// Determine the storage mode.
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
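Hedged examples of what the location handling above implies (table names and paths are illustrative):

```scala
// A custom location, whether from LOCATION or from the 'path' option, yields an EXTERNAL table.
spark.sql("CREATE TABLE t1(id INT) USING parquet LOCATION '/data/t1'")
spark.sql("CREATE TABLE t2(id INT) USING parquet OPTIONS(path '/data/t2')")

// Specifying both is now rejected with a ParseException.
// spark.sql("CREATE TABLE t3(id INT) USING parquet OPTIONS(path '/a') LOCATION '/b'")

// No location at all -> MANAGED table under the warehouse directory.
spark.sql("CREATE TABLE t4(id INT) USING parquet")
```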
@@ -1011,10 +1017,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}

/**
* Create a table, returning a [[CreateTable]] logical plan.
* Create a Hive serde table, returning a [[CreateTable]] logical plan.
*
* This is not used to create datasource tables, which is handled through
* "CREATE TABLE ... USING ...".
* This is a legacy syntax for Hive compatibility; we recommend users use the Spark SQL
* CREATE TABLE syntax to create Hive serde tables, e.g. "CREATE TABLE ... USING hive ..."
*
* Note: several features are currently not supported - temporary tables, bucketing,
* skewed columns and storage handlers (STORED BY).
@@ -1032,7 +1038,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
* [AS select_statement];
* }}}
*/
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) {
val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
// TODO: implement temporary tables
if (temp) {
@@ -1046,7 +1052,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (ctx.bucketSpec != null) {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
}
val comment = Option(ctx.STRING).map(string)
val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
@@ -1057,19 +1062,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val schema = StructType(dataCols ++ partitionCols)

// Storage format
val defaultStorage: CatalogStorageFormat = {
val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
CatalogStorageFormat(
locationUri = None,
inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
.orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
.orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
serde = defaultHiveSerde.flatMap(_.serde),
compressed = false,
properties = Map())
}
val defaultStorage = HiveSerDe.getDefaultStorage(conf)
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
.getOrElse(CatalogStorageFormat.empty)
@@ -1104,7 +1097,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
provider = Some(DDLUtils.HIVE_PROVIDER),
partitionColumnNames = partitionCols.map(_.name),
properties = properties,
comment = comment)
comment = Option(ctx.comment).map(string))

val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists

@@ -408,8 +408,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTable(tableDesc, mode, None)
if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil

@@ -421,8 +420,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
// `CreateTables`

case CreateTable(tableDesc, mode, Some(query))
if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER =>
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) =>
val cmd =
CreateDataSourceTableAsSelectCommand(
tableDesc,
@@ -762,8 +762,12 @@ case class AlterTableSetLocationCommand(
object DDLUtils {
val HIVE_PROVIDER = "hive"

def isHiveTable(table: CatalogTable): Boolean = {
table.provider.isDefined && table.provider.get.toLowerCase == HIVE_PROVIDER
}

def isDatasourceTable(table: CatalogTable): Boolean = {
table.provider.isDefined && table.provider.get != HIVE_PROVIDER
table.provider.isDefined && table.provider.get.toLowerCase != HIVE_PROVIDER
}
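A small sketch of what the case-insensitive check buys (assuming the provider name is parsed as written):

```scala
// All of these are classified as Hive serde tables by DDLUtils.isHiveTable,
// regardless of how the provider is capitalized.
spark.sql("CREATE TABLE h1(id INT) USING hive")
spark.sql("CREATE TABLE h2(id INT) USING Hive")
spark.sql("CREATE TABLE h3(id INT) USING HIVE")
```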

/**
@@ -109,7 +109,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
throw new AnalysisException("Saving data into a view is not allowed.")
}

if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
if (DDLUtils.isHiveTable(existingTable)) {
throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
"not supported yet. Please use the insertInto() API as an alternative.")
}
@@ -233,7 +233,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
checkDuplication(normalizedPartitionCols, "partition")

if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
if (table.provider.get == DDLUtils.HIVE_PROVIDER) {
if (DDLUtils.isHiveTable(table)) {
// When we hit this branch, it means users didn't specify schema for the table to be
// created, as we always include partition columns in table schema for hive serde tables.
// The real schema will be inferred at hive metastore by hive serde, plus the given
@@ -380,8 +380,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
object HiveOnlyCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case CreateTable(tableDesc, _, Some(_))
if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) =>
throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")

case _ => // OK
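For illustration, a sketch of the failure mode this check guards against (assumes a plain, non-Hive `SparkSession`):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

val plainSpark = SparkSession.builder().appName("no-hive").getOrCreate()

try {
  // A Hive CTAS is rejected by HiveOnlyCheck when Hive support is absent.
  plainSpark.sql("CREATE TABLE t USING hive AS SELECT 1 AS id")
} catch {
  case e: AnalysisException =>
    // Expected to mention: "Hive support is required to use CREATE Hive TABLE AS SELECT"
    println(e.getMessage)
}
```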