Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-1368][SQL] Optimized HiveTableScan
JIRA issue: [SPARK-1368](https://issues.apache.org/jira/browse/SPARK-1368) This PR introduces two major updates: - Replaced FP style code with `while` loop and reusable `GenericMutableRow` object in critical path of `HiveTableScan`. - Using `ColumnProjectionUtils` to help optimizing RCFile and ORC column pruning. My quick micro benchmark suggests these two optimizations made the optimized version 2x and 2.5x faster when scanning CSV table and RCFile table respectively: ``` Original: [info] CSV: 27676 ms, RCFile: 26415 ms [info] CSV: 27703 ms, RCFile: 26029 ms [info] CSV: 27511 ms, RCFile: 25962 ms Optimized: [info] CSV: 13820 ms, RCFile: 10402 ms [info] CSV: 14158 ms, RCFile: 10691 ms [info] CSV: 13606 ms, RCFile: 10346 ms ``` The micro benchmark loads a 609MB CVS file (structurally similar to the `src` test table) into a normal Hive table with `LazySimpleSerDe` and a RCFile table, then scans these tables respectively. Preparation code: ```scala package org.apache.spark.examples.sql.hive import org.apache.spark.sql.hive.LocalHiveContext import org.apache.spark.{SparkConf, SparkContext} object HiveTableScanPrepare extends App { val sparkContext = new SparkContext( new SparkConf() .setMaster("local") .setAppName(getClass.getSimpleName.stripSuffix("$"))) val hiveContext = new LocalHiveContext(sparkContext) import hiveContext._ hql("drop table scan_csv") hql("drop table scan_rcfile") hql("""create table scan_csv (key int, value string) | row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' | with serdeproperties ('field.delim'=',') """.stripMargin) hql(s"""load data local inpath "${args(0)}" into table scan_csv""") hql("""create table scan_rcfile (key int, value string) | row format serde 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' |stored as | inputformat 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' | outputformat 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' """.stripMargin) hql( """ |from scan_csv |insert overwrite table scan_rcfile |select scan_csv.key, scan_csv.value """.stripMargin) } ``` Benchmark code: ```scala package org.apache.spark.examples.sql.hive import org.apache.spark.sql.hive.LocalHiveContext import org.apache.spark.{SparkConf, SparkContext} object HiveTableScanBenchmark extends App { val sparkContext = new SparkContext( new SparkConf() .setMaster("local") .setAppName(getClass.getSimpleName.stripSuffix("$"))) val hiveContext = new LocalHiveContext(sparkContext) import hiveContext._ val scanCsv = hql("select key from scan_csv") val scanRcfile = hql("select key from scan_rcfile") val csvDuration = benchmark(scanCsv.count()) val rcfileDuration = benchmark(scanRcfile.count()) println(s"CSV: $csvDuration ms, RCFile: $rcfileDuration ms") def benchmark(f: => Unit) = { val begin = System.currentTimeMillis() f val end = System.currentTimeMillis() end - begin } } ``` @marmbrus Please help review, thanks! Author: Cheng Lian <[email protected]> Closes apache#758 from liancheng/fastHiveTableScan and squashes the following commits: 4241a19 [Cheng Lian] Distinguishes sorted and possibly not sorted operations more accurately in HiveComparisonTest cf640d8 [Cheng Lian] More HiveTableScan optimisations: bf0e7dc [Cheng Lian] Added SortedOperation pattern to match *some* definitely sorted operations and avoid some sorting cost in HiveComparisonTest. 6d1c642 [Cheng Lian] Using ColumnProjectionUtils to optimise RCFile and ORC column pruning eb62fd3 [Cheng Lian] [SPARK-1368] Optimized HiveTableScan
- Loading branch information