Skip to content

Commit

Permalink
[SPARK-26571][SQL] Update Hive Serde mapping with canonical name of P…
Browse files Browse the repository at this point in the history
…arquet and Orc FileFormat

## What changes were proposed in this pull request?

Currently Spark table maintains Hive catalog storage format, so that Hive client can read it.  In `HiveSerDe.scala`, Spark uses a mapping from its data source to HiveSerde. The mapping is old, we need to update with latest canonical name of Parquet and Orc FileFormat.

Otherwise the following queries will result in wrong Serde value in Hive table(default value `org.apache.hadoop.mapred.SequenceFileInputFormat`), and Hive client will fail to read the output table:
```
df.write.format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat").saveAsTable(..)
```

```
df.write.format("org.apache.spark.sql.execution.datasources.orc.OrcFileFormat").saveAsTable(..)
```

This minor PR is to fix the mapping.

## How was this patch tested?

Unit test.

Closes apache#23491 from gengliangwang/fixHiveSerdeMap.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
gengliangwang authored and cloud-fan committed Jan 9, 2019
1 parent 32515d2 commit 311f32f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ object HiveSerDe {
def sourceToSerDe(source: String): Option[HiveSerDe] = {
val key = source.toLowerCase(Locale.ROOT) match {
case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
case s if s.startsWith("org.apache.spark.sql.execution.datasources.parquet") => "parquet"
case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
case s if s.startsWith("org.apache.spark.sql.hive.orc") => "orc"
case s if s.startsWith("org.apache.spark.sql.execution.datasources.orc") => "orc"
case s if s.equals("orcfile") => "orc"
case s if s.equals("parquetfile") => "parquet"
case s if s.equals("avrofile") => "avro"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,28 @@ class DataSourceWithHiveMetastoreCatalogSuite
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
)),

"org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat" -> ((
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
)),

"orc" -> ((
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcSerde"
)),

"org.apache.spark.sql.hive.orc" -> ((
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcSerde"
)),

"org.apache.spark.sql.execution.datasources.orc.OrcFileFormat" -> ((
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcSerde"
))
).foreach { case (provider, (inputFormat, outputFormat, serde)) =>
test(s"Persist non-partitioned $provider relation into metastore as managed table") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ import java.io.File

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.orc.OrcSuite
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -67,33 +65,6 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
""".stripMargin)
}

test("SPARK-22972: hive orc source") {
val tableName = "normal_orc_as_source_hive"
withTable(tableName) {
sql(
s"""
|CREATE TABLE $tableName
|USING org.apache.spark.sql.hive.orc
|OPTIONS (
| PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
|)
""".stripMargin)

val tableMetadata = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(tableName))
assert(tableMetadata.storage.inputFormat ==
Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
assert(tableMetadata.storage.outputFormat ==
Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
assert(tableMetadata.storage.serde ==
Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc")
.equals(HiveSerDe.sourceToSerDe("orc")))
assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.orc")
.equals(HiveSerDe.sourceToSerDe("orc")))
}
}

test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
val location = Utils.createTempDir()
val uri = location.toURI
Expand Down

0 comments on commit 311f32f

Please sign in to comment.