Skip to content

Commit

Permalink
Avoid dynamic dispatching when unwrapping Hive data.
Browse files Browse the repository at this point in the history
This is a follow up of PR apache#758.

The `unwrapHiveData` function is now composed statically before actual rows are scanned according to the field object inspector to avoid dynamic dispatching cost.

According to the same micro benchmark used in PR apache#758, this simple change brings slight performance boost: 2.5% for CSV table and 1% for RCFile table.

```
Optimized version:

CSV: 6870 ms, RCFile: 5687 ms
CSV: 6832 ms, RCFile: 5800 ms
CSV: 6822 ms, RCFile: 5679 ms
CSV: 6704 ms, RCFile: 5758 ms
CSV: 6819 ms, RCFile: 5725 ms

Original version:

CSV: 7042 ms, RCFile: 5667 ms
CSV: 6883 ms, RCFile: 5703 ms
CSV: 7115 ms, RCFile: 5665 ms
CSV: 7020 ms, RCFile: 5981 ms
CSV: 6871 ms, RCFile: 5906 ms
```

Author: Cheng Lian <[email protected]>

Closes apache#935 from liancheng/staticUnwrapping and squashes the following commits:

c49c70c [Cheng Lian] Avoid dynamic dispatching when unwrapping Hive data.
  • Loading branch information
liancheng authored and conviva-zz committed Sep 4, 2014
1 parent c3ce724 commit 238697d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
import org.apache.hadoop.io.Writable
Expand Down Expand Up @@ -95,29 +94,34 @@ case class HiveTableScan(
attributes.map { a =>
val ordinal = relation.partitionKeys.indexOf(a)
if (ordinal >= 0) {
val dataType = relation.partitionKeys(ordinal).dataType
(_: Any, partitionKeys: Array[String]) => {
val value = partitionKeys(ordinal)
val dataType = relation.partitionKeys(ordinal).dataType
unwrapHiveData(castFromString(value, dataType))
castFromString(partitionKeys(ordinal), dataType)
}
} else {
val ref = objectInspector.getAllStructFieldRefs
.find(_.getFieldName == a.name)
.getOrElse(sys.error(s"Can't find attribute $a"))
val fieldObjectInspector = ref.getFieldObjectInspector

val unwrapHiveData = fieldObjectInspector match {
case _: HiveVarcharObjectInspector =>
(value: Any) => value.asInstanceOf[HiveVarchar].getValue
case _: HiveDecimalObjectInspector =>
(value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
case _ =>
identity[Any] _
}

(row: Any, _: Array[String]) => {
val data = objectInspector.getStructFieldData(row, ref)
unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
val hiveData = unwrapData(data, fieldObjectInspector)
if (hiveData != null) unwrapHiveData(hiveData) else null
}
}
}
}

private def unwrapHiveData(value: Any) = value match {
case varchar: HiveVarchar => varchar.getValue
case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
case other => other
}

private def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,14 @@ abstract class HiveComparisonTest
def isSorted(plan: LogicalPlan): Boolean = plan match {
case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
case PhysicalOperation(_, _, Sort(_, _)) => true
case _ => plan.children.iterator.map(isSorted).exists(_ == true)
case _ => plan.children.iterator.exists(isSorted)
}

val orderedAnswer = hiveQuery.logical match {
// Clean out non-deterministic time schema info.
case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
case _: ExplainCommand => answer
case plan if isSorted(plan) => answer
case _ => answer.sorted
case plan => if (isSorted(plan)) answer else answer.sorted
}
orderedAnswer.map(cleanPaths)
}
Expand Down

0 comments on commit 238697d

Please sign in to comment.