Skip to content

Commit

Permalink
use spark benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Dec 4, 2023
1 parent 57285b4 commit 9e863c2
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 114 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
OpenJDK 64-Bit Server VM 1.8.0_392-b08 on Mac OS X 14.1.2
Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
TRowSet toHiveString benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
column-based boolVal benchmark 0 0 0 43.7 22.9 1.0X
column-based byteVal benchmark 0 0 0 72.6 13.8 1.7X
column-based shortVal benchmark 0 0 0 71.7 14.0 1.6X
column-based value benchmark 0 0 0 72.1 13.9 1.6X
column-based longVal benchmark 0 0 0 54.5 18.3 1.2X
column-based floatVal benchmark 1 1 0 3.3 301.5 0.1X
column-based doubleVal benchmark 0 0 0 54.1 18.5 1.2X
column-based stringVal benchmark 0 0 0 55.3 18.1 1.3X
column-based decimalVal benchmark 0 0 0 9.5 105.7 0.2X
column-based dateVal benchmark 1 1 0 3.1 318.6 0.1X
column-based timestampVal benchmark 1 1 0 4.3 233.4 0.1X
column-based binaryVal benchmark 0 0 0 12.5 79.9 0.3X
column-based arrVal benchmark 10 11 0 0.3 3458.3 0.0X
column-based mapVal benchmark 2 2 0 1.7 571.4 0.0X
column-based interval benchmark 1 1 1 3.4 293.7 0.1X
column-based localDate benchmark 1 1 0 4.9 202.7 0.1X
column-based instant benchmark 2 2 0 1.6 633.1 0.0X

7 changes: 7 additions & 0 deletions externals/kyuubi-spark-sql-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kyuubi

import scala.concurrent.duration._

import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.hive.service.rpc.thrift.TProtocolVersion._
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.kyuubi.benchmark.KyuubiBenchmarkBase
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.types._

import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.engine.spark.schema.{RowSet, RowSetHelper}

/**
* Benchmark to measure the performance of generate TRowSet.
*
* {{{
* RUN_BENCHMARK=1 ./build/mvn clean test \
* -pl externals/kyuubi-spark-sql-engine -am \
* -Dtest=none -DwildcardSuites=org.apache.kyuubi.engine.spark.schema.TRowSetBenchmark
* }}}
*/
class TRowSetBenchmark extends KyuubiFunSuite with RowSetHelper with KyuubiBenchmarkBase {
  // Opt-in gate: benchmarks run only when RUN_BENCHMARK is present in the
  // environment (see the class Scaladoc). The original `|| true` defeated the
  // gate and made the benchmarks run on every test invocation.
  private val runBenchmark = sys.env.contains("RUN_BENCHMARK")

  private val rowCount = 3000
  // Generated once and shared by all benchmark cases.
  private lazy val allRows = (0 until rowCount).map(genRow)

  test("row-based toTRowSet benchmark") {
    assume(runBenchmark)
    withHeader {
      tRowSetGenerationBenchmark(HIVE_CLI_SERVICE_PROTOCOL_V5)
    }
  }

  test("column-based toTRowSet benchmark") {
    assume(runBenchmark)
    withHeader {
      tRowSetGenerationBenchmark(HIVE_CLI_SERVICE_PROTOCOL_V6)
    }
  }

