diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4cd21aef91235..0f6e3446559b5 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -522,14 +522,11 @@ Hive metastore. Persistent tables will still exist even after your Spark program
 long as you maintain your connection to the same metastore. A DataFrame for a persistent table can
 be created by calling the `table` method on a `SparkSession` with the name of the table.
 
-By default `saveAsTable` will create a "managed table", meaning that the location of the data will
-be controlled by the metastore. Managed tables will also have their data deleted automatically
-when a table is dropped.
-
-Currently, `saveAsTable` does not expose an API supporting the creation of an "external table" from a `DataFrame`.
-However, this functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
-and location of the external table as its value (a string) when saving the table with `saveAsTable`. When an External table
-is dropped only its metadata is removed.
+For file-based data sources, e.g. text, parquet, json, etc., you can specify a custom table path via the
+`path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. When the table is dropped,
+the custom table path will not be removed and the table data will still be there. If no custom table path is
+specified, Spark will write data to a default table path under the warehouse directory. When the table is
+dropped, the default table path will be removed too.
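+
+For example (a sketch; the DataFrame `df`, the table name and the path are placeholders), a table saved
+with a custom path keeps its data after a `DROP TABLE`:
+
+{% highlight scala %}
+// Save a DataFrame as a persistent table backed by a user-controlled location.
+df.write.option("path", "/some/path").saveAsTable("t")
+// Dropping the table removes only the metadata; the files under /some/path remain.
+spark.sql("DROP TABLE t")
+{% endhighlight %}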
 
 Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive
 metastore. This brings several benefits:
@@ -954,6 +951,53 @@ adds support for finding tables in the MetaStore and writing queries using HiveQ
 
+### Specifying storage format for Hive tables
+
+When you create a Hive table, you need to define how this table should read/write data from/to the file
+system, i.e. the "input format" and "output format". You also need to define how this table should
+deserialize the data to rows, or serialize rows to data, i.e. the "serde". The following options can be
+used to specify the storage format ("serde", "input format", "output format"), e.g.
+`CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`. By default, we will read the table
+files as plain text. Note that the Hive storage handler is not supported yet when creating a table; you
+can create a table using a storage handler on the Hive side, and use Spark SQL to read it.
+
+<table class="table">
+  <tr><th>Property Name</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>fileFormat</code></td>
+    <td>
+      A fileFormat is a kind of package of storage format specifications, including "serde", "input format" and
+      "output format". Currently we support 6 fileFormats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
+    </td>
+  </tr>
+  <tr>
+    <td><code>inputFormat, outputFormat</code></td>
+    <td>
+      These 2 options specify the name of a corresponding `InputFormat` and `OutputFormat` class as a string literal,
+      e.g. `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These 2 options must appear in pairs, and you cannot
+      specify them if you have already specified the `fileFormat` option.
+    </td>
+  </tr>
+  <tr>
+    <td><code>serde</code></td>
+    <td>
+      This option specifies the name of a serde class. When the `fileFormat` option is specified, do not specify this option
+      if the given `fileFormat` already includes the serde information. Currently "sequencefile", "textfile" and "rcfile"
+      don't include the serde information, so you can use this option with these 3 fileFormats.
+    </td>
+  </tr>
+  <tr>
+    <td><code>fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim</code></td>
+    <td>
+      These options can only be used with "textfile" fileFormat. They define how to read delimited files into rows.
+    </td>
+  </tr>
+</table>
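+
+For example (a sketch; the table name and delimiter are illustrative), the `fileFormat` option can be
+combined with the delimiter options when creating a plain-text table:
+
+{% highlight scala %}
+// fieldDelim is only valid together with fileFormat 'textfile'.
+spark.sql("""
+  CREATE TABLE src_csv(id INT, name STRING) USING hive
+  OPTIONS(fileFormat 'textfile', fieldDelim ',')
+""")
+{% endhighlight %}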
+ +All other properties defined with `OPTIONS` will be regarded as Hive serde properties. + ### Interacting with Different Versions of Hive Metastore One of the most important pieces of Spark SQL's Hive support is interaction with Hive metastore, diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java index 052153c9e9736..8d06d38cf2c6b 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java @@ -64,7 +64,7 @@ public static void main(String[] args) { .enableHiveSupport() .getOrCreate(); - spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); + spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive"); spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL diff --git a/examples/src/main/python/sql/hive.py b/examples/src/main/python/sql/hive.py index ad83fe1cf14b5..ba01544a5bd2e 100644 --- a/examples/src/main/python/sql/hive.py +++ b/examples/src/main/python/sql/hive.py @@ -44,7 +44,7 @@ .getOrCreate() # spark is an existing SparkSession - spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") + spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries are expressed in HiveQL diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R index 373a36dba14f0..e647f0e1e9f17 100644 --- a/examples/src/main/r/RSparkSQLExample.R +++ b/examples/src/main/r/RSparkSQLExample.R @@ -195,7 +195,7 @@ head(teenagers) # $example on:spark_hive$ # enableHiveSupport defaults to TRUE sparkR.session(enableHiveSupport = TRUE) -sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala index ded18dacf1fe3..d29ed958fe8f4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala @@ -50,7 +50,7 @@ object SparkHiveExample { import spark.implicits._ import spark.sql - sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") + sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index a34087cb6cd4a..3222a9cdc2c4e 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -69,16 +69,18 @@ statement | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? 
#dropDatabase | createTableHeader ('(' colTypeList ')')? tableProvider - (OPTIONS tablePropertyList)? + (OPTIONS options=tablePropertyList)? (PARTITIONED BY partitionColumnNames=identifierList)? - bucketSpec? (AS? query)? #createTableUsing + bucketSpec? locationSpec? + (COMMENT comment=STRING)? + (AS? query)? #createTable | createTableHeader ('(' columns=colTypeList ')')? - (COMMENT STRING)? + (COMMENT comment=STRING)? (PARTITIONED BY '(' partitionColumns=colTypeList ')')? bucketSpec? skewSpec? rowFormat? createFileFormat? locationSpec? (TBLPROPERTIES tablePropertyList)? - (AS? query)? #createTable + (AS? query)? #createHiveTable | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier LIKE source=tableIdentifier #createTableLike | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala index a7f7a8a66382a..29e49a58375b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala @@ -27,6 +27,8 @@ class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String] override def get(k: String): Option[String] = baseMap.get(k.toLowerCase) + override def contains(k: String): Boolean = baseMap.contains(k.toLowerCase) + override def + [B1 >: String](kv: (String, B1)): Map[String, B1] = baseMap + kv.copy(_1 = kv._1.toLowerCase) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 14a983e43b005..41768d451261a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -342,11 +342,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a data source table, returning a [[CreateTable]] logical plan. + * Create a table, returning a [[CreateTable]] logical plan. * * Expected format: * {{{ - * CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name + * CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name * USING table_provider * [OPTIONS table_property_list] * [PARTITIONED BY (col_name, col_name, ...)] @@ -354,19 +354,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * [SORTED BY (col_name [ASC|DESC], ...)] * INTO num_buckets BUCKETS * ] + * [LOCATION path] + * [COMMENT table_comment] * [AS select_statement]; * }}} */ - override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) { + override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) if (external) { operationNotAllowed("CREATE EXTERNAL TABLE ... 
USING", ctx) } - val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) val provider = ctx.tableProvider.qualifiedName.getText - if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) { - throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING") - } val schema = Option(ctx.colTypeList()).map(createSchema) val partitionColumnNames = Option(ctx.partitionColumnNames) @@ -374,10 +373,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(Array.empty[String]) val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) - // TODO: this may be wrong for non file-based data source like JDBC, which should be external - // even there is no `path` in options. We should consider allow the EXTERNAL keyword. + val location = Option(ctx.locationSpec).map(visitLocationSpec) val storage = DataSource.buildStorageFormatFromOptions(options) - val tableType = if (storage.locationUri.isDefined) { + + if (location.isDefined && storage.locationUri.isDefined) { + throw new ParseException( + "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + + "you can only specify one of them.", ctx) + } + val customLocation = storage.locationUri.orElse(location) + + val tableType = if (customLocation.isDefined) { CatalogTableType.EXTERNAL } else { CatalogTableType.MANAGED @@ -386,12 +392,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val tableDesc = CatalogTable( identifier = table, tableType = tableType, - storage = storage, + storage = storage.copy(locationUri = customLocation), schema = schema.getOrElse(new StructType), provider = Some(provider), partitionColumnNames = partitionColumnNames, - bucketSpec = bucketSpec - ) + bucketSpec = bucketSpec, + comment = Option(ctx.comment).map(string)) // Determine the storage mode. val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists @@ -1011,10 +1017,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a table, returning a [[CreateTable]] logical plan. + * Create a Hive serde table, returning a [[CreateTable]] logical plan. * - * This is not used to create datasource tables, which is handled through - * "CREATE TABLE ... USING ...". + * This is a legacy syntax for Hive compatibility, we recommend users to use the Spark SQL + * CREATE TABLE syntax to create Hive serde table, e.g. "CREATE TABLE ... USING hive ..." * * Note: several features are currently not supported - temporary tables, bucketing, * skewed columns and storage handlers (STORED BY). @@ -1032,7 +1038,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * [AS select_statement]; * }}} */ - override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { + override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) { val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) // TODO: implement temporary tables if (temp) { @@ -1046,7 +1052,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (ctx.bucketSpec != null) { operationNotAllowed("CREATE TABLE ... 
CLUSTERED BY", ctx) } - val comment = Option(ctx.STRING).map(string) val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil) val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil) val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) @@ -1057,19 +1062,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val schema = StructType(dataCols ++ partitionCols) // Storage format - val defaultStorage: CatalogStorageFormat = { - val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") - val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType) - CatalogStorageFormat( - locationUri = None, - inputFormat = defaultHiveSerde.flatMap(_.inputFormat) - .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), - outputFormat = defaultHiveSerde.flatMap(_.outputFormat) - .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - serde = defaultHiveSerde.flatMap(_.serde), - compressed = false, - properties = Map()) - } + val defaultStorage = HiveSerDe.getDefaultStorage(conf) validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) .getOrElse(CatalogStorageFormat.empty) @@ -1104,7 +1097,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { provider = Some(DDLUtils.HIVE_PROVIDER), partitionColumnNames = partitionCols.map(_.name), properties = properties, - comment = comment) + comment = Option(ctx.comment).map(string)) val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 28808f8e3ee14..1257d1728c311 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -408,8 +408,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTable(tableDesc, mode, None) - if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER => + case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore) ExecutedCommandExec(cmd) :: Nil @@ -421,8 +420,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // CREATE TABLE ... AS SELECT ... 
for hive serde table is handled in hive module, by rule // `CreateTables` - case CreateTable(tableDesc, mode, Some(query)) - if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER => + case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) => val cmd = CreateDataSourceTableAsSelectCommand( tableDesc, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 59a29e884739a..82cbb4aa47445 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -762,8 +762,12 @@ case class AlterTableSetLocationCommand( object DDLUtils { val HIVE_PROVIDER = "hive" + def isHiveTable(table: CatalogTable): Boolean = { + table.provider.isDefined && table.provider.get.toLowerCase == HIVE_PROVIDER + } + def isDatasourceTable(table: CatalogTable): Boolean = { - table.provider.isDefined && table.provider.get != HIVE_PROVIDER + table.provider.isDefined && table.provider.get.toLowerCase != HIVE_PROVIDER } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 07b16671f744b..94ba814fa51ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -109,7 +109,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl throw new AnalysisException("Saving data into a view is not allowed.") } - if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) { + if (DDLUtils.isHiveTable(existingTable)) { throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " + "not supported yet. Please use the insertInto() API as an alternative.") } @@ -233,7 +233,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl checkDuplication(normalizedPartitionCols, "partition") if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) { - if (table.provider.get == DDLUtils.HIVE_PROVIDER) { + if (DDLUtils.isHiveTable(table)) { // When we hit this branch, it means users didn't specify schema for the table to be // created, as we always include partition columns in table schema for hive serde tables. 
// The real schema will be inferred at hive metastore by hive serde, plus the given @@ -380,8 +380,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { object HiveOnlyCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case CreateTable(tableDesc, _, Some(_)) - if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER => + case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) => throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT") case _ => // OK diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala index 52e648a917d8b..ca46a1151e3e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -17,12 +17,49 @@ package org.apache.spark.sql.internal +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat + case class HiveSerDe( inputFormat: Option[String] = None, outputFormat: Option[String] = None, serde: Option[String] = None) object HiveSerDe { + val serdeMap = Map( + "sequencefile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), + outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")), + + "rcfile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), + serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")), + + "orc" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"), + serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")), + + "parquet" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), + serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")), + + "textfile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), + + "avro" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"), + serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))) + /** * Get the Hive SerDe information from the data source abbreviation string or classname. 
* @@ -31,41 +68,6 @@ object HiveSerDe { * @return HiveSerDe associated with the specified source */ def sourceToSerDe(source: String): Option[HiveSerDe] = { - val serdeMap = Map( - "sequencefile" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), - outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")), - - "rcfile" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), - serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")), - - "orc" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"), - serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")), - - "parquet" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), - serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")), - - "textfile" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - - "avro" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"), - serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))) - val key = source.toLowerCase match { case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet" case s if s.startsWith("org.apache.spark.sql.orc") => "orc" @@ -77,4 +79,16 @@ object HiveSerDe { serdeMap.get(key) } + + def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = { + val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") + val defaultHiveSerde = sourceToSerDe(defaultStorageType) + CatalogStorageFormat.empty.copy( + inputFormat = defaultHiveSerde.flatMap(_.inputFormat) + .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), + outputFormat = defaultHiveSerde.flatMap(_.outputFormat) + .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), + serde = defaultHiveSerde.flatMap(_.serde) + .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index b070138be05d5..15e490fb30a27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -121,7 +121,8 @@ class SparkSqlParserSuite extends PlanTest { tableType: CatalogTableType = CatalogTableType.MANAGED, storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy( inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat, - outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat), + outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat, + serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")), schema: StructType = new StructType, provider: Option[String] = Some("hive"), partitionColumnNames: Seq[String] = Seq.empty, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 1a5e5226c2ec9..76bb9e5929a71 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -236,7 +236,7 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed4, expected4) } - test("create table - table file format") { + test("create hive table - table file format") { val allSources = Seq("parquet", "parquetfile", "orc", "orcfile", "avro", "avrofile", "sequencefile", "rcfile", "textfile") @@ -245,13 +245,14 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTable](query) val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) - assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) + assert(ct.tableDesc.storage.serde == + hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))) assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat) } } - test("create table - row format and table file format") { + test("create hive table - row format and table file format") { val createTableStart = "CREATE TABLE my_tab ROW FORMAT" val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'" val query1 = s"$createTableStart SERDE 'anything' $fileFormat" @@ -262,13 +263,15 @@ class DDLCommandSuite extends PlanTest { assert(parsed1.tableDesc.storage.serde == Some("anything")) assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt")) assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt")) + val parsed2 = parseAs[CreateTable](query2) - assert(parsed2.tableDesc.storage.serde.isEmpty) + assert(parsed2.tableDesc.storage.serde == + Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt")) assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt")) } - test("create table - row format serde and generic file format") { + test("create hive table - row format serde and generic file format") { val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") val supportedSources = Set("sequencefile", "rcfile", "textfile") @@ -287,7 +290,7 @@ class DDLCommandSuite extends PlanTest { } } - test("create table - row format delimited and generic file format") { + test("create hive table - row format delimited and generic file format") { val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") val supportedSources = Set("textfile") @@ -297,7 +300,8 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTable](query) val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) - assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) + assert(ct.tableDesc.storage.serde == + hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))) assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat) } else { @@ -306,7 +310,7 @@ class DDLCommandSuite extends PlanTest { } } - test("create external table - location must be specified") { + test("create hive external table - location must be specified") { assertUnsupported( sql = "CREATE EXTERNAL TABLE my_tab", containsThesePhrases = Seq("create external table", "location")) @@ -316,7 +320,7 @@ class DDLCommandSuite extends 
PlanTest { assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) } - test("create table - property values must be set") { + test("create hive table - property values must be set") { assertUnsupported( sql = "CREATE TABLE my_tab TBLPROPERTIES('key_without_value', 'key_with_value'='x')", containsThesePhrases = Seq("key_without_value")) @@ -326,14 +330,14 @@ class DDLCommandSuite extends PlanTest { containsThesePhrases = Seq("key_without_value")) } - test("create table - location implies external") { + test("create hive table - location implies external") { val query = "CREATE TABLE my_tab LOCATION '/something/anything'" val ct = parseAs[CreateTable](query) assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) } - test("create table using - with partitioned by") { + test("create table - with partitioned by") { val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + "USING parquet PARTITIONED BY (a)" @@ -357,7 +361,7 @@ class DDLCommandSuite extends PlanTest { } } - test("create table using - with bucket") { + test("create table - with bucket") { val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" @@ -379,6 +383,57 @@ class DDLCommandSuite extends PlanTest { } } + test("create table - with comment") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet"), + comment = Some("abc")) + + parser.parsePlan(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with location") { + val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy(locationUri = Some("/tmp/file")), + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet")) + + parser.parsePlan(v1) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $v1") + } + + val v2 = + """ + |CREATE TABLE my_tab(a INT, b STRING) + |USING parquet + |OPTIONS (path '/tmp/file') + |LOCATION '/tmp/file' + """.stripMargin + val e = intercept[ParseException] { + parser.parsePlan(v2) + } + assert(e.message.contains("you can only specify one of them.")) + } + // ALTER TABLE table_name RENAME TO new_table_name; // ALTER VIEW view_name RENAME TO new_view_name; test("alter table/view: rename table/view") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index fde6d4a94762d..474a2c868e20a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -215,7 +215,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat tableDefinition.storage.locationUri } - if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { + if (DDLUtils.isHiveTable(tableDefinition)) { val tableWithDataSourceProps = tableDefinition.copy( // We can't leave `locationUri` empty and count on Hive metastore to set a default table // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default @@ -533,7 +533,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } else { val oldTableDef = getRawTable(db, withStatsProps.identifier.table) - val newStorage = if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { + val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) { tableDefinition.storage } else { // We can't alter the table storage of data source table directly for 2 reasons: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index bea073fb4848a..52892f1c7270c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -64,6 +64,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) AnalyzeCreateTable(sparkSession) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: + new DetermineHiveSerde(conf) :: (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 773c4a39d854c..6d5cc5778aef8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -18,14 +18,73 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} + + +/** + * Determine the serde/format of the Hive serde table, according to the storage properties. + */ +class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) && t.storage.serde.isEmpty => + if (t.bucketSpec.isDefined) { + throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.") + } + if (t.partitionColumnNames.nonEmpty && query.isDefined) { + val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + + "create a partitioned table using Hive's file formats. 
" + + "Please use the syntax of \"CREATE TABLE tableName USING dataSource " + + "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " + + "CTAS statement." + throw new AnalysisException(errorMessage) + } + + val defaultStorage = HiveSerDe.getDefaultStorage(conf) + val options = new HiveOptions(t.storage.properties) + + val fileStorage = if (options.fileFormat.isDefined) { + HiveSerDe.sourceToSerDe(options.fileFormat.get) match { + case Some(s) => + CatalogStorageFormat.empty.copy( + inputFormat = s.inputFormat, + outputFormat = s.outputFormat, + serde = s.serde) + case None => + throw new IllegalArgumentException(s"invalid fileFormat: '${options.fileFormat.get}'") + } + } else if (options.hasInputOutputFormat) { + CatalogStorageFormat.empty.copy( + inputFormat = options.inputFormat, + outputFormat = options.outputFormat) + } else { + CatalogStorageFormat.empty + } + + val rowStorage = if (options.serde.isDefined) { + CatalogStorageFormat.empty.copy(serde = options.serde) + } else { + CatalogStorageFormat.empty + } + + val storage = t.storage.copy( + inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), + outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), + serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), + properties = options.serdeProperties) + + c.copy(tableDesc = t.copy(storage = storage)) + } +} private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. @@ -49,15 +108,7 @@ private[hive] trait HiveStrategies { InsertIntoHiveTable( table, partition, planLater(child), overwrite, ifNotExists) :: Nil - case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => - val newTableDesc = if (tableDesc.storage.serde.isEmpty) { - // add default serde - tableDesc.withNewStorage( - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - } else { - tableDesc - } - + case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => // Currently we will never hit this branch, as SQL string API can only use `Ignore` or // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde // tables yet. @@ -68,7 +119,7 @@ private[hive] trait HiveStrategies { val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase) val cmd = CreateHiveTableAsSelectCommand( - newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))), + tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))), query, mode == SaveMode.Ignore) ExecutedCommandExec(cmd) :: Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala new file mode 100644 index 0000000000000..35b7a681f12e0 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap + +/** + * Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive + * serde/format information from these options. + */ +class HiveOptions(@transient private val parameters: CaseInsensitiveMap) extends Serializable { + import HiveOptions._ + + def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters)) + + val fileFormat = parameters.get(FILE_FORMAT).map(_.toLowerCase) + val inputFormat = parameters.get(INPUT_FORMAT) + val outputFormat = parameters.get(OUTPUT_FORMAT) + + if (inputFormat.isDefined != outputFormat.isDefined) { + throw new IllegalArgumentException("Cannot specify only inputFormat or outputFormat, you " + + "have to specify both of them.") + } + + def hasInputOutputFormat: Boolean = inputFormat.isDefined + + if (fileFormat.isDefined && inputFormat.isDefined) { + throw new IllegalArgumentException("Cannot specify fileFormat and inputFormat/outputFormat " + + "together for Hive data source.") + } + + val serde = parameters.get(SERDE) + + if (fileFormat.isDefined && serde.isDefined) { + if (!Set("sequencefile", "textfile", "rcfile").contains(fileFormat.get)) { + throw new IllegalArgumentException( + s"fileFormat '${fileFormat.get}' already specifies a serde.") + } + } + + val containsDelimiters = delimiterOptions.keys.exists(parameters.contains) + + if (containsDelimiters) { + if (serde.isDefined) { + throw new IllegalArgumentException("Cannot specify delimiters with a custom serde.") + } + if (fileFormat.isEmpty) { + throw new IllegalArgumentException("Cannot specify delimiters without fileFormat.") + } + if (fileFormat.get != "textfile") { + throw new IllegalArgumentException("Cannot specify delimiters as they are only compatible " + + s"with fileFormat 'textfile', not ${fileFormat.get}.") + } + } + + for (lineDelim <- parameters.get("lineDelim") if lineDelim != "\n") { + throw new IllegalArgumentException("Hive data source only support newline '\\n' as " + + s"line delimiter, but given: $lineDelim.") + } + + def serdeProperties: Map[String, String] = parameters.filterKeys { + k => !lowerCasedOptionNames.contains(k.toLowerCase) + }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v } +} + +object HiveOptions { + private val lowerCasedOptionNames = collection.mutable.Set[String]() + + private def newOption(name: String): String = { + lowerCasedOptionNames += name.toLowerCase + name + } + + val FILE_FORMAT = newOption("fileFormat") + val INPUT_FORMAT = newOption("inputFormat") + val OUTPUT_FORMAT = newOption("outputFormat") + val SERDE = newOption("serde") + + // A map from the public delimiter option keys to the underlying Hive serde property keys. + val delimiterOptions = Map( + "fieldDelim" -> "field.delim", + "escapeDelim" -> "escape.delim", + // The following typo is inherited from Hive... 
+ "collectionDelim" -> "colelction.delim", + "mapkeyDelim" -> "mapkey.delim", + "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase -> v } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 3f1f86c278db0..5a3fcd7a759c0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types.StructType -private[orc] object OrcFileOperator extends Logging { +private[hive] object OrcFileOperator extends Logging { /** * Retrieves an ORC file reader from a given path. The path can point to either a directory or a * single ORC file. If it points to a directory, it picks any non-empty ORC file within that diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index d13e29b3029b1..b67e5f6fe57a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformati import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.StructType @@ -51,6 +50,12 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(e.getMessage.toLowerCase.contains("operation not allowed")) } + private def analyzeCreateTable(sql: String): CatalogTable = { + TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect { + case CreateTable(tableDesc, mode, _) => tableDesc + }.head + } + test("Test CTAS #1") { val s1 = """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view @@ -76,7 +81,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) - assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) + assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) } test("Test CTAS #2") { @@ -107,7 +112,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe")) - assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) + assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) } test("Test CTAS #3") { @@ -125,7 +130,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) - assert(desc.storage.serde.isEmpty) + assert(desc.storage.serde == 
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(desc.properties == Map()) } @@ -305,7 +310,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle Some("org.apache.hadoop.mapred.TextInputFormat")) assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) - assert(desc.storage.serde.isEmpty) + assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(desc.storage.properties.isEmpty) assert(desc.properties.isEmpty) assert(desc.comment.isEmpty) @@ -412,7 +417,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle val (desc2, _) = extractTableDesc(query2) assert(desc1.storage.inputFormat == Some("winput")) assert(desc1.storage.outputFormat == Some("wowput")) - assert(desc1.storage.serde.isEmpty) + assert(desc1.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) @@ -592,4 +597,94 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client assert(hiveClient.getConf("hive.in.test", "") == "true") } + + test("create hive serde table with new syntax - basic") { + val sql = + """ + |CREATE TABLE t + |(id int, name string COMMENT 'blabla') + |USING hive + |OPTIONS (fileFormat 'parquet', my_prop 1) + |LOCATION '/tmp/file' + |COMMENT 'BLABLA' + """.stripMargin + + val table = analyzeCreateTable(sql) + assert(table.schema == new StructType() + .add("id", "int") + .add("name", "string", nullable = true, comment = "blabla")) + assert(table.provider == Some(DDLUtils.HIVE_PROVIDER)) + assert(table.storage.locationUri == Some("/tmp/file")) + assert(table.storage.properties == Map("my_prop" -> "1")) + assert(table.comment == Some("BLABLA")) + + assert(table.storage.inputFormat == + Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")) + assert(table.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) + assert(table.storage.serde == + Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) + } + + test("create hive serde table with new syntax - with partition and bucketing") { + val v1 = "CREATE TABLE t (c1 int, c2 int) USING hive PARTITIONED BY (c2)" + val table = analyzeCreateTable(v1) + assert(table.schema == new StructType().add("c1", "int").add("c2", "int")) + assert(table.partitionColumnNames == Seq("c2")) + // check the default formats + assert(table.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + assert(table.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(table.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + + val v2 = "CREATE TABLE t (c1 int, c2 int) USING hive CLUSTERED BY (c2) INTO 4 BUCKETS" + val e2 = intercept[AnalysisException](analyzeCreateTable(v2)) + assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet")) + + val v3 = + """ + |CREATE TABLE t (c1 int, c2 int) USING hive + |PARTITIONED BY (c2) + |CLUSTERED BY (c2) INTO 4 BUCKETS""".stripMargin + val e3 = intercept[AnalysisException](analyzeCreateTable(v3)) + 
assert(e3.message.contains("Creating bucketed Hive serde table is not supported yet")) + } + + test("create hive serde table with new syntax - Hive options error checking") { + val v1 = "CREATE TABLE t (c1 int) USING hive OPTIONS (inputFormat 'abc')" + val e1 = intercept[IllegalArgumentException](analyzeCreateTable(v1)) + assert(e1.getMessage.contains("Cannot specify only inputFormat or outputFormat")) + + val v2 = "CREATE TABLE t (c1 int) USING hive OPTIONS " + + "(fileFormat 'x', inputFormat 'a', outputFormat 'b')" + val e2 = intercept[IllegalArgumentException](analyzeCreateTable(v2)) + assert(e2.getMessage.contains( + "Cannot specify fileFormat and inputFormat/outputFormat together")) + + val v3 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', serde 'a')" + val e3 = intercept[IllegalArgumentException](analyzeCreateTable(v3)) + assert(e3.getMessage.contains("fileFormat 'parquet' already specifies a serde")) + + val v4 = "CREATE TABLE t (c1 int) USING hive OPTIONS (serde 'a', fieldDelim ' ')" + val e4 = intercept[IllegalArgumentException](analyzeCreateTable(v4)) + assert(e4.getMessage.contains("Cannot specify delimiters with a custom serde")) + + val v5 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fieldDelim ' ')" + val e5 = intercept[IllegalArgumentException](analyzeCreateTable(v5)) + assert(e5.getMessage.contains("Cannot specify delimiters without fileFormat")) + + val v6 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', fieldDelim ' ')" + val e6 = intercept[IllegalArgumentException](analyzeCreateTable(v6)) + assert(e6.getMessage.contains( + "Cannot specify delimiters as they are only compatible with fileFormat 'textfile'")) + + // The value of 'fileFormat' option is case-insensitive. + val v7 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'TEXTFILE', lineDelim ',')" + val e7 = intercept[IllegalArgumentException](analyzeCreateTable(v7)) + assert(e7.getMessage.contains("Hive data source only support newline '\\n' as line delimiter")) + + val v8 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'wrong')" + val e8 = intercept[IllegalArgumentException](analyzeCreateTable(v8)) + assert(e8.getMessage.contains("invalid fileFormat: 'wrong'")) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index 6fee45824ea3f..2f02bb5d3b649 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -68,6 +68,6 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { val rawTable = externalCatalog.client.getTable("db1", "hive_tbl") assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER)) - assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER)) + assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl"))) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 2b8d4e2bb3bb5..aed825e2f3d26 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -1189,21 +1189,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } - test("create 
a data source table using hive") { - val tableName = "tab1" - withTable (tableName) { - val e = intercept[AnalysisException] { - sql( - s""" - |CREATE TABLE $tableName - |(col1 int) - |USING hive - """.stripMargin) - }.getMessage - assert(e.contains("Cannot create hive serde table with CREATE TABLE USING")) - } - } - test("create a temp view using hive") { val tableName = "tab1" withTable (tableName) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 8b34219530259..3ac07d093381c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, Cat import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.orc.OrcFileOperator import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION @@ -1250,4 +1251,42 @@ class HiveDDLSuite assert(e.message.contains("unknown is not a valid partition column")) } } + + test("create hive serde table with new syntax") { + withTable("t", "t2", "t3") { + withTempPath { path => + sql( + s""" + |CREATE TABLE t(id int) USING hive + |OPTIONS(fileFormat 'orc', compression 'Zlib') + |LOCATION '${path.getCanonicalPath}' + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(DDLUtils.isHiveTable(table)) + assert(table.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + assert(table.storage.properties.get("compression") == Some("Zlib")) + assert(spark.table("t").collect().isEmpty) + + sql("INSERT INTO t SELECT 1") + checkAnswer(spark.table("t"), Row(1)) + // Check if this is compressed as ZLIB. + val maybeOrcFile = path.listFiles().find(_.getName.endsWith("part-00000")) + assert(maybeOrcFile.isDefined) + val orcFilePath = maybeOrcFile.get.toPath.toString + val expectedCompressionKind = + OrcFileOperator.getFileReader(orcFilePath).get.getCompression + assert("ZLIB" === expectedCompressionKind.name()) + + sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2") + val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2")) + assert(DDLUtils.isHiveTable(table2)) + assert(table2.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + checkAnswer(spark.table("t2"), Row(1, "a")) + + sql("CREATE TABLE t3(a int, p int) USING hive PARTITIONED BY (p)") + sql("INSERT INTO t3 PARTITION(p=1) SELECT 0") + checkAnswer(spark.table("t3"), Row(0, 1)) + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 463c368fc42b1..e678cf6f22c20 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -93,8 +93,6 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { .orc(path) // Check if this is compressed as ZLIB. 
- val conf = spark.sessionState.newHadoopConf() - val fs = FileSystem.getLocal(conf) val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc")) assert(maybeOrcFile.isDefined) val orcFilePath = maybeOrcFile.get.toPath.toString
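
Taken together, the changes above can be exercised end-to-end roughly as follows (a sketch, not part of
the patch; it assumes a Hive-enabled `SparkSession` named `spark`, as in the examples touched above):

    // Create a Hive serde table with the unified CREATE TABLE ... USING hive syntax; the
    // DetermineHiveSerde rule resolves fileFormat 'parquet' to the Parquet input/output
    // formats and ParquetHiveSerDe before the table is stored in the metastore.
    spark.sql("CREATE TABLE demo(id INT) USING hive OPTIONS(fileFormat 'parquet')")
    spark.sql("INSERT INTO demo SELECT 1")
    spark.sql("SELECT * FROM demo").show()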