Skip to content

Commit

Permalink
[SPARK-31710][SQL][FOLLOWUP] Replace CAST by TIMESTAMP_SECONDS in ben…
Browse files Browse the repository at this point in the history
…chmarks

### What changes were proposed in this pull request?
Replace `CAST(... AS TIMESTAMP` by `TIMESTAMP_SECONDS` in the following benchmarks:
- ExtractBenchmark
- DateTimeBenchmark
- FilterPushdownBenchmark
- InExpressionBenchmark

### Why are the changes needed?
The benchmarks fail w/o the changes:
```
[info] Running benchmark: datetime +/- interval
[info]   Running case: date + interval(m)
[error] Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`id` AS TIMESTAMP)' due to data type mismatch: cannot cast bigint to timestamp,you can enable the casting by setting spark.sql.legacy.allowCastNumericToTimestamp to true,but we strongly recommend using function TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS instead.; line 1 pos 5;
[error] 'Project [(cast(cast(id#0L as timestamp) as date) + 1 months) AS (CAST(CAST(id AS TIMESTAMP) AS DATE) + INTERVAL '1 months')#2]
[error] +- Range (0, 10000000, step=1, splits=Some(1))
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected benchmarks.

Closes apache#28843 from MaxGekk/GuoPhilipse-31710-fix-compatibility-followup.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
MaxGekk authored and cloud-fan committed Jun 16, 2020
1 parent 6e9ff72 commit 3643565
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {

private def run(cardinality: Int, func: String): Unit = {
codegenBenchmark(s"$func of timestamp", cardinality) {
doBenchmark(cardinality, s"$func(cast(id as timestamp))")
doBenchmark(cardinality, s"$func(timestamp_seconds(id))")
}
}

Expand All @@ -64,7 +64,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
val N = 10000000
runBenchmark("datetime +/- interval") {
val benchmark = new Benchmark("datetime +/- interval", N, output = output)
val ts = "cast(id as timestamp)"
val ts = "timestamp_seconds(id)"
val dt = s"cast($ts as date)"
benchmark.addCase("date + interval(m)") { _ =>
doBenchmark(N, s"$dt + interval 1 month")
Expand Down Expand Up @@ -105,7 +105,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
benchmark.run()
}
runBenchmark("Extract components") {
run(N, "cast to timestamp", "cast(id as timestamp)")
run(N, "cast to timestamp", "timestamp_seconds(id)")
run(N, "year")
run(N, "quarter")
run(N, "month")
Expand All @@ -124,7 +124,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
run(N, "current_timestamp", "current_timestamp")
}
runBenchmark("Date arithmetic") {
val dateExpr = "cast(cast(id as timestamp) as date)"
val dateExpr = "cast(timestamp_seconds(id) as date)"
run(N, "cast to date", dateExpr)
run(N, "last_day", s"last_day($dateExpr)")
run(N, "next_day", s"next_day($dateExpr, 'TU')")
Expand All @@ -133,31 +133,31 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
run(N, "add_months", s"add_months($dateExpr, 10)")
}
runBenchmark("Formatting dates") {
val dateExpr = "cast(cast(id as timestamp) as date)"
val dateExpr = "cast(timestamp_seconds(id) as date)"
run(N, "format date", s"date_format($dateExpr, 'MMM yyyy')")
}
runBenchmark("Formatting timestamps") {
run(N, "from_unixtime", "from_unixtime(id, 'yyyy-MM-dd HH:mm:ss.SSSSSS')")
}
runBenchmark("Convert timestamps") {
val timestampExpr = "cast(id as timestamp)"
val timestampExpr = "timestamp_seconds(id)"
run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')")
run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
}
runBenchmark("Intervals") {
val (start, end) = ("cast(id as timestamp)", "cast((id+8640000) as timestamp)")
val (start, end) = ("timestamp_seconds(id)", "timestamp_seconds(id+8640000)")
run(N, "cast interval", start, end)
run(N, "datediff", s"datediff($start, $end)")
run(N, "months_between", s"months_between($start, $end)")
run(1000000, "window", s"window($start, 100, 10, 1)")
}
runBenchmark("Truncation") {
val timestampExpr = "cast(id as timestamp)"
val timestampExpr = "timestamp_seconds(id)"
Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "DAY", "DD", "HOUR", "MINUTE",
"SECOND", "WEEK", "QUARTER").foreach { level =>
run(N, s"date_trunc $level", s"date_trunc('$level', $timestampExpr)")
}
val dateExpr = "cast(cast(id as timestamp) as date)"
val dateExpr = "cast(timestamp_seconds(id) as date)"
Seq("year", "yyyy", "yy", "mon", "month", "mm").foreach { level =>
run(N, s"trunc $level", s"trunc('$level', $dateExpr)")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ object ExtractBenchmark extends SqlBasedBenchmark {
}

private def castExpr(from: String): String = from match {
case "timestamp" => "cast(id as timestamp)"
case "date" => "cast(cast(id as timestamp) as date)"
case "interval" => "(cast(cast(id as timestamp) as date) - date'0001-01-01') + " +
"(cast(id as timestamp) - timestamp'1000-01-01 01:02:03.123456')"
case "timestamp" => "timestamp_seconds(id)"
case "date" => "cast(timestamp_seconds(id) as date)"
case "interval" => "(cast(timestamp_seconds(id) as date) - date'0001-01-01') + " +
"(timestamp_seconds(id) - timestamp'1000-01-01 01:02:03.123456')"
case other => throw new IllegalArgumentException(
s"Unsupported column type $other. Valid column types are 'timestamp' and 'date'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType}
Expand Down Expand Up @@ -332,11 +332,11 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) {
val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
val df = spark.range(numRows).selectExpr(columns: _*)
.withColumn("value", monotonically_increasing_id().cast(TimestampType))
.withColumn("value", timestamp_seconds(monotonically_increasing_id()))
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)

Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
Seq(s"value = timestamp_seconds($mid)").foreach { whereExpr =>
val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)"
.replace("value AND value", "value")
filterPushDownBenchmark(numRows, title, whereExpr)
Expand All @@ -348,8 +348,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
filterPushDownBenchmark(
numRows,
s"Select $percent% timestamp stored as $fileType rows " +
s"(value < CAST(${numRows * percent / 100} AS timestamp))",
s"value < CAST(${numRows * percent / 100} as timestamp)",
s"(value < timestamp_seconds(${numRows * percent / 100}))",
s"value < timestamp_seconds(${numRows * percent / 100})",
selectExpr
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.benchmark

import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, struct}
import org.apache.spark.sql.functions.{array, struct, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -128,15 +128,15 @@ object InExpressionBenchmark extends SqlBasedBenchmark {

private def runTimestampBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
val name = s"$numItems timestamps"
val values = (1 to numItems).map(m => s"CAST('1970-01-01 01:00:00.$m' AS timestamp)")
val df = spark.range(0, numRows).select($"id".cast(TimestampType))
val values = (1 to numItems).map(m => s"timestamp'1970-01-01 01:00:00.$m'")
val df = spark.range(0, numRows).select(timestamp_seconds($"id").as("id"))
runBenchmark(name, df, values, numRows, minNumIters)
}

private def runDateBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
val name = s"$numItems dates"
val values = (1 to numItems).map(n => 1970 + n).map(y => s"CAST('$y-01-01' AS date)")
val df = spark.range(0, numRows).select($"id".cast(TimestampType).cast(DateType))
val values = (1 to numItems).map(n => 1970 + n).map(y => s"date'$y-01-01'")
val df = spark.range(0, numRows).select(timestamp_seconds($"id").cast(DateType).as("id"))
runBenchmark(name, df, values, numRows, minNumIters)
}

Expand Down

0 comments on commit 3643565

Please sign in to comment.