Fixes filter push-down for Parquet data source
liancheng committed Jan 28, 2015
1 parent 119f45d commit 14ccd37
Showing 1 changed file with 10 additions and 8 deletions.
@@ -20,26 +20,26 @@ import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
 
 import parquet.filter2.predicate.FilterApi
 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}
-import org.apache.spark.sql.{SQLConf, Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition}
 
 /**
  * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
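
For reference, the Scaladoc above is the data source's DDL entry point. A minimal usage sketch in Scala (the table name and path are illustrative, not taken from this commit):

    // Registers a temporary table backed by a collection of, optionally
    // partitioned, Parquet files. Only the `path` option is required.
    sqlContext.sql(
      """CREATE TEMPORARY TABLE parquet_events
        |USING org.apache.spark.sql.parquet
        |OPTIONS (path '/data/events')""".stripMargin)
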
@@ -193,10 +193,12 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
       org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
     }
 
-    // Push down filters when possible
+    // Push down filters when possible. Notice that not all filters can be converted to Parquet
+    // filter predicate. Here we try to convert each individual predicate and only collect those
+    // convertible ones.
     predicates
-      .reduceOption(And)
-      .flatMap(ParquetFilters.createFilter)
+      .flatMap(ParquetFilters.createFilter)
+      .reduceOption(FilterApi.and)
       .filter(_ => sqlContext.conf.parquetFilterPushDown)
       .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
 
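The second hunk is the actual fix. Before this change, the predicates were first reduced into a single And expression and then converted as a whole, so one predicate with no Parquet equivalent caused the entire filter to be dropped. Afterwards, each predicate is converted on its own and only the convertible ones are combined with FilterApi.and. The sketch below illustrates the difference with stand-in types; Predicate, Convertible, and createFilter are hypothetical simplifications of the real Catalyst expressions and ParquetFilters.createFilter:

    object FilterPushDownSketch extends App {
      // Stand-ins for Catalyst predicate expressions; Inconvertible models an
      // expression (e.g. a UDF call) with no Parquet filter equivalent.
      sealed trait Predicate
      case class Convertible(repr: String) extends Predicate
      case class And(left: Predicate, right: Predicate) extends Predicate
      case object Inconvertible extends Predicate

      // Hypothetical converter mirroring ParquetFilters.createFilter: an And
      // node is convertible only when both of its children are.
      def createFilter(p: Predicate): Option[String] = p match {
        case Convertible(r) => Some(r)
        case And(l, r) =>
          for (lf <- createFilter(l); rf <- createFilter(r)) yield s"and($lf, $rf)"
        case Inconvertible => None
      }

      val predicates = Seq(Convertible("a = 1"), Inconvertible, Convertible("b < 2"))

      // Old behavior: combine first, convert second. The single inconvertible
      // leaf poisons the whole tree, so no filter at all is pushed down.
      println(predicates.reduceOption(And).flatMap(createFilter))   // None

      // New behavior: convert each predicate on its own, then combine survivors.
      println(predicates.flatMap(p => createFilter(p)).reduceOption((l, r) => s"and($l, $r)"))
      // Some(and(a = 1, b < 2))
    }

Dropping only the inconvertible conjuncts is safe because push-down is an optimization here: the original predicates are still evaluated on the rows the scan returns, so the pushed filter only needs to let through a superset of the matching rows.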