  /**
   * Registers one benchmark case per field of the test schema, each measuring
   * TRowSet generation over [[rowCount]] single-column rows of that field's
   * data type, then runs the whole benchmark.
   *
   * @param protocolVersion Thrift protocol version; versions before V6 use the
   *                        row-based TRowSet layout, V6 and later the
   *                        column-based one
   */
  private def tRowSetGenerationBenchmark(protocolVersion: TProtocolVersion): Unit = {
    val rowSetType = if (protocolVersion.getValue < HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
      "row-based"
    } else {
      "column-based"
    }

    val benchmark =
      new Benchmark(
        "TRowSet toHiveString benchmark",
        rowCount,
        minTime = 30.seconds,
        output = output)

    schemaStructFields.zipWithIndex.foreach {
      case (field, idx) =>
        // Isolate each data type: rows and schema containing only this field.
        val rowsOfSingleType = allRows.map(row => Row(row.get(idx)))
        val schemaOfSingleType = StructType(Seq(field))

        // Prefer the field's comment as the case label, falling back to the
        // data type name when no comment is set.
        val commentOrName = field.getComment().getOrElse(field.dataType.typeName)
        benchmark.addCase(s"$rowSetType $commentOrName benchmark", 10) {
          _ =>
            benchmarkToTRowSet(
              commentOrName,
              rowsOfSingleType,
              schemaOfSingleType,
              protocolVersion)
        }
    }
    benchmark.run()
  }

  /**
   * Converts the given rows into a TRowSet once; invoked repeatedly by the
   * benchmark harness to obtain timings.
   *
   * @param clue human-readable label of the measured case; kept for call-site
   *             readability, not used by the conversion itself
   */
  private def benchmarkToTRowSet(
      clue: String,
      rows: Seq[Row],
      schema: StructType,
      protocolVersion: TProtocolVersion): Unit = {
    val timeFormatters = HiveResult.getTimeFormatters
    RowSet.toTRowSet(rows, schema, protocolVersion, timeFormatters)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kyuubi.benchmark

import java.io.{File, FileOutputStream, OutputStream}

import scala.collection.JavaConverters._

import com.google.common.reflect.ClassPath
import org.scalatest.Assertions._

trait KyuubiBenchmarkBase {
  /** Destination stream for benchmark results; opened and owned by [[withHeader]]. */
  var output: Option[OutputStream] = None

  // Resolve the module's base directory so results land in `<module>/benchmarks/`,
  // whether the benchmark classes were loaded from `target/` or straight from
  // the project directory.
  private val prefix = {
    val benchmarkClasses = ClassPath.from(Thread.currentThread.getContextClassLoader)
      .getTopLevelClassesRecursive("org.apache.spark.kyuubi").asScala.toArray
    assert(benchmarkClasses.nonEmpty)
    val benchmark = benchmarkClasses.find(_.load().getName.endsWith("Benchmark"))
    val targetDirOrProjDir =
      new File(benchmark.get.load().getProtectionDomain.getCodeSource.getLocation.toURI)
        .getParentFile.getParentFile
    if (targetDirOrProjDir.getName == "target") {
      targetDirOrProjDir.getParentFile.getCanonicalPath + "/"
    } else {
      targetDirOrProjDir.getCanonicalPath + "/"
    }
  }

  /**
   * Runs `func` with [[output]] pointed at
   * `benchmarks/<SuiteName>[-jdkN]-results.txt` under the module directory,
   * creating the directory and file if needed. The stream is closed and
   * [[output]] reset even when `func` throws, so no file handle leaks.
   */
  def withHeader(func: => Unit): Unit = {
    // Major Java version, e.g. "1.8.0_392" -> 1 won't occur because the split
    // on non-digits yields "1" only for pre-9 JVMs, which map to no suffix.
    val version = System.getProperty("java.version").split("\\D+")(0).toInt
    val jdkString = if (version > 8) s"-jdk$version" else ""
    val resultFileName =
      s"${this.getClass.getSimpleName.replace("$", "")}$jdkString-results.txt"
    val dir = new File(s"${prefix}benchmarks/")
    if (!dir.exists()) {
      // scalastyle:off println
      println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
      // scalastyle:on println
      dir.mkdirs()
    }
    val file = new File(dir, resultFileName)
    if (!file.exists()) {
      file.createNewFile()
    }
    output = Some(new FileOutputStream(file))

    try {
      func
    } finally {
      // Close unconditionally — the original only closed on the success path,
      // leaking the stream when a benchmark failed. The Option never holds
      // null, so no extra null check is needed.
      output.foreach(_.close())
      output = None
    }
  }
}

0 comments on commit 9e863c2

Please sign in to comment.