diff --git a/assembly/pom.xml b/assembly/pom.xml index de7b75258e3c5..cb66a14a48d02 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -188,6 +188,22 @@ + + hive-0.13 + + + hive.version + 0.13.1 + + + + + org.apache.spark + spark-hive_${scala.binary.version} + ${project.version} + + + spark-ganglia-lgpl diff --git a/pom.xml b/pom.xml index a5eaea80afd71..94302f97b7951 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,6 @@ 0.94.6 1.4.0 3.4.5 - 0.12.0 1.4.3 1.2.3 8.1.14.v20131031 @@ -427,7 +426,7 @@ org.apache.derby derby - 10.4.2.0 + ${derby.version} com.codahale.metrics @@ -1225,7 +1224,18 @@ - + + hive-default + + + !hive.version + + + + 0.12.0 + 10.4.2.0 + + hive @@ -1235,6 +1245,18 @@ sql/hive-thriftserver + + hive-0.13 + + + hive.version + 0.13.1 + + + + 10.10.1.1 + + diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 30ff277e67c88..67f0f46507070 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -36,11 +36,6 @@ - - com.twitter - parquet-hive-bundle - 1.5.0 - org.apache.spark spark-core_${scala.binary.version} @@ -51,46 +46,15 @@ spark-sql_${scala.binary.version} ${project.version} - - org.spark-project.hive - hive-metastore - ${hive.version} - commons-httpclient commons-httpclient 3.1 - - org.spark-project.hive - hive-exec - ${hive.version} - - - commons-logging - commons-logging - - - org.codehaus.jackson jackson-mapper-asl - - org.spark-project.hive - hive-serde - ${hive.version} - - - commons-logging - commons-logging - - - commons-logging - commons-logging-api - - - org.apache.avro @@ -109,6 +73,74 @@ + + hive-default + + + !hive.version + + + + + com.twitter + parquet-hive-bundle + 1.5.0 + + + org.spark-project.hive + hive-metastore + ${hive.version} + + + org.spark-project.hive + hive-exec + ${hive.version} + + + commons-logging + commons-logging + + + + + org.spark-project.hive + hive-serde + ${hive.version} + + + commons-logging + commons-logging + + + commons-logging + commons-logging-api + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-default-sources + generate-sources + + add-source + + + + v0.12/src/main/scala + + + + + + + + hive @@ -135,6 +167,82 @@ + + hive-0.13 + + + hive.version + 0.13.1 + + + + + org.apache.hive + hive-metastore + ${hive.version} + + + org.apache.hive + hive-exec + ${hive.version} + + + commons-logging + commons-logging + + + + + org.apache.hive + hive-serde + ${hive.version} + + + commons-logging + commons-logging + + + commons-logging + commons-logging-api + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-v13-sources + generate-sources + + add-source + + + + v0.13/src/main/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index ab7862f4f9e06..cac5b1b4ee190 100644 --- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -24,11 +24,12 @@ import java.util.Date import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} -import org.apache.hadoop.hive.ql.plan.FileSinkDesc import org.apache.hadoop.mapred._ import org.apache.hadoop.io.Writable import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} +import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc} +import org.apache.spark.sql.hive.HiveShim._ /** * Internal helper class that saves an RDD using a Hive OutputFormat. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index d9b2bc7348ad2..717cdafac1e15 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.metadata.Table import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.ql.stats.StatsSetupConst import org.apache.hadoop.hive.serde2.io.TimestampWritable import org.apache.spark.SparkContext @@ -46,6 +45,8 @@ import org.apache.spark.sql.execution.ExtractPythonUdfs import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.{Command => PhysicalCommand} import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand +import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.hive.HiveShim._ /** * DEPRECATED: Use HiveContext instead. @@ -170,13 +171,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val tableParameters = relation.hiveQlTable.getParameters val oldTotalSize = - Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE)).map(_.toLong).getOrElse(0L) + Option(tableParameters.get(HiveShim.getStatsSetupConstTotalSize)). + map(_.toLong).getOrElse(0L) val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable) // Update the Hive metastore if the total size of the table is different than the size // recorded in the Hive metastore. // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - tableParameters.put(StatsSetupConst.TOTAL_SIZE, newTotalSize.toString) + tableParameters.put(HiveShim.getStatsSetupConstTotalSize, newTotalSize.toString) val hiveTTable = relation.hiveQlTable.getTTable hiveTTable.setParameters(tableParameters) val tableFullName = @@ -286,24 +288,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val cmd_trimmed: String = cmd.trim() val tokens: Array[String] = cmd_trimmed.split("\\s+") val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() - val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hiveconf) - - SessionState.start(sessionState) + val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf) proc match { case driver: Driver => - driver.init() - val results = new JArrayList[String] val response: CommandProcessorResponse = driver.run(cmd) // Throw an exception if there is an error in query processing. if (response.getResponseCode != 0) { - driver.destroy() + driver.close throw new QueryExecutionException(response.getErrorMessage) } driver.setMaxRows(maxRows) driver.getResults(results) - driver.destroy() + driver.close results case _ => sessionState.out.println(tokens(0) + " " + cmd_1) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 943bbaa8ce25e..068a8285859c1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.types._ /* Implicit conversions */ import scala.collection.JavaConversions._ +import org.apache.spark.sql.hive.HiveShim private[hive] trait HiveInspectors { @@ -137,7 +138,7 @@ private[hive] trait HiveInspectors { /** Converts native catalyst types to the types expected by Hive */ def wrap(a: Any): AnyRef = a match { - case s: String => new hadoopIo.Text(s) // TODO why should be Text? + case s: String => HiveShim.convertCatalystString2Hive(s) case i: Int => i: java.lang.Integer case b: Boolean => b: java.lang.Boolean case f: Float => f: java.lang.Float diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 6571c35499ef4..1f5d64328a228 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -23,7 +23,6 @@ import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, Ser import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.ql.stats.StatsSetupConst import org.apache.hadoop.hive.serde2.Deserializer import org.apache.spark.annotation.DeveloperApi @@ -39,6 +38,8 @@ import org.apache.spark.sql.columnar.InMemoryRelation import org.apache.spark.sql.hive.execution.HiveTableScan import org.apache.spark.util.Utils +import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.hive.HiveShim._ /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -59,7 +60,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val table = client.getTable(databaseName, tblName) val partitions: Seq[Partition] = if (table.isPartitioned) { - client.getAllPartitionsForPruner(table).toSeq + client.getAllPartitionsOf(table).toSeq } else { Nil } @@ -180,6 +181,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with override def unregisterAllTables() = {} } + /** * :: DeveloperApi :: * Provides conversions between Spark SQL data types and Hive Metastore types. @@ -196,7 +198,7 @@ object HiveMetastoreTypes extends RegexParsers { "bigint" ^^^ LongType | "binary" ^^^ BinaryType | "boolean" ^^^ BooleanType | - "decimal" ^^^ DecimalType | + HiveShim.metastoreDecimal ^^^ DecimalType | "timestamp" ^^^ TimestampType | "varchar\\((\\d+)\\)".r ^^^ StringType @@ -280,13 +282,13 @@ private[hive] case class MetastoreRelation // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`, // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future. BigInt( - Option(hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)) + Option(hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstTotalSize)) .map(_.toLong) .getOrElse(sqlContext.defaultSizeInBytes)) } ) - val tableDesc = new TableDesc( + val tableDesc = HiveShim.getTableDesc( Class.forName( hiveQlTable.getSerializationLib, true, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index a4dd6be5f9e35..a30f8c3d35188 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types._ +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.conf.HiveConf /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -210,7 +212,13 @@ private[hive] object HiveQl { /** * Returns the AST for the given SQL string. */ - def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql)) + def getAst(sql: String): ASTNode = { + val hContext = new Context(new HiveConf()) + val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext)) + hContext.clear + node + } + /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 329f80cad471e..552e71777b64f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -35,6 +35,7 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions.{Attribute, Row, GenericMutableRow, Literal, Cast} import org.apache.spark.sql.catalyst.types.DataType +import org.apache.spark.sql.hive.HiveShim._ /** * A trait for subclasses that handle table scans. @@ -142,7 +143,7 @@ class HadoopTableReader( filterOpt: Option[PathFilter]): RDD[Row] = { val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) => val partDesc = Utilities.getPartitionDesc(partition) - val partPath = partition.getPartitionPath + val partPath = partition.getDataLocationPath val inputPathStr = applyFilterIfNeeded(partPath, filterOpt) val ifc = partDesc.getInputFileFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index a013f3f7a805f..aacff83ce24c8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive._ - +import org.apache.hadoop.hive.ql.processors._ /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -76,7 +76,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // For some hive test case which contain ${system:test.tmp.dir} System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath) - + CommandProcessorFactory.clean(hiveconf); configure() // Must be called before initializing the catalog below. /** The location of the compiled hive distribution */ @@ -297,6 +297,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { * tests. */ protected val originalUdfs: JavaSet[String] = FunctionRegistry.getFunctionNames + HiveShim.createDefaultDBIfNeeded(this) /** * Resets the test instance by deleting any tables that have been created. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index a40e89e0d382b..7f891a5171cc6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row} import org.apache.spark.sql.execution.{Command, LeafNode} import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} +import org.apache.spark.sql.hive.HiveShim /** * Implementation for "describe [extended] table". @@ -49,7 +50,8 @@ case class DescribeHiveTableCommand( case (name, dataType, comment) => String.format("%-" + alignment + "s", name) + delim + String.format("%-" + alignment + "s", dataType) + delim + - String.format("%-" + alignment + "s", Option(comment).getOrElse("None")) + String.format("%-" + alignment + "s", Option(comment). + getOrElse(HiveShim.getEmptyCommentsFieldValue)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index 577ca928b43b6..5e6b2ee76aa63 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils +import org.apache.spark.sql.hive.HiveShim import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.objectinspector.primitive._ @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive._ + /** * :: DeveloperApi :: * The Hive table scan operator. Column and partition pruning are both handled. @@ -71,14 +72,14 @@ case class HiveTableScan( Cast(Literal(value), dataType).eval(null) } + private def addColumnMetadataToConf(hiveConf: HiveConf) { // Specifies needed column IDs for those non-partitioning columns. val neededColumnIDs = attributes.map(a => relation.attributes.indexWhere(_.name == a.name): Integer).filter(index => index >= 0) - ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs) - ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name)) + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name)) // Specifies types and object inspectors of columns to be scanned. val structOI = ObjectInspectorUtils diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 39033bdeac4b0..33efb4b84744a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -21,11 +21,10 @@ import scala.collection.JavaConversions._ import java.util.{HashMap => JHashMap} -import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.hadoop.hive.common.`type`.{HiveVarchar} import org.apache.hadoop.hive.metastore.MetaStoreUtils -import org.apache.hadoop.hive.ql.Context import org.apache.hadoop.hive.ql.metadata.Hive -import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import org.apache.hadoop.hive.ql.plan.{TableDesc} import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -40,6 +39,9 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter} +import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc, ShimContext => Context} +import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.hive.HiveShim._ /** * :: DeveloperApi :: @@ -76,7 +78,7 @@ case class InsertIntoHiveTable( new HiveVarchar(s, s.size) case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) => - new HiveDecimal(bd.underlying()) + HiveShim.createDecimal(bd.underlying()) case (row: Row, oi: StandardStructObjectInspector) => val struct = oi.create() @@ -176,7 +178,7 @@ case class InsertIntoHiveTable( // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc val tableLocation = table.hiveQlTable.getDataLocation - val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) + val tmpLocation = hiveContext.getExternalTmpPath(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val rdd = childRdd.mapPartitions { iter => val serializer = newSerializer(fileSinkConf.getTableInfo) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 7d1ad53d8bdb3..537e21eac7b0f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -18,8 +18,6 @@ package org.apache.spark.sql.hive import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.hive.common.`type`.HiveDecimal import org.apache.hadoop.hive.ql.exec.UDF import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry} import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType} @@ -33,6 +31,7 @@ import org.apache.spark.util.Utils.getContextOrSparkClassLoader /* Implicit conversions */ import scala.collection.JavaConversions._ +import org.apache.spark.sql.hive.HiveShim private[hive] abstract class HiveFunctionRegistry extends analysis.FunctionRegistry with HiveInspectors { @@ -110,7 +109,8 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[ val primitiveClasses = Seq( Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String], java.lang.Double.TYPE, classOf[java.lang.Double], java.lang.Long.TYPE, classOf[java.lang.Long], - classOf[HiveDecimal], java.lang.Byte.TYPE, classOf[java.lang.Byte], + classOf[org.apache.hadoop.hive.common.`type`.HiveDecimal], + java.lang.Byte.TYPE, classOf[java.lang.Byte], classOf[java.sql.Timestamp] ) val matchingConstructor = argClass.getConstructors.find { c => @@ -128,7 +128,7 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[ } else { constructor.newInstance(a match { case i: Int => i: java.lang.Integer - case bd: BigDecimal => new HiveDecimal(bd.underlying()) + case bd: BigDecimal => HiveShim.createDecimal(bd.underlying()) case other: AnyRef => other }).asInstanceOf[AnyRef] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index a35c40efdc207..0bc1ffa7c26a4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.NativeCommand import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.HiveShim class StatisticsSuite extends QueryTest with BeforeAndAfterAll { TestHive.reset() @@ -79,9 +80,9 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect() sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect() - - assert(queryTotalSize("analyzeTable") === defaultSizeInBytes) - + if (HiveShim.version.equals("0.12.0")) { + assert(queryTotalSize("analyzeTable") === defaultSizeInBytes) + } sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan") assert(queryTotalSize("analyzeTable") === BigInt(11624)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index c4abb3eb4861f..ebfd754f8666e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.{Row, SchemaRDD} +import org.apache.spark.sql.hive.HiveShim case class TestData(a: Int, b: String) @@ -451,14 +452,14 @@ class HiveQuerySuite extends HiveComparisonTest { // Describe a partition is a native command assertResult( Array( - Array("key", "int", "None"), - Array("value", "string", "None"), - Array("dt", "string", "None"), + Array("key", "int", HiveShim.getEmptyCommentsFieldValue), + Array("value", "string", HiveShim.getEmptyCommentsFieldValue), + Array("dt", "string", HiveShim.getEmptyCommentsFieldValue), Array("", "", ""), Array("# Partition Information", "", ""), Array("# col_name", "data_type", "comment"), Array("", "", ""), - Array("dt", "string", "None")) + Array("dt", "string", HiveShim.getEmptyCommentsFieldValue)) ) { sql("DESCRIBE test_describe_commands1 PARTITION (dt='2008-06-08')") .select('result) @@ -538,8 +539,8 @@ class HiveQuerySuite extends HiveComparisonTest { sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) - assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { - sql(s"SET").collect().map(_.getString(0)) + assertResult(Set(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { + sql(s"SET").collect().map(_.getString(0)).toSet } // "set key" @@ -566,8 +567,8 @@ class HiveQuerySuite extends HiveComparisonTest { sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) - assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { - sql("SET").collect().map(_.getString(0)) + assertResult(Set(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { + sql("SET").collect().map(_.getString(0)).toSet } assertResult(Array(s"$testKey=$testVal")) { diff --git a/sql/hive/v0.12/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.12/src/main/scala/org/apache/spark/sql/hive/Shim.scala new file mode 100644 index 0000000000000..5dca21bf2eca5 --- /dev/null +++ b/sql/hive/v0.12/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.util.Properties +import scala.language.implicitConversions +import org.apache.hadoop.hive.ql.metadata.Partition +import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import scala.collection.JavaConversions._ +import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.common.`type`.HiveDecimal +import java.net.URI +import org.apache.hadoop.{io => hadoopIo} +import org.apache.hadoop.hive.ql.stats.StatsSetupConst +import org.apache.hadoop.mapred.InputFormat +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.metadata.Table +import org.apache.hadoop.hive.ql.processors._ +import org.apache.hadoop.hive.conf.HiveConf + +/*hive-0.12.0 support shimmer layer*/ +object HiveShim { + val version = "0.12.0" + val metastoreDecimal = "decimal" + def getTableDesc(serdeClass: Class[_ <: Deserializer], inputFormatClass: Class[_ <: InputFormat[_, _]], outputFormatClass: Class[_], properties: Properties) = { + new TableDesc(serdeClass, inputFormatClass, outputFormatClass, properties) + } + def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE + def createDefaultDBIfNeeded(context: HiveContext) ={ } + + /*handle the difference in "None" and empty ""*/ + def getEmptyCommentsFieldValue = "None" + + def convertCatalystString2Hive(s: String) = new hadoopIo.Text(s) + + def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { + CommandProcessorFactory.get(cmd(0), conf) + } + + def createDecimal(bd: java.math.BigDecimal): HiveDecimal = { + new HiveDecimal(bd) + } + + def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) { + ColumnProjectionUtils.appendReadColumnIDs(conf, ids) + ColumnProjectionUtils.appendReadColumnNames(conf, names) + } + + implicit class wrapperToPartition(p: Partition) { + def getDataLocationPath: Path = p.getPartitionPath + } + implicit class wrapperToHive(client: Hive) { + def getAllPartitionsOf(tbl: Table) = { + client.getAllPartitionsForPruner(tbl) + } + } +} + +class ShimContext(conf: Configuration) extends Context(conf) { + def getExternalTmpPath(uri: URI): String = { + super.getExternalTmpFileURI(uri) + } +} + +class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean) + extends FileSinkDesc(dir, tableInfo, compressed) { +} diff --git a/sql/hive/v0.13/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.13/src/main/scala/org/apache/spark/sql/hive/Shim.scala new file mode 100644 index 0000000000000..2834184cbd1c1 --- /dev/null +++ b/sql/hive/v0.13/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory +import scala.language.implicitConversions +import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import org.apache.hadoop.hive.common.`type`.{HiveDecimal} +import scala.collection.JavaConversions._ +import org.apache.spark.Logging +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.metadata.Partition +import org.apache.hadoop.{io => hadoopIo} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.common.StatsSetupConst +import org.apache.hadoop.mapred.InputFormat +import java.util.Properties +import org.apache.hadoop.hive.serde2.Deserializer + +/*hive-0.13.1 support shimmer layer*/ +object HiveShim { + val version = "0.13.1" + /* + * hive-0.13 support DECIMAL(precision, scale), DECIMAL in hive-0.12 is actually DECIMAL(10,0) + * Full support of new decimal feature need to be fixed in seperate PR. + */ + val metastoreDecimal = "decimal(10,0)" + def getTableDesc(serdeClass: Class[_ <: Deserializer], inputFormatClass: Class[_ <: InputFormat[_, _]], outputFormatClass: Class[_], properties: Properties) = { + new TableDesc(inputFormatClass, outputFormatClass, properties) + } + def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE + def createDefaultDBIfNeeded(context: HiveContext) ={ + context.runSqlHive("CREATE DATABASE default") + context.runSqlHive("USE default") + } + /*handle the difference in HiveQuerySuite*/ + def getEmptyCommentsFieldValue = "" + + def convertCatalystString2Hive(s: String) = s + + def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { + CommandProcessorFactory.get(cmd, conf) + } + + def createDecimal(bd: java.math.BigDecimal): HiveDecimal = { + HiveDecimal.create(bd) + } + + /* + * This function in hive-0.13 become private, but we have to do this to walkaround hive bug + * */ + private def appendReadColumnNames(conf: Configuration, cols: Seq[String]) { + val old: String = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "") + val result: StringBuilder = new StringBuilder(old) + var first: Boolean = old.isEmpty + + for (col <- cols) { + if (first) { + first = false + } + else { + result.append(',') + } + result.append(col) + } + conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, result.toString) + } + + /* + * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null or empty + * */ + def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) { + if (ids != null && ids.size > 0) { + ColumnProjectionUtils.appendReadColumns(conf, ids) + } else { + appendReadColumnNames(conf, names) + } + } + + /* + * Bug introdiced in hive-0.13. FileSinkDesc is serilizable, but its member path is not. + * Fix it through wrapper. + * */ + implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = { + var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed) + f.setCompressed(w.compressed) + f.setCompressCodec(w.compressCodec) + f.setCompressType(w.compressType) + f.setTableInfo(w.tableInfo) + f + } + + implicit class wrapperToPartition(p: Partition) { + def getDataLocationPath: Path = p.getDataLocation + } +} + +class ShimContext(conf: Configuration) extends Context(conf) { + def getExternalTmpPath(path: Path): Path = { + super.getExternalTmpPath (path.toUri) + } +} + +class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean) + extends Serializable with Logging { + var compressCodec: String = _ + var compressType: String = _ + var destTableId: Int = _ + + def setCompressed(compressed: Boolean) { + this.compressed = compressed + } + def getDirName = dir + def setDestTableId(destTableId: Int) { + this.destTableId = destTableId + } + + def setTableInfo(tableInfo: TableDesc) { + this.tableInfo = tableInfo + } + + def setCompressCodec(intermediateCompressorCodec: String) { + compressCodec = intermediateCompressorCodec + } + + def setCompressType(intermediateCompressType: String) { + compressType = intermediateCompressType + } +}