Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19459][SQL] Add Hive datatype (char/varchar) to StructField metadata #16804

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -1457,8 +1457,28 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
*/
override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
  import ctx._

  val builder = new MetadataBuilder
  // Add comment to metadata.
  if (STRING != null) {
    builder.putString("comment", string(STRING))
  }
  // Add the raw Hive type string to metadata. CHAR and VARCHAR are mapped onto Spark's
  // StringType, so the original type text (e.g. "varchar(10)") must be preserved in the
  // field metadata in order to pick the correct Hive object inspector later.
  dataType match {
    case p: PrimitiveDataTypeContext =>
      p.identifier.getText.toLowerCase match {
        case "varchar" | "char" =>
          builder.putString(HIVE_TYPE_STRING, dataType.getText.toLowerCase)
        case _ =>
      }
    case _ =>
  }

  StructField(
    identifier.getText,
    typedVisit(dataType),
    nullable = true,
    builder.build())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,12 @@ package org.apache.spark.sql
* Contains a type system for attributes produced by relations, including complex types like
* structs, arrays and maps.
*/
package object types {
  /**
   * Metadata key used to store the raw Hive type string in the metadata of a StructField. This
   * is relevant for datatypes that do not have a direct Spark SQL counterpart, such as CHAR and
   * VARCHAR. We need to preserve the original type in order to invoke the correct object
   * inspector in Hive.
   */
  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
(2 to 10).map(i => Row(i, i - 1)).toSeq)

test("Schema and all fields") {
// Builds field metadata carrying the raw Hive type string for a column.
def hiveMetadata(dt: String): Metadata =
  new MetadataBuilder().putString(HIVE_TYPE_STRING, dt).build()

val expectedSchema = StructType(
StructField("string$%Field", StringType, true) ::
StructField("binaryField", BinaryType, true) ::
Expand All @@ -217,8 +221,8 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
StructField("decimalField2", DecimalType(9, 2), true) ::
StructField("dateField", DateType, true) ::
StructField("timestampField", TimestampType, true) ::
StructField("varcharField", StringType, true) ::
StructField("charField", StringType, true) ::
StructField("varcharField", StringType, true, hiveMetadata("varchar(12)")) ::
StructField("charField", StringType, true, hiveMetadata("char(18)")) ::
StructField("arrayFieldSimple", ArrayType(IntegerType), true) ::
StructField("arrayFieldComplex",
ArrayType(
Expand Down
14 changes: 3 additions & 11 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,6 @@ private[spark] object HiveUtils extends Logging {
/** The version of hive used internally by Spark SQL. */
val hiveExecutionVersion: String = "1.2.1"

/**
* The property key that is used to store the raw hive type string in the metadata of StructField.
* For example, in the case where the Hive type is varchar, the type gets mapped to a string type
* in Spark SQL, but we need to preserve the original type in order to invoke the correct object
* inspector in Hive.
*/
val hiveTypeString: String = "HIVE_TYPE_STRING"

val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
Expand Down Expand Up @@ -465,8 +457,8 @@ private[spark] object HiveUtils extends Logging {

/** Converts the native StructField to Hive's FieldSchema. */
private def toHiveColumn(c: StructField): FieldSchema = {
val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
c.metadata.getString(HiveUtils.hiveTypeString)
val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
c.metadata.getString(HIVE_TYPE_STRING)
} else {
c.dataType.catalogString
}
Expand All @@ -482,7 +474,7 @@ private[spark] object HiveUtils extends Logging {
throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
}

val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
val field = StructField(
name = hc.getName,
dataType = columnType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}

/**
Expand Down Expand Up @@ -790,8 +790,8 @@ private[hive] class HiveClientImpl(
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]

private def toHiveColumn(c: StructField): FieldSchema = {
val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
c.metadata.getString(HiveUtils.hiveTypeString)
val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
c.metadata.getString(HIVE_TYPE_STRING)
} else {
c.dataType.catalogString
}
Expand All @@ -806,7 +806,7 @@ private[hive] class HiveClientImpl(
throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
}

val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
val field = StructField(
name = hc.getName,
dataType = columnType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,39 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
}

test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
  val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
  val location = Utils.createTempDir()
  try {
    hiveClient.runSqlHive(
      """
        |CREATE EXTERNAL TABLE hive_orc(
        |  a STRING,
        |  b CHAR(10),
        |  c VARCHAR(10))
        |STORED AS orc""".stripMargin)
    // Hive throws an exception if we assign the location in the create table statement,
    // so set it afterwards.
    hiveClient.runSqlHive(
      s"ALTER TABLE hive_orc SET LOCATION '${location.toURI}'")
    hiveClient.runSqlHive(
      "INSERT INTO TABLE hive_orc SELECT 'a', 'b', 'c' FROM (SELECT 1) t")

    // Create a different table in Spark using the same schema which points to
    // the same location.
    spark.sql(
      s"""
         |CREATE EXTERNAL TABLE spark_orc(
         |  a STRING,
         |  b CHAR(10),
         |  c VARCHAR(10))
         |STORED AS orc
         |LOCATION '${location.toURI}'""".stripMargin)
    // CHAR(10) values are blank-padded by Hive to the declared length.
    val result = Row("a", "b         ", "c")
    checkAnswer(spark.table("hive_orc"), result)
    checkAnswer(spark.table("spark_orc"), result)
  } finally {
    hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
    hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc")
    // Clean up the temp dir backing both external tables.
    Utils.deleteRecursively(location)
  }
}
}
Expand Down