[SPARK-12355][SQL] Implement unhandledFilter interface for Parquet #10502

Closed
wants to merge 10 commits

Conversation

HyukjinKwon
Member

What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-12355
This is similar to #10427.

As discussed in #10221, this PR implements unhandledFilters to remove duplicated Spark-side filtering.

In the case of Parquet, the columns referenced in pushed-down filters should be given to org.apache.spark.sql.parquet.row.requested_schema, whereas general data sources such as JDBC do not require them.

However, DataSourceStrategy.pruneFilterProjectRaw() prunes away columns that are referenced only in pushed-down filters. Therefore, this PR resolves the problem by manually generating the columns referenced in the pushed-down filters and adding them back to the requested schema.
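
Below is a minimal sketch (not the PR's actual code, whose referencedColumns also takes the schema, as seen in the diff further down) of how the column names referenced by a pushed-down sources.Filter could be collected so they can be added back to the requested schema:

import org.apache.spark.sql.sources._

// Sketch only: walk the filter tree and collect the attribute names it references.
def referencedColumns(predicate: Filter): Seq[String] = predicate match {
  case EqualTo(attr, _) => Seq(attr)
  case EqualNullSafe(attr, _) => Seq(attr)
  case GreaterThan(attr, _) => Seq(attr)
  case GreaterThanOrEqual(attr, _) => Seq(attr)
  case LessThan(attr, _) => Seq(attr)
  case LessThanOrEqual(attr, _) => Seq(attr)
  case In(attr, _) => Seq(attr)
  case IsNull(attr) => Seq(attr)
  case IsNotNull(attr) => Seq(attr)
  case And(left, right) => referencedColumns(left) ++ referencedColumns(right)
  case Or(left, right) => referencedColumns(left) ++ referencedColumns(right)
  case Not(child) => referencedColumns(child)
  case _ => Seq.empty
}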

How was this patch tested?

This was tested with unit tests and with dev/run_tests for coding style.

@SparkQA

SparkQA commented Dec 29, 2015

Test build #48396 has finished for PR 10502 at commit cf331a4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

The test failed because of wrong results from Parquet.
The test result was as below:

== Physical Plan ==
Scan ParquetRelation[_1#4] InputPaths: file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-2fa8f75a-8ac1-4273-afcd-f271529a24a7, PushedFilters: [GreaterThan(_1,5)]
== Results ==
!== Correct Answer - 4 ==   == Spark Answer - 5 ==
![6]                        [5]
![7]                        [6]
![8]                        [7]
![9]                        [8]
!                           [9]

As can be seen, GreaterThan(_1,5) is pushed down correctly.
In addition, the Parquet filter gt(_1, 5) is also created correctly, but the results still include 5.
The odd thing is that all of this passes the tests in ParquetFilterSuite, which check all the operators including GreaterThan.
I am looking into this more deeply.

@HyukjinKwon
Member Author

I see. UnsafeRowParquetRecordReader does not support record-by-record filtering but only block-level filtering. So even the = operator produces results like the ones below.

+---+
| _1|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
+---+

I think I should disable unhandledFilters when spark.sql.parquet.enableUnsafeRowRecordReader is enabled.
BTW, it looks a bit strange that it does not support record-by-record filtering.
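
A rough sketch of that idea (the function name and the pushed set below are assumptions, not actual Spark APIs, and the config is assumed to default to true): only drop the Spark-side filters when the unsafe record reader is disabled, i.e. when Parquet can filter row by row.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.Filter

// Sketch only: report every filter as unhandled while the unsafe record reader
// (block-level filtering only) is enabled, so Spark keeps re-evaluating them.
def unhandledParquetFilters(
    sqlContext: SQLContext,
    pushed: Set[Filter],
    filters: Array[Filter]): Array[Filter] = {
  val unsafeReaderEnabled =
    sqlContext.getConf("spark.sql.parquet.enableUnsafeRowRecordReader", "true").toBoolean
  if (unsafeReaderEnabled) {
    // Block-level filtering alone can return extra rows (like the 5 above),
    // so every filter must still be evaluated on the Spark side.
    filters
  } else {
    // Row-by-row filtering is available, so filters Parquet fully handles can be dropped.
    filters.filterNot(pushed.contains)
  }
}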

@SparkQA

SparkQA commented Dec 29, 2015

Test build #48403 has finished for PR 10502 at commit 8c376af.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 29, 2015

Test build #48402 has finished for PR 10502 at commit 8c376af.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 29, 2015

Test build #48405 has finished for PR 10502 at commit 2ad8182.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

cc @yhuai @liancheng

@yhuai
Contributor

yhuai commented Dec 29, 2015

@HyukjinKwon Thank you for the PR! Can you post some benchmarking results (with your testing code)? It would be good to have these numbers to help others understand whether it provides a benefit.

@HyukjinKwon
Member Author

@yhuai Sure. I will try!

@HyukjinKwon
Member Author

Benchmark (Removed Spark-side Filter)

Motivation

This PR simplifies the query plans for Parquet files by stripping duplicated Spark-side filtering,

from:

== Physical Plan ==
Filter (a#8 = 2)
  +- Scan ParquetRelation[a#8] InputPaths: file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-ef271ec6-95e1-43ae-9b3e-1d4dae6f69c3/part=1, PushedFilters: [EqualTo(a,2)]

to:

== Physical Plan ==
Scan ParquetRelation[a#8] InputPaths: file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-ef271ec6-95e1-43ae-9b3e-1d4dae6f69c3/part=1, PushedFilters: [EqualTo(a,2)]

However, it is unknown whether there is a benefit in terms of performance, so this benchmark was performed.
Simply put, several queries were executed with and without the duplicated Spark-side filtering.

Environment

  • Machine: MacBook Pro Retina
  • CPU: 4
  • Memory: 8GB

Method

  • The filters that Parquet can generate were covered with 9 simple queries: the IS NULL, IS NOT NULL, =, !=, <=>, <, >, <= and >= operators were tested. Each query was executed 5 times and the average time was calculated (a sketch of such a timing loop follows this list).
  • The commit right before this PR was compared against this PR; namely, the tests were performed with Spark-side filtering and without it.
  • In order to test cleanly, the target Parquet file was uncompressed, and in order to enable row-by-row Parquet filtering, spark.sql.parquet.enableUnsafeRowRecordReader was disabled.
  • Spark was stopped and started for each query.
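
Since only a single-run time function is shown below, the averaging loop itself is an assumption; a minimal sketch:

// Sketch of an assumed timing harness: run a query `runs` times and
// return the average elapsed wall-clock time in milliseconds.
def averageTimeMs(runs: Int)(f: => Unit): Double = {
  val elapsed = (1 to runs).map { _ =>
    val start = System.nanoTime
    f
    (System.nanoTime - start) / 1e6
  }
  elapsed.sum / runs
}

// Example usage: averageTimeMs(5)(df.collect())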

Dataset

Raw Data

Create Target Parquet File

  • case class for Lineitem table
case class Lineitem(l_orderkey: Int,
                    l_partkey: Int,
                    l_suppkey: Int,
                    l_linenumber: Int,
                    l_quantity: Float,
                    l_extendedprice: Float,
                    l_discount: Float,
                    l_tax: Float,
                    l_returnflag: String,
                    l_linestatus: String,
                    l_shipdate: String,
                    l_commitdate: String,
                    l_receiptdate: String,
                    l_shipinstruct: String,
                    l_shipmode: String,
                    l_comment: String)
  • Create Parquet file
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
conf.setAppName("Test").setMaster("local")
conf.set("spark.sql.parquet.compression.codec", "uncompressed")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Parse lineitem.tbl and convert it to a DataFrame.
val df = sc.textFile("lineitem.tbl").map(_.split('|')).map { v =>
  Lineitem(
    v(0).trim.toInt,
    v(1).trim.toInt,
    v(2).trim.toInt,
    v(3).trim.toInt,
    v(4).trim.toFloat,
    v(5).trim.toFloat,
    v(6).trim.toFloat,
    v(7).trim.toFloat,
    v(8),
    v(9),
    v(10),
    v(11),
    v(12),
    v(13),
    v(14),
    v(15))
}.toDF()
df.write.parquet("lineitem")

Parquet file

-rw-r--r--  hyukjinkwon hyukjinkwon 0 B         _SUCCESS
-rw-r--r--  hyukjinkwon hyukjinkwon 1.54 KB     _common_metadata
-rw-r--r--  hyukjinkwon hyukjinkwon 13.72 KB    _metadata
-rw-r--r--  hyukjinkwon hyukjinkwon 52.16 MB    part-r-00000-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon hyukjinkwon 51.74 MB    part-r-00001-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon hyukjinkwon 51.97 MB    part-r-00002-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon hyukjinkwon 51.71 MB    part-r-00003-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon hyukjinkwon 51.89 MB    part-r-00004-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon hyukjinkwon 34.25 MB    part-r-00005-2aa739b2-6194-47db-9dd5-461905cea976.parquet

Test Code

  • Function to measure time
def time[A](f: => A) = {
  val s = System.nanoTime
  val ret = f
  println("time: "+(System.nanoTime-s)/1e6+"ms")
  ret
}
  • Configuration and SQLContext
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
conf.setAppName("Test").setMaster("local")
conf.set("spark.sql.parquet.enableUnsafeRowRecordReader", "false")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  • IS NULL
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey IS NULL").select("l_orderkey")
time(df.collect())
  • IS NOT NULL
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey IS NOT NULL").select("l_orderkey")
time(df.collect())
  • =
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey = 1").select("l_orderkey")
time(df.collect())
  • !=
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey != 1").select("l_orderkey")
time(df.collect())
  • <=>
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey <=> 1").select("l_orderkey")
time(df.collect())
  • <
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey < 3000000").select("l_orderkey")
time(df.collect())
  • >
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey > 3000000").select("l_orderkey")
time(df.collect())
  • <=
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey <= 3000000").select("l_orderkey")
time(df.collect())
  • >=
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey >= 3000000").select("l_orderkey")
time(df.collect())

Results

Operator      Without Spark Filtering (ms)   With Spark Filtering (ms)   Time Decreased (%)
IS NULL       645.015                        669.038                     3.590
IS NOT NULL   8040.593                       8394.950                    4.221
=             885.764                        906.658                     2.304
!=            7844.248                       8082.113                    2.943
<=>           869.402                        912.457                     4.718
<             4510.599                       4625.214                    2.478
>             4732.729                       4940.915                    4.213
<=            4868.453                       4918.441                    1.016
>=            4751.772                       4946.939                    3.945
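
For reference, the "Time Decreased (%)" column appears to be the relative decrease against the run with Spark-side filtering, e.g. for IS NULL:

// Assumed derivation of the "Time Decreased (%)" column, checked against IS NULL:
val withFiltering = 669.038     // ms, with Spark-side filtering
val withoutFiltering = 645.015  // ms, without Spark-side filtering
val decreasedPercent = (withFiltering - withoutFiltering) / withFiltering * 100
// decreasedPercent is about 3.59, matching the 3.590 in the table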

Basically, in a simplified view, the difference is as below.

The original code works as follows (with Spark filtering):

data
  // Parquet-side filtering
  .filter(pushedFilter)
  // Spark-side filtering
  .filter(pushedFilter)

This PR changes it into the following (without Spark filtering):

data
  // Parquet-side filtering
  .filter(pushedFilter)

Although both have the same O(n) time complexity, the former performs roughly 2n filter evaluations and the latter n, so some performance benefit is expected.

One notable thing is that there were still considerable performance differences for =, IS NULL and <=>. It seems that wrapping the plan with a Spark-side filter has an overhead of its own regardless of the time complexity, because for the operations above the actual cost of the additional filtering is only about 1 row (or 0 for IS NULL).

In conclusion, although we cannot depend on this benchmark alone, there appears to be a performance benefit of approximately 1% to 4% in elapsed time for basic queries with pushed filters.

@HyukjinKwon
Member Author

@yhuai @liancheng @rxin Would you look through this please?

@@ -208,11 +210,30 @@ private[sql] object ParquetFilters {
}

/**
* Return referenced columns in [[sources.Filter]].
*/
def referencedColumns(schema: StructType, predicate: sources.Filter): Array[String] = {
Member

Better to add private[parquet]?

Member Author

Oh, yes, it looks so. I think I might also have to change createFilter() in the same way; I just followed the style of createFilter() for this function, since both createFilter() and referencedColumns() are called in the same places.

Member

Agreed.

@JoshRosen
Contributor

Jenkins, retest this please.

@SparkQA

SparkQA commented Jan 11, 2016

Test build #49115 has finished for PR 10502 at commit dfc7506.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 11, 2016

Test build #49122 has finished for PR 10502 at commit dfc7506.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

I will resolve this conflict on Thursday.

@SparkQA

SparkQA commented Feb 11, 2016

Test build #51075 has finished for PR 10502 at commit ad757be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

@liancheng @yhuai Would you look through this please?

@SparkQA

SparkQA commented Feb 26, 2016

Test build #52014 has finished for PR 10502 at commit 0e149da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Feb 26, 2016

@HyukjinKwon Thank you for working on it. We have been actively improving the efficiency of code-gen and the unsafe Parquet reader. In the long term, letting Parquet evaluate filters like a = 10 row by row is not as efficient as using the code-gen version of the Spark filter. We do plan to let the unsafe Parquet reader evaluate very cheap predicates like IsNotNull and IsNull for every row. However, before that happens, can we hold off the work on this PR?

@HyukjinKwon
Member Author

@yhuai No problem. Then please let me know later when there is something I should do.

@yhuai
Contributor

yhuai commented Feb 26, 2016

yea will do. Thank you.

@HyukjinKwon
Member Author

HyukjinKwon commented Apr 22, 2016

@yhuai Let me close this for now. Please let me know even though this is closed; I will reopen it when I start working on this again.

@HyukjinKwon
Member Author

Hi @yhuai! Would it be okay if I gave this one a try again?

@HyukjinKwon HyukjinKwon deleted the SPARK-12355 branch September 23, 2016 18:28