unify CREATE TABLE syntax for data source and hive serde tables
cloud-fan committed Dec 30, 2016
1 parent 63036ae commit a1dbf61
Showing 20 changed files with 414 additions and 85 deletions.
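Before the per-file diffs, a minimal sketch of the statements this commit unifies, assuming a Hive-enabled `SparkSession`; the table names, path, and comment are illustrative and not part of the diff:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("unified-create-table-sketch")
  .enableHiveSupport()  // needed for the `hive` provider
  .getOrCreate()

// Hive serde tables now go through the same CREATE TABLE ... USING syntax.
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")

// Data source tables can carry LOCATION and COMMENT under the unified rule.
spark.sql(
  """CREATE TABLE IF NOT EXISTS events (id INT, name STRING)
    |USING parquet
    |LOCATION '/tmp/events'
    |COMMENT 'illustrative external table'""".stripMargin)
```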
56 changes: 48 additions & 8 deletions docs/sql-programming-guide.md
@@ -522,14 +522,10 @@ Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as
long as you maintain your connection to the same metastore. A DataFrame for a persistent table can
be created by calling the `table` method on a `SparkSession` with the name of the table.

By default `saveAsTable` will create a "managed table", meaning that the location of the data will
be controlled by the metastore. Managed tables will also have their data deleted automatically
when a table is dropped.

Currently, `saveAsTable` does not expose an API supporting the creation of an "External table" from a `DataFrame`,
however, this functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
and location of the external table as its value (String) when saving the table with `saveAsTable`. When an External table
is dropped only its metadata is removed.
You can specify the table path via the `path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`.
When the table is dropped, this custom table path is not removed and the table data is still there.
If you do not specify a table path, Spark SQL generates a default table path to store the table data,
and that default path is removed when the table is dropped.
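For illustration, a minimal sketch of the behavior described above, assuming an existing `SparkSession` named `spark` and a DataFrame `df`; the path and table names are placeholders:

```scala
// Custom path: the table keeps its data at the path you supply.
df.write.option("path", "/some/path").saveAsTable("t_with_path")

// No path: Spark SQL generates a default table path for the data.
df.write.saveAsTable("t_default_path")

// A persistent table can be read back with `table`.
val reloaded = spark.table("t_with_path")

spark.sql("DROP TABLE t_with_path")     // metadata dropped, files under /some/path remain
spark.sql("DROP TABLE t_default_path")  // metadata and the default table path are both removed
```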

## Parquet Files

@@ -947,6 +943,50 @@ adds support for finding tables in the MetaStore and writing queries using HiveQL
</div>
</div>

### Specifying storage format for Hive tables

When you create a Hive table, you need to define how this table should read/write data from/to the file
system, i.e. the "input format" and "output format". You also need to define how this table should
deserialize the data to rows, or serialize rows to data, i.e. the "serde". The following options can be
used to specify the storage format ("serde", "input format", "output format"), e.g.
`CREATE TABLE src(id int) USING hive OPTIONS(format 'parquet')`. By default, the table files are read as plain text.

<table class="table">
<tr><th>Property Name</th><th>Meaning</th></tr>
<tr>
<td><code>format</code></td>
<td>
A format is a package of storage format specifications, including the "serde", "input format" and
"output format". Currently six formats are supported: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
</td>
</tr>

<tr>
<td><code>inputFormat, outputFormat</code></td>
<td>
These two options specify the names of the corresponding <code>InputFormat</code> and <code>OutputFormat</code> classes as string literals,
e.g. <code>org.apache.hadoop.hive.ql.io.orc.OrcInputFormat</code>. They must appear as a pair, and you cannot
specify them if you have already specified the <code>format</code> option.
</td>
</tr>

<tr>
<td><code>serde</code></td>
<td>
This option specifies the name of a serde class. When the <code>format</code> option is specified, do not specify this option
if the given <code>format</code> already includes the serde information. Currently "sequencefile", "textfile" and "rcfile"
don't include the serde information, so you can use this option with these three formats.
</td>
</tr>

<tr>
<td><code>fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim</code></td>
<td>
These options can only be used with the "textfile" format. They define how to read delimited files into rows.
</td>
</tr>
</table>
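To make the options above concrete, here is a hedged sketch, assuming a Hive-enabled `SparkSession` named `spark`; the table names, the comma delimiter, and the `OrcOutputFormat` class name are illustrative rather than taken from this commit:

```scala
// `format` bundles the serde, input format and output format in one option.
spark.sql("CREATE TABLE parquet_src(id INT) USING hive OPTIONS(format 'parquet')")

// 'textfile' carries no serde information, so delimiters can be supplied explicitly.
spark.sql(
  """CREATE TABLE text_src(id INT, name STRING)
    |USING hive
    |OPTIONS(format 'textfile', fieldDelim ',')""".stripMargin)

// inputFormat and outputFormat must be given as a pair, and not combined with `format`.
spark.sql(
  """CREATE TABLE orc_src(id INT)
    |USING hive
    |OPTIONS(
    |  inputFormat 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat',
    |  outputFormat 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
    |)""".stripMargin)
```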

### Interacting with Different Versions of Hive Metastore

One of the most important pieces of Spark SQL's Hive support is interaction with Hive metastore,
@@ -64,7 +64,7 @@ public static void main(String[] args) {
.enableHiveSupport()
.getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
2 changes: 1 addition & 1 deletion examples/src/main/python/sql/hive.py
@@ -44,7 +44,7 @@
.getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
2 changes: 1 addition & 1 deletion examples/src/main/r/RSparkSQLExample.R
@@ -195,7 +195,7 @@ head(teenagers)
# $example on:spark_hive$
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
@@ -50,7 +50,7 @@ object SparkHiveExample {
import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
@@ -69,16 +69,18 @@ statement
| ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase
| createTableHeader ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)?
(OPTIONS options=tablePropertyList)?
(PARTITIONED BY partitionColumnNames=identifierList)?
bucketSpec? (AS? query)? #createTableUsing
bucketSpec? locationSpec?
(COMMENT comment=STRING)?
(AS? query)? #createTable
| createTableHeader ('(' columns=colTypeList ')')?
(COMMENT STRING)?
(COMMENT comment=STRING)?
(PARTITIONED BY '(' partitionColumns=colTypeList ')')?
bucketSpec? skewSpec?
rowFormat? createFileFormat? locationSpec?
(TBLPROPERTIES tablePropertyList)?
(AS? query)? #createTable
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
@@ -27,6 +27,8 @@ class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]

override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)

override def contains(k: String): Boolean = baseMap.contains(k.toLowerCase)

override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
baseMap + kv.copy(_1 = kv._1.toLowerCase)

@@ -342,42 +342,46 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}

/**
* Create a data source table, returning a [[CreateTable]] logical plan.
* Create a table, returning a [[CreateTable]] logical plan.
*
* Expected format:
* {{{
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
* USING table_provider
* [OPTIONS table_property_list]
* [PARTITIONED BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
* ]
* [TBLPROPERTIES (property_name=property_value, ...)]
* [COMMENT table_comment]
* [AS select_statement];
* }}}
*/
override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
if (external) {
operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
}
val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING")
}
val schema = Option(ctx.colTypeList()).map(createSchema)
val partitionColumnNames =
Option(ctx.partitionColumnNames)
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

// TODO: this may be wrong for non file-based data sources like JDBC, which should be external
// even if there is no `path` in options. We should consider allowing the EXTERNAL keyword.
val location = Option(ctx.locationSpec).map(visitLocationSpec)
val storage = DataSource.buildStorageFormatFromOptions(options)
val tableType = if (storage.locationUri.isDefined) {

if (location.isDefined && storage.locationUri.isDefined) {
throw new ParseException("Cannot specify LOCATION when there is 'path' in OPTIONS.", ctx)
}
val customLocation = storage.locationUri.orElse(location)

val tableType = if (customLocation.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
@@ -386,12 +390,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val tableDesc = CatalogTable(
identifier = table,
tableType = tableType,
storage = storage,
storage = storage.copy(locationUri = customLocation),
schema = schema.getOrElse(new StructType),
provider = Some(provider),
partitionColumnNames = partitionColumnNames,
bucketSpec = bucketSpec
)
bucketSpec = bucketSpec,
comment = Option(ctx.comment).map(string))

// Determine the storage mode.
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
@@ -1011,10 +1015,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}

/**
* Create a table, returning a [[CreateTable]] logical plan.
* Create a Hive serde table, returning a [[CreateTable]] logical plan.
*
* This is not used to create datasource tables, which is handled through
* "CREATE TABLE ... USING ...".
This is a legacy syntax for Hive compatibility; we recommend using the Spark SQL
CREATE TABLE syntax to create Hive serde tables instead, e.g. "CREATE TABLE ... USING hive ..."
*
* Note: several features are currently not supported - temporary tables, bucketing,
* skewed columns and storage handlers (STORED BY).
@@ -1032,7 +1036,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
* [AS select_statement];
* }}}
*/
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) {
val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
// TODO: implement temporary tables
if (temp) {
@@ -1046,7 +1050,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (ctx.bucketSpec != null) {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
}
val comment = Option(ctx.STRING).map(string)
val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
@@ -1104,7 +1107,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
provider = Some(DDLUtils.HIVE_PROVIDER),
partitionColumnNames = partitionCols.map(_.name),
properties = properties,
comment = comment)
comment = Option(ctx.comment).map(string))

val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists

@@ -419,8 +419,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTable(tableDesc, mode, None)
if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil

@@ -432,8 +431,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
// `CreateTables`

case CreateTable(tableDesc, mode, Some(query))
if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER =>
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) =>
val cmd =
CreateDataSourceTableAsSelectCommand(
tableDesc,
@@ -763,8 +763,12 @@ case class AlterTableSetLocationCommand(
object DDLUtils {
val HIVE_PROVIDER = "hive"

def isHiveTable(table: CatalogTable): Boolean = {
table.provider.isDefined && table.provider.get.toLowerCase == HIVE_PROVIDER
}

def isDatasourceTable(table: CatalogTable): Boolean = {
table.provider.isDefined && table.provider.get != HIVE_PROVIDER
table.provider.isDefined && table.provider.get.toLowerCase != HIVE_PROVIDER
}

/**
@@ -109,7 +109,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
throw new AnalysisException("Saving data into a view is not allowed.")
}

if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
if (DDLUtils.isHiveTable(existingTable)) {
throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
"not supported yet. Please use the insertInto() API as an alternative.")
}
@@ -233,7 +233,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
checkDuplication(normalizedPartitionCols, "partition")

if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
if (table.provider.get == DDLUtils.HIVE_PROVIDER) {
if (DDLUtils.isHiveTable(table)) {
// When we hit this branch, it means users didn't specify schema for the table to be
// created, as we always include partition columns in table schema for hive serde tables.
// The real schema will be inferred at hive metastore by hive serde, plus the given
@@ -380,8 +380,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
object HiveOnlyCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case CreateTable(tableDesc, _, Some(_))
if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) =>
throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")

case _ => // OK
