[SPARK-5465] [SQL] Fixes filter push-down for Parquet data source
Not all Catalyst filter expressions can be converted to Parquet filter predicates. We should try to convert each predicate individually and collect the convertible ones.
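The failure mode is easy to see in miniature. Here is a minimal, self-contained sketch; every name in it (`CatalystPred`, `Supported`, `createFilter`, `parquetAnd`, and so on) is a simplified stand-in invented for illustration, not the real Spark or Parquet API. Converting the AND-ed conjunction as a whole yields nothing when any conjunct is unsupported, while converting conjuncts one by one still pushes down the supported subset:

```scala
object PushDownSketch {
  // Simplified stand-ins modeling Catalyst predicates and Parquet filter
  // predicates; not the real Spark/Parquet classes.
  sealed trait CatalystPred
  case class Supported(column: String, value: Int) extends CatalystPred
  case object Unsupported extends CatalystPred
  case class And(left: CatalystPred, right: CatalystPred) extends CatalystPred

  case class ParquetPred(repr: String)

  // Models ParquetFilters.createFilter: an And converts only if BOTH sides
  // convert, which is the crux of SPARK-5465.
  def createFilter(p: CatalystPred): Option[ParquetPred] = p match {
    case Supported(c, v) => Some(ParquetPred(s"$c = $v"))
    case And(l, r) =>
      for (lp <- createFilter(l); rp <- createFilter(r))
        yield ParquetPred(s"(${lp.repr}) AND (${rp.repr})")
    case Unsupported => None
  }

  // Models FilterApi.and: conjunction on the Parquet side.
  def parquetAnd(l: ParquetPred, r: ParquetPred): ParquetPred =
    ParquetPred(s"(${l.repr}) AND (${r.repr})")

  def main(args: Array[String]): Unit = {
    val predicates =
      Seq[CatalystPred](Supported("a", 1), Unsupported, Supported("b", 2))

    // Old behavior: AND everything first, then convert the combined
    // expression. The single Unsupported conjunct poisons the whole
    // conversion, so nothing at all is pushed down.
    val before = predicates.reduceOption(And).flatMap(createFilter)
    println(before) // None

    // Fixed behavior: convert each predicate individually, keep the
    // convertible ones, then AND those at the Parquet level.
    val after = predicates.flatMap(createFilter).reduceOption(parquetAnd)
    println(after) // Some(ParquetPred((a = 1) AND (b = 2)))
  }
}
```

Dropping the inconvertible conjuncts is safe because push-down is only an optimization: Spark still evaluates the full filter condition on the rows the Parquet scan returns.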


Author: Cheng Lian <[email protected]>

Closes apache#4255 from liancheng/spark-5465 and squashes the following commits:

14ccd37 [Cheng Lian] Fixes filter push-down for Parquet data source
liancheng authored and marmbrus committed Feb 2, 2015
Parent: 8cf4a1f · Commit: ec10032
Showing 1 changed file with 10 additions and 8 deletions.
@@ -20,26 +20,26 @@ import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
+import parquet.filter2.predicate.FilterApi
 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}
-import org.apache.spark.sql.{SQLConf, Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition}
 
 
 /**
  * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`.  Currently the only option
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
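The scaladoc in this hunk doubles as usage documentation. A sketch of the statement it describes, issued through `sqlContext.sql` (the table name and path below are placeholders):

```scala
// Registers a temporary table backed by a collection of Parquet files;
// per the scaladoc above, `path` is the only required option.
sqlContext.sql(
  """CREATE TEMPORARY TABLE parquet_table
    |USING org.apache.spark.sql.parquet
    |OPTIONS (path '/path/to/parquet/files')
  """.stripMargin)
```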
@@ -193,10 +193,12 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
       org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
     }
 
-    // Push down filters when possible
+    // Push down filters when possible. Notice that not all filters can be converted to Parquet
+    // filter predicate. Here we try to convert each individual predicate and only collect those
+    // convertible ones.
     predicates
-      .reduceOption(And)
       .flatMap(ParquetFilters.createFilter)
+      .reduceOption(FilterApi.and)
       .filter(_ => sqlContext.conf.parquetFilterPushDown)
      .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
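Reading the fixed chain line by line (annotations added here; the calls quote the hunk above, and the element type of `predicates` is assumed from the enclosing method, which this hunk does not show):

```scala
predicates                                             // the Catalyst predicates handed to the scan
  .flatMap(ParquetFilters.createFilter)                // convert one by one; inconvertible predicates drop out as None
  .reduceOption(FilterApi.and)                         // AND the convertible survivors at the Parquet level
  .filter(_ => sqlContext.conf.parquetFilterPushDown)  // apply only if the push-down flag is enabled
  .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))  // register the predicate with the Parquet input format
```

The old `.reduceOption(And).flatMap(ParquetFilters.createFilter)` order built the conjunction on the Catalyst side first, so `createFilter` saw one big `And` tree and had to convert all of it or nothing; swapping the two steps is the entire fix.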
