[SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metastore
## What changes were proposed in this pull request?
This is based on apache#20668 for supporting Hive 2.2 and Hive 2.3 metastore.

When we merge the PR, we should give the major credit to wangyum

## How was this patch tested?
Added the test cases

Author: Yuming Wang <[email protected]>
Author: gatorsmile <[email protected]>

Closes apache#20671 from gatorsmile/pr-20668.
wangyum authored and cloud-fan committed Mar 1, 2018
1 parent 22f3d33 commit ff14801
Showing 9 changed files with 72 additions and 12 deletions.
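For context before the diffs: the user-facing effect of this patch is that `spark.sql.hive.metastore.version` now accepts 2.2.x and 2.3.x values. A minimal sketch of pointing Spark at a Hive 2.3 metastore, assuming the matching Hive client jars are resolvable (here fetched from Maven via `spark.sql.hive.metastore.jars`):

```scala
import org.apache.spark.sql.SparkSession

// Connect to an external Hive 2.3.2 metastore. The "maven" setting tells
// Spark to download Hive client jars of that version at runtime.
val spark = SparkSession.builder()
  .appName("hive-2.3-metastore-example")
  .config("spark.sql.hive.metastore.version", "2.3.2")
  .config("spark.sql.hive.metastore.jars", "maven")
  .enableHiveSupport()
  .getOrCreate()
```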
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging {
 
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
-      s"<code>0.12.0</code> through <code>2.1.1</code>.")
+      s"<code>0.12.0</code> through <code>2.3.2</code>.")
     .stringConf
     .createWithDefault(builtinHiveVersion)
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
@@ -104,6 +103,8 @@ private[hive] class HiveClientImpl(
     case hive.v1_2 => new Shim_v1_2()
     case hive.v2_0 => new Shim_v2_0()
     case hive.v2_1 => new Shim_v2_1()
+    case hive.v2_2 => new Shim_v2_2()
+    case hive.v2_3 => new Shim_v2_3()
   }
 
   // Create an internal session state for this HiveClientImpl.
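The shim selection above follows a simple inheritance pattern. A condensed sketch of the idea with hypothetical names (not the actual Spark classes): one subclass per Hive release adapts Spark's calls to that version's metastore API, and a release that kept the previous API inherits its parent shim unchanged.

```scala
// Hypothetical simplification of the shim layer.
trait MetastoreShim {
  def loadTable(db: String, table: String): Unit
}

class ShimV2_1 extends MetastoreShim {
  // In Spark this is done reflectively against the loaded Hive jars.
  override def loadTable(db: String, table: String): Unit = ()
}

// Hive 2.2 and 2.3 kept the 2.1 client API, so their shims add nothing;
// this mirrors the empty Shim_v2_2 / Shim_v2_3 subclasses in the patch.
class ShimV2_2 extends ShimV2_1
class ShimV2_3 extends ShimV2_1
```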
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -880,9 +880,7 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
 
 }
 
-private[client] class Shim_v1_0 extends Shim_v0_14 {
-
-}
+private[client] class Shim_v1_0 extends Shim_v0_14
 
 private[client] class Shim_v1_1 extends Shim_v1_0 {
 
@@ -1146,3 +1144,7 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
     alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
   }
 }
+
+private[client] class Shim_v2_2 extends Shim_v2_1
+
+private[client] class Shim_v2_3 extends Shim_v2_1
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -97,6 +97,8 @@ private[hive] object IsolatedClientLoader extends Logging {
     case "1.2" | "1.2.0" | "1.2.1" | "1.2.2" => hive.v1_2
     case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
     case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
+    case "2.2" | "2.2.0" => hive.v2_2
+    case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" => hive.v2_3
   }
 
   private def downloadVersion(
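As a usage note on the matching above: several user-facing version strings map to one shim. An illustrative check (not directly compilable, since these are `private[hive]` internals, and assuming the enclosing method is `IsolatedClientLoader.hiveVersion` as in Spark's source):

```scala
// Every patch-level string for Hive 2.3 resolves to the same case object,
// so Spark loads the 2.3.2 client jars for any of them.
Seq("2.3", "2.3.0", "2.3.1", "2.3.2").foreach { v =>
  assert(IsolatedClientLoader.hiveVersion(v) == hive.v2_3)
}
```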
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -71,7 +71,15 @@ package object client {
       exclusions = Seq("org.apache.curator:*",
         "org.pentaho:pentaho-aggdesigner-algorithm"))
 
-    val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1)
+    case object v2_2 extends HiveVersion("2.2.0",
+      exclusions = Seq("org.apache.curator:*",
+        "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+    case object v2_3 extends HiveVersion("2.3.2",
+      exclusions = Seq("org.apache.curator:*",
+        "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+    val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
   }
   // scalastyle:on
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -114,7 +114,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
   // staging directory under the table directory for Hive prior to 1.1, the staging directory will
   // be removed by Hive when Hive is trying to empty the table directory.
   val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
-  val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
+  val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
+    Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
 
   // Ensure all the supported versions are considered here.
   assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -22,5 +22,6 @@ import scala.collection.immutable.IndexedSeq
 import org.apache.spark.SparkFunSuite
 
 private[client] trait HiveClientVersions {
-  protected val versions = IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
+  protected val versions =
+    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
 }
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -34,7 +34,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite {
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
-    if (version == "2.0" || version == "2.1") {
+    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
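Since this equality chain now appears in two test files, a set-based membership check would be easier to extend for future versions. A sketch of that alternative (an editorial suggestion, not part of the patch; `version` and `hadoopConf` come from the surrounding test code):

```scala
// Versions whose metastore defaults changed in HIVE-6113 / HIVE-12463.
val schemaManagedVersions = Set("2.0", "2.1", "2.2", "2.3")
if (schemaManagedVersions.contains(version)) {
  hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
  hadoopConf.set("hive.metastore.schema.verification", "false")
}
```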
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
 import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
@@ -110,7 +111,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
   }
 
-  private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
+  private val versions =
+    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
 
   private var client: HiveClient = null
 
@@ -125,7 +127,7 @@
       // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
       // hive.metastore.schema.verification from false to true since 2.0
       // For details, see the JIRA HIVE-6113 and HIVE-12463
-      if (version == "2.0" || version == "2.1") {
+      if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
        hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
        hadoopConf.set("hive.metastore.schema.verification", "false")
      }
@@ -422,15 +424,18 @@
     test(s"$version: alterPartitions") {
       val spec = Map("key1" -> "1", "key2" -> "2")
+      val parameters = Map(StatsSetupConst.TOTAL_SIZE -> "0", StatsSetupConst.NUM_FILES -> "1")
       val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
       val storage = storageFormat.copy(
         locationUri = Some(newLocation),
         // needed for 0.12 alter partitions
         serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-      val partition = CatalogTablePartition(spec, storage)
+      val partition = CatalogTablePartition(spec, storage, parameters)
       client.alterPartitions("default", "src_part", Seq(partition))
       assert(client.getPartition("default", "src_part", spec)
         .storage.locationUri == Some(newLocation))
+      assert(client.getPartition("default", "src_part", spec)
+        .parameters.get(StatsSetupConst.TOTAL_SIZE) == Some("0"))
     }
 
     test(s"$version: dropPartitions") {
@@ -633,6 +638,46 @@
       }
     }
 
+    test(s"$version: CREATE Partitioned TABLE AS SELECT") {
+      withTable("tbl") {
+        versionSpark.sql(
+          """
+            |CREATE TABLE tbl(c1 string)
+            |PARTITIONED BY (ds STRING)
+          """.stripMargin)
+        versionSpark.sql("INSERT OVERWRITE TABLE tbl partition (ds='2') SELECT '1'")
+
+        assert(versionSpark.table("tbl").collect().toSeq == Seq(Row("1", "2")))
+        val partMeta = versionSpark.sessionState.catalog.getPartition(
+          TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
+        val totalSize = partMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+        val numFiles = partMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
+        // Except 0.12, all the following versions will fill the Hive-generated statistics
+        if (version == "0.12") {
+          assert(totalSize.isEmpty && numFiles.isEmpty)
+        } else {
+          assert(totalSize.nonEmpty && numFiles.nonEmpty)
+        }
+
+        versionSpark.sql(
+          """
+            |ALTER TABLE tbl PARTITION (ds='2')
+            |SET SERDEPROPERTIES ('newKey' = 'vvv')
+          """.stripMargin)
+        val newPartMeta = versionSpark.sessionState.catalog.getPartition(
+          TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
+
+        val newTotalSize = newPartMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+        val newNumFiles = newPartMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
+        // Except 0.12, all the following versions will fill the Hive-generated statistics
+        if (version == "0.12") {
+          assert(newTotalSize.isEmpty && newNumFiles.isEmpty)
+        } else {
+          assert(newTotalSize.nonEmpty && newNumFiles.nonEmpty)
+        }
+      }
+    }
+
     test(s"$version: Delete the temporary staging directory and files after each insert") {
       withTempDir { tmpDir =>
         withTable("tab") {
