Minor improvements to TPC-DS benchmarking code #565

Merged (1 commit) on Aug 18, 2020
@@ -59,14 +59,17 @@ object TpcdsLikeBench extends Logging {
     }

     for (i <- 0 until numColdRuns) {
-      println(s"Cold run $i took ${coldRunElapsed(i)} msec.")
+      println(s"Cold run $i for query $query took ${coldRunElapsed(i)} msec.")
     }
     println(s"Average cold run took ${coldRunElapsed.sum.toDouble/numColdRuns} msec.")

     for (i <- 0 until numHotRuns) {
-      println(s"Hot run $i took ${hotRunElapsed(i)} msec.")
+      println(s"Hot run $i for query $query took ${hotRunElapsed(i)} msec.")
     }
-    println(s"Average hot run took ${hotRunElapsed.sum.toDouble/numHotRuns} msec.")
+    println(s"Query $query: " +
+      s"best: ${hotRunElapsed.min} msec; " +
+      s"worst: ${hotRunElapsed.max} msec; " +
+      s"average: ${hotRunElapsed.sum.toDouble/numHotRuns} msec.")
   }

   def main(args: Array[String]): Unit = {
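For context, the summary line added above reduces the hot-run timings to their minimum, maximum, and mean. A minimal standalone sketch of that arithmetic, with hypothetical elapsed times and query name (the values and the object name below are made up for illustration, not part of the patch):

object RunSummarySketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical hot-run timings for one query, in milliseconds.
    val hotRunElapsed: Seq[Long] = Seq(812L, 790L, 805L)
    val numHotRuns = hotRunElapsed.length
    val query = "q5" // hypothetical query name

    // The same reductions the new println reports: best = min, worst = max, average = mean.
    println(s"Query $query: " +
      s"best: ${hotRunElapsed.min} msec; " +
      s"worst: ${hotRunElapsed.max} msec; " +
      s"average: ${hotRunElapsed.sum.toDouble / numHotRuns} msec.")
  }
}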
@@ -45,26 +45,31 @@ case class Table(

   private def setupWrite(
       spark: SparkSession,
-      inputBase: String): DataFrameWriter[Row] = {
+      inputBase: String,
+      writePartitioning: Boolean): DataFrameWriter[Row] = {
     val tmp = readCSV(spark, inputBase)
       .write
       .mode("overwrite")
-    tmp
-    // Disabling partitioning on writes. The original databricks code has
-    // partitioning enabled, but for our data sizes it does not help
-    // We can possibly add it back in for large scale factors.
-    // if (partitionColumns.isEmpty) {
-    //   tmp
-    // } else {
-    //   tmp.partitionBy(partitionColumns: _*)
-    // }
+    if (writePartitioning && partitionColumns.nonEmpty) {
+      tmp.partitionBy(partitionColumns: _*)
+    } else {
+      tmp
+    }
   }

-  def csvToParquet(spark: SparkSession, inputBase: String, outputBase: String): Unit =
-    setupWrite(spark, inputBase).parquet(path(outputBase))
+  def csvToParquet(
+      spark: SparkSession,
+      inputBase: String,
+      outputBase: String,
+      writePartitioning: Boolean): Unit =
+    setupWrite(spark, inputBase, writePartitioning).parquet(path(outputBase))

-  def csvToOrc(spark: SparkSession, inputBase: String, outputBase: String): Unit =
-    setupWrite(spark, inputBase).orc(path(outputBase))
+  def csvToOrc(
+      spark: SparkSession,
+      inputBase: String,
+      outputBase: String,
+      writePartitioning: Boolean): Unit =
+    setupWrite(spark, inputBase, writePartitioning).orc(path(outputBase))
 }

 case class Query(name: String, query: String) {
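As a usage note, DataFrameWriter.partitionBy splits the output into one subdirectory per distinct value of each partition column, which is what the new writePartitioning flag toggles. A minimal sketch of that pattern under assumed column names and output paths (none of these names or paths are taken from the patch):

import org.apache.spark.sql.{DataFrameWriter, Row, SparkSession}

object PartitionedWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("partitioned-write-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for a TPC-DS table; ss_sold_date_sk is a typical partition column.
    val df = Seq(
      (2450816, 1L, 9.99),
      (2450817, 2L, 19.99)
    ).toDF("ss_sold_date_sk", "ss_item_sk", "ss_net_paid")

    val writePartitioning = true
    val partitionColumns = Seq("ss_sold_date_sk")

    val tmp: DataFrameWriter[Row] = df.write.mode("overwrite")
    val writer = if (writePartitioning && partitionColumns.nonEmpty) {
      // Produces ss_sold_date_sk=2450816/ and ss_sold_date_sk=2450817/ subdirectories.
      tmp.partitionBy(partitionColumns: _*)
    } else {
      tmp
    }
    writer.parquet("/tmp/store_sales_sketch")
    spark.stop()
  }
}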
@@ -79,12 +84,43 @@
  */
 object TpcdsLikeSpark {

-  def csvToParquet(spark: SparkSession, baseInput: String, baseOutput: String): Unit = {
-    tables.foreach(_.csvToParquet(spark, baseInput, baseOutput))
+  /**
+   * Main method allows us to submit using spark-submit to perform conversions from CSV to
+   * Parquet or Orc.
+   */
+  def main(arg: Array[String]): Unit = {
+    val baseInput = arg(0)
+    val baseOutput = arg(1)
+    val targetFileType = arg(2)
+    val withPartitioning = if (arg.length > 3) {
+      arg(3).toBoolean
+    } else {
+      false
+    }
+
+    val spark = SparkSession.builder.appName("TPC-DS Like File Conversion").getOrCreate()
+
+    targetFileType match {
+      case "parquet" => csvToParquet(spark, baseInput, baseOutput, withPartitioning)
+      case "orc" => csvToOrc(spark, baseInput, baseOutput, withPartitioning)
+    }
+
   }

-  def csvToOrc(spark: SparkSession, baseInput: String, baseOutput: String): Unit = {
-    tables.foreach(_.csvToOrc(spark, baseInput, baseOutput))
+  def csvToParquet(
+      spark: SparkSession,
+      baseInput: String,
+      baseOutput: String,
+      writePartitioning: Boolean = false): Unit = {
+    tables.foreach(_.csvToParquet(spark, baseInput, baseOutput, writePartitioning))
+  }
+
+  def csvToOrc(
+      spark: SparkSession,
+      baseInput: String,
+      baseOutput: String,
+      writePartitioning: Boolean = false): Unit = {
+    tables.foreach(_.csvToOrc(spark, baseInput, baseOutput, writePartitioning))
   }

   def setupAllCSV(spark: SparkSession, basePath: String): Unit = {
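For reference, the new main takes positional arguments: the CSV input directory, the output directory, the target format ("parquet" or "orc"), and an optional boolean enabling partitioned writes, which defaults to false. A sketch of driving the same entry points from your own driver code; the import path and directory names are assumptions, not part of the patch:

// The package below is an assumption; adjust the import to wherever TpcdsLikeSpark lives in your build.
import com.example.tpcds.TpcdsLikeSpark

import org.apache.spark.sql.SparkSession

object ConvertTpcdsData {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TPC-DS Like File Conversion").getOrCreate()

    // Roughly what `spark-submit ... <baseInput> <baseOutput> parquet true` would do.
    TpcdsLikeSpark.csvToParquet(
      spark,
      baseInput = "/data/tpcds/csv",
      baseOutput = "/data/tpcds/parquet",
      writePartitioning = true)

    // Omitting the flag relies on the new default, writePartitioning = false.
    TpcdsLikeSpark.csvToOrc(spark, "/data/tpcds/csv", "/data/tpcds/orc")

    spark.stop()
  }
}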