[SPARK-19459][SQL] Add Hive datatype (char/varchar) to StructField metadata

## What changes were proposed in this pull request?
Reading from an existing ORC table which contains `char` or `varchar` columns can fail with a `ClassCastException` if the table metadata has been created using Spark. This is caused by the fact that Spark internally replaces `char` and `varchar` columns with a `string` column, discarding the original Hive type.
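
For context, a minimal sketch of the failure mode (hypothetical table name and path; assumes a Hive-enabled `SparkSession` named `spark` and ORC files written by Hive with a `varchar` schema):

```scala
// Spark records the VARCHAR(10) column as a plain `string` in the catalog,
// so when the ORC reader consults the table metadata it picks an object
// inspector that does not match the varchar data in the files.
spark.sql("CREATE EXTERNAL TABLE t (c VARCHAR(10)) STORED AS ORC LOCATION '/tmp/orc_t'")
spark.table("t").show()  // before this patch: can fail with ClassCastException
```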

This PR fixes this by adding the Hive type to the `StructField`'s metadata under the `HIVE_TYPE_STRING` key. This key is picked up by the `HiveClient` and the ORC reader; see apache#16060 for more details on how the metadata is used.
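
As an illustration (a sketch, not part of the patch), a column declared as `v VARCHAR(10)` is now parsed into a field like the following, mirroring the parser change below:

```scala
import org.apache.spark.sql.types._

// The Spark-facing type is still StringType, but the raw Hive type string
// is preserved in the field metadata under the HIVE_TYPE_STRING key.
val field = StructField(
  "v",
  StringType,
  nullable = true,
  new MetadataBuilder().putString(HIVE_TYPE_STRING, "varchar(10)").build())
```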

## How was this patch tested?
Added a regression test to `OrcSourceSuite`.

Author: Herman van Hovell <[email protected]>

Closes apache#16804 from hvanhovell/SPARK-19459.
hvanhovell authored and cmonkey committed Feb 15, 2017
1 parent 1170fea commit 709dc77
Showing 6 changed files with 76 additions and 25 deletions.
24 changes: 22 additions & 2 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1457,8 +1457,28 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-    val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
-    if (STRING == null) structField else structField.withComment(string(STRING))
+
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    if (STRING != null) {
+      builder.putString("comment", string(STRING))
+    }
+    // Add Hive type string to metadata.
+    dataType match {
+      case p: PrimitiveDataTypeContext =>
+        p.identifier.getText.toLowerCase match {
+          case "varchar" | "char" =>
+            builder.putString(HIVE_TYPE_STRING, dataType.getText.toLowerCase)
+          case _ =>
+        }
+      case _ =>
+    }
+
+    StructField(
+      identifier.getText,
+      typedVisit(dataType),
+      nullable = true,
+      builder.build())
   }
 
   /**
10 changes: 9 additions & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
@@ -21,4 +21,12 @@ package org.apache.spark.sql
  * Contains a type system for attributes produced by relations, including complex types like
  * structs, arrays and maps.
  */
-package object types
+package object types {
+  /**
+   * Metadata key used to store the raw hive type string in the metadata of StructField. This
+   * is relevant for datatypes that do not have a direct Spark SQL counterpart, such as CHAR and
+   * VARCHAR. We need to preserve the original type in order to invoke the correct object
+   * inspector in Hive.
+   */
+  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"
+}
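
For illustration, a sketch of how a consumer recovers the original Hive type using this key (`rawHiveType` is a hypothetical helper, not part of this patch; it mirrors the lookup that `toHiveColumn` performs in `HiveUtils` and `HiveClientImpl` below):

```scala
import org.apache.spark.sql.types._

// Prefer the preserved raw Hive type; fall back to the Catalyst type
// string for fields without Hive-specific metadata.
def rawHiveType(field: StructField): String = {
  if (field.metadata.contains(HIVE_TYPE_STRING)) {
    field.metadata.getString(HIVE_TYPE_STRING)
  } else {
    field.dataType.catalogString
  }
}
```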
8 changes: 6 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -203,6 +203,10 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
     (2 to 10).map(i => Row(i, i - 1)).toSeq)
 
   test("Schema and all fields") {
+    def hiveMetadata(dt: String): Metadata = {
+      new MetadataBuilder().putString(HIVE_TYPE_STRING, dt).build()
+    }
+
     val expectedSchema = StructType(
       StructField("string$%Field", StringType, true) ::
       StructField("binaryField", BinaryType, true) ::
@@ -217,8 +221,8 @@
       StructField("decimalField2", DecimalType(9, 2), true) ::
       StructField("dateField", DateType, true) ::
       StructField("timestampField", TimestampType, true) ::
-      StructField("varcharField", StringType, true) ::
-      StructField("charField", StringType, true) ::
+      StructField("varcharField", StringType, true, hiveMetadata("varchar(12)")) ::
+      StructField("charField", StringType, true, hiveMetadata("char(18)")) ::
       StructField("arrayFieldSimple", ArrayType(IntegerType), true) ::
       StructField("arrayFieldComplex",
         ArrayType(
14 changes: 3 additions & 11 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -61,14 +61,6 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
-  /**
-   * The property key that is used to store the raw hive type string in the metadata of StructField.
-   * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
-   * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
-   * inspector in Hive.
-   */
-  val hiveTypeString: String = "HIVE_TYPE_STRING"
-
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
       s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
@@ -465,8 +457,8 @@ private[spark] object HiveUtils extends Logging {
 
   /** Converts the native StructField to Hive's FieldSchema. */
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }
@@ -482,7 +474,7 @@
       throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
 
-    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
8 changes: 4 additions & 4 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -790,8 +790,8 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }
@@ -806,7 +806,7 @@
       throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
 
-    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
37 changes: 32 additions & 5 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -152,14 +152,41 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
   }
 
-  test("SPARK-18220: read Hive orc table with varchar column") {
+  test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
     val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val location = Utils.createTempDir()
+    val uri = location.toURI
     try {
-      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
-      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
-      checkAnswer(spark.table("orc_varchar"), Row("a"))
+      hiveClient.runSqlHive(
+        """
+          |CREATE EXTERNAL TABLE hive_orc(
+          |  a STRING,
+          |  b CHAR(10),
+          |  c VARCHAR(10))
+          |STORED AS orc""".stripMargin)
+      // Hive throws an exception if the location is assigned in the CREATE TABLE statement.
+      hiveClient.runSqlHive(
+        s"ALTER TABLE hive_orc SET LOCATION '$uri'")
+      hiveClient.runSqlHive(
+        "INSERT INTO TABLE hive_orc SELECT 'a', 'b', 'c' FROM (SELECT 1) t")
+
+      // We create a different table in Spark using the same schema which points to
+      // the same location.
+      spark.sql(
+        s"""
+           |CREATE EXTERNAL TABLE spark_orc(
+           |  a STRING,
+           |  b CHAR(10),
+           |  c VARCHAR(10))
+           |STORED AS orc
+           |LOCATION '$uri'""".stripMargin)
+      val result = Row("a", "b" + " " * 9, "c")  // CHAR(10) values are read back padded to length 10
+      checkAnswer(spark.table("hive_orc"), result)
+      checkAnswer(spark.table("spark_orc"), result)
     } finally {
-      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc")
+      Utils.deleteRecursively(location)
     }
   }
 }
