
[SPARK-24718][SQL] Timestamp support pushdown to parquet data source #21741

Closed
wants to merge 4 commits into from

Conversation

@wangyum (Member) commented on Jul 10, 2018

What changes were proposed in this pull request?

Add support for pushing down Timestamp filters to the Parquet data source. Only the TIMESTAMP_MICROS and TIMESTAMP_MILLIS types support push-down.
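For context, the new flag works together with the writer-side timestamp type setting. A hedged PySpark usage sketch, not part of this PR: it assumes an existing SparkSession named `spark`, an illustrative path, and a column named `ts`; the two config keys are the ones this PR touches.

```python
# Hedged usage sketch. The config keys come from this PR; the session,
# path, and column name "ts" are illustrative assumptions.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
spark.conf.set("spark.sql.parquet.filterPushdown.timestamp", "true")

df = spark.read.parquet("/tmp/ts_data")
# With the flag enabled and the file written as TIMESTAMP_MICROS or
# TIMESTAMP_MILLIS, this predicate should show up under PushedFilters
# in the physical plan.
df.where(df.ts > "2018-06-15 00:00:00").explain()
```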

How was this patch tested?

Unit tests and benchmark tests.

"enabled and Timestamp stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.")
.internal()
.booleanConf
.createWithDefault(true)
@wangyum (Member, Author):

Maybe the default should be false, because PARQUET_OUTPUT_TIMESTAMP_TYPE defaults to INT96.

A contributor replied:

Because we're using the file schema, it doesn't matter what the write configuration is; it only matters what it was when the file was written. If the file has an INT96 timestamp, this should just not push anything down.
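The file-schema-based decision described above can be sketched in plain Python. The function name and type-name strings here are illustrative only; Spark's real check inspects the Parquet file footer's schema, not strings.

```python
# Illustrative sketch of the decision described above: push down only
# when the file stores an INT64-backed timestamp type. Names are
# hypothetical, not Spark's actual API.
PUSHABLE_TIMESTAMP_TYPES = {"TIMESTAMP_MILLIS", "TIMESTAMP_MICROS"}

def can_push_down_timestamp(file_timestamp_type: str) -> bool:
    """Return True only for the INT64-backed Parquet timestamp types."""
    return file_timestamp_type in PUSHABLE_TIMESTAMP_TYPES

# INT96 files simply get no timestamp filter pushed down.
assert can_push_down_timestamp("TIMESTAMP_MICROS")
assert not can_push_down_timestamp("INT96")
```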

@SparkQA commented on Jul 10, 2018

Test build #92796 has finished for PR 21741 at commit b2a9000.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum (Member, Author) commented on Jul 10, 2018

retest this please

@SparkQA commented on Jul 10, 2018

Test build #92799 has finished for PR 21741 at commit b2a9000.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


val data = Seq(ts1, ts2, ts3, ts4)

withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
A reviewer (Member) commented:

This case is quite similar to the one below. Should we use a loop for setting SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key to avoid duplicated code?

@wangyum (Member, Author):

I changed to:

    // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
    val millisData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123"),
      Timestamp.valueOf("2018-06-15 08:28:53.123"),
      Timestamp.valueOf("2018-06-16 08:28:53.123"),
      Timestamp.valueOf("2018-06-17 08:28:53.123"))
    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
      ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
      testTimestampPushdown(millisData)
    }

    // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
    val microsData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"),
      Timestamp.valueOf("2018-06-15 08:28:53.123456"),
      Timestamp.valueOf("2018-06-16 08:28:53.123456"),
      Timestamp.valueOf("2018-06-17 08:28:53.123456"))
    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
      ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
      testTimestampPushdown(microsData)
    }

We shouldn't use the same data to test the TIMESTAMP_MILLIS and TIMESTAMP_MICROS types:

  1. The TIMESTAMP_MILLIS type will truncate the trailing 456 microseconds if we use microsData.
  2. DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) can't be exercised at microsecond precision if we use millisData.
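The truncation point in (1) can be seen with plain Python datetimes. This is a standalone illustration, not Spark code; the value mirrors the microsData above.

```python
from datetime import datetime

# A microsecond-precision value like the microsData above.
ts = datetime.strptime("2018-06-14 08:28:53.123456", "%Y-%m-%d %H:%M:%S.%f")

# TIMESTAMP_MILLIS keeps only millisecond precision: the trailing
# 456 microseconds are dropped, so a round-trip would not compare equal.
millis_truncated = ts.replace(microsecond=(ts.microsecond // 1000) * 1000)

assert ts.microsecond == 123456
assert millis_truncated.microsecond == 123000
assert millis_truncated != ts
```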

@@ -387,6 +389,82 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}

test("filter pushdown - timestamp(TIMESTAMP_MILLIS)") {
A reviewer (Member) commented:

I think we should also test the INT96 timestamp type, and maybe also the case where PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED is disabled.

@SparkQA commented on Jul 12, 2018

Test build #92908 has finished for PR 21741 at commit 5471d79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang (Member) left a comment:

LGTM

@gatorsmile (Member):
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 1 timestamp stored as INT96 row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
A contributor commented:

shall we add a new line after the benchmark name? e.g.

Select 1 timestamp stored as INT96 row (value = CAST(7864320 AS timestamp)):
Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
...

We can send a follow-up PR to fix this entire file.

@wangyum (Member, Author):

OK. I'll send a follow-up PR.

@cloud-fan (Contributor):

LGTM

buildConf("spark.sql.parquet.filterPushdown.timestamp")
.doc("If true, enables Parquet filter push-down optimization for Timestamp. " +
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is " +
"enabled and Timestamp stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.")
A reviewer (Member) commented:

Shall we note INT64 here?

@wangyum (Member, Author):

I think end users have a better understanding of TIMESTAMP_MICROS and TIMESTAMP_MILLIS.

@HyukjinKwon (Member) commented on Jul 13, 2018:

... I don't think ordinary users will understand any of them ..

A reviewer (Member) commented:

You need to explain how to use spark.sql.parquet.outputTimestampType to control the Parquet timestamp type Spark uses to write Parquet files.

A contributor commented:

I would just note that push-down doesn't work for INT96 timestamps in the file. It should work for the others.
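For the INT64-backed types, the filter value itself is just an epoch count, which is what makes the comparison pushable. A rough stdlib analogue of turning a timestamp into epoch microseconds follows; Spark's DateTimeUtils.fromJavaTimestamp does this on the JVM side, and this Python version is only an illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_micros(ts: datetime) -> int:
    """Illustrative analogue of DateTimeUtils.fromJavaTimestamp: a
    TIMESTAMP_MICROS value is an INT64 count of microseconds since the
    Unix epoch, so a predicate like "ts > X" reduces to an integer
    comparison that Parquet can evaluate against row-group statistics."""
    # Exact integer division over timedeltas avoids float rounding.
    return (ts - EPOCH) // timedelta(microseconds=1)

a = to_epoch_micros(datetime(2018, 6, 14, 8, 28, 53, 123456, tzinfo=timezone.utc))
b = to_epoch_micros(datetime(2018, 6, 15, 8, 28, 53, 123456, tzinfo=timezone.utc))
assert a < b  # the pushed-down filter is a plain INT64 comparison
```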

@@ -517,7 +585,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}


A reviewer (Member) commented:

nit: I would revert this change if you are going to push more changes.

@wangyum (Member, Author):

OK

@HyukjinKwon (Member) left a comment:

LGTM too

@rdblue (Contributor) commented on Jul 13, 2018

+1

# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
#	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@SparkQA commented on Jul 14, 2018

Test build #93006 has finished for PR 21741 at commit f206457.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SerializableConfiguration(@transient var value: Configuration)
  • class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
  • case class SchemaType(dataType: DataType, nullable: Boolean)
  • implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T])
  • implicit class AvroDataFrameReader(reader: DataFrameReader)
  • class KMeansModel (@Since("1.0.0") val clusterCenters: Array[Vector],
  • abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast
  • case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike

@HyukjinKwon (Member):

Merged to master.

@asfgit closed this in 43e4e85 on Jul 15, 2018
7 participants