From 60f70aa2e5956249107d8a4ee92bec58fb702628 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Mon, 18 Aug 2014 14:34:53 +0800 Subject: [PATCH] Update InsertIntoHiveTable.scala --- .../hive/execution/InsertIntoHiveTable.scala | 56 +++++++++++++++---- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index a6f1a683f3fe2..8f327d1887a23 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -24,7 +24,9 @@ import java.util.{HashMap => JHashMap} import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.metastore.MetaStoreUtils import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.parse.SemanticException import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector._ @@ -40,6 +42,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter} +import org.apache.hadoop.hive.conf.HiveConf /** * :: DeveloperApi :: @@ -159,7 +162,7 @@ case class InsertIntoHiveTable( writer.commitJob() } - def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int) :String = { + def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf) :String = { dynamicPartNum2 match { case 0 =>"" case i => { @@ -169,11 +172,11 @@ case class InsertIntoHiveTable( var buf = new StringBuffer() if (partCols.length == dynamicPartNum2) { for (j <- 0 until partCols.length) { - buf.append("/").append(partCols(j)).append("=").append(row(j + row.length - colsNum)) + buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j ), jobConf)) } } else { for (j <- 0 until dynamicPartNum2) { - buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(row(j + colsNum)) + buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf)) } } buf.toString @@ -181,6 +184,14 @@ case class InsertIntoHiveTable( } } + def handleNull(obj :Any, jobConf: JobConf) :String = { + if (obj == null ||obj.toString.length == 0) { + jobConf.get("hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__") + } else { + obj.toString + } + } + override def execute() = result /** @@ -201,11 +212,38 @@ case class InsertIntoHiveTable( val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) var dynamicPartNum = 0 + var numStaPart = 0 var dynamicPartPath = ""; val partitionSpec = partition.map { - case (key, Some(value)) => key -> value - case (key, None) => { dynamicPartNum += 1; key -> "" }// Should not reach here right now. + case (key, Some(value)) => { numStaPart += 1; key -> value } + case (key, None) => { dynamicPartNum += 1; key -> "" } } + // ORC stores compression information in table properties. While, there are other formats + // (e.g. RCFile) that rely on hadoop configurations to store compression information. + val jobConf = new JobConf(sc.hiveconf) + val jobConfSer = new SerializableWritable(jobConf) + // check if the partition spec is valid + if (dynamicPartNum > 0) { + if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) { + throw new SemanticException( + ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()) + } + if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) { + throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg()); + } + // check if static partition appear after dynamic partitions + for ((k,v) <- partitionSpec) { + if (partitionSpec(k) == "") { + if (numStaPart > 0) { // found a DP, but there exists ST as subpartition + throw new SemanticException( + ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg()); + } + } else { + numStaPart -= 1 + } + } + } + val rdd = childRdd.mapPartitions { iter => val serializer = newSerializer(fileSinkConf.getTableInfo) val standardOI = ObjectInspectorUtils @@ -221,7 +259,7 @@ case class InsertIntoHiveTable( var i = 0 while (i < fieldOIs.length) { if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) { - dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum) + dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value) } // Casts Strings to HiveVarchars when necessary. outputData(i) = wrap(row(i), fieldOIs(i)) @@ -232,10 +270,6 @@ case class InsertIntoHiveTable( } } - // ORC stores compression information in table properties. While, there are other formats - // (e.g. RCFile) that rely on hadoop configurations to store compression information. - val jobConf = new JobConf(sc.hiveconf) - val jobConfSer = new SerializableWritable(jobConf) if (dynamicPartNum>0) { if (outputClass == null) { throw new SparkException("Output value class not set") @@ -300,8 +334,6 @@ case class InsertIntoHiveTable( v.commitJob() } writerMap.clear() - //writer.commitJob() - } else { saveAsHiveFile( rdd,