Enable using spark-submit to convert from csv to parquet (NVIDIA#565)
Signed-off-by: Andy Grove <[email protected]>
andygrove authored Aug 18, 2020
1 parent 13b2701 commit 6624e88
Showing 2 changed files with 60 additions and 21 deletions.
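With TpcdsLikeSpark now exposing a main method (see the second file below), the conversion can be launched straight from spark-submit. A minimal sketch of such an invocation; the fully qualified class name, jar name, and data paths are placeholders assumed for illustration, not confirmed by this commit:

# Args: <baseInput> <baseOutput> <parquet|orc> [<writePartitioning>, default false]
spark-submit --master local[*] \
  --class com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeSpark \
  rapids-4-spark-integration-tests.jar \
  /data/tpcds/csv /data/tpcds/parquet parquet true

Note that the third argument must be exactly "parquet" or "orc"; the match in main has no fallback case, so any other value fails with a MatchError.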
File 1 of 2:

@@ -59,14 +59,17 @@ object TpcdsLikeBench extends Logging {
     }

     for (i <- 0 until numColdRuns) {
-      println(s"Cold run $i took ${coldRunElapsed(i)} msec.")
+      println(s"Cold run $i for query $query took ${coldRunElapsed(i)} msec.")
     }
     println(s"Average cold run took ${coldRunElapsed.sum.toDouble/numColdRuns} msec.")

     for (i <- 0 until numHotRuns) {
-      println(s"Hot run $i took ${hotRunElapsed(i)} msec.")
+      println(s"Hot run $i for query $query took ${hotRunElapsed(i)} msec.")
     }
-    println(s"Average hot run took ${hotRunElapsed.sum.toDouble/numHotRuns} msec.")
+    println(s"Query $query: " +
+      s"best: ${hotRunElapsed.min} msec; " +
+      s"worst: ${hotRunElapsed.max} msec; " +
+      s"average: ${hotRunElapsed.sum.toDouble/numHotRuns} msec.")
   }

   def main(args: Array[String]): Unit = {
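With this change each query's hot runs get a one-line summary. For example, hypothetical hot-run times of 1200, 1350, and 1500 msec for a query named q5 would now print: Query q5: best: 1200 msec; worst: 1500 msec; average: 1350.0 msec.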
File 2 of 2:

@@ -45,26 +45,31 @@ case class Table(

   private def setupWrite(
       spark: SparkSession,
-      inputBase: String): DataFrameWriter[Row] = {
+      inputBase: String,
+      writePartitioning: Boolean): DataFrameWriter[Row] = {
     val tmp = readCSV(spark, inputBase)
       .write
       .mode("overwrite")
-    tmp
-    // Disabling partitioning on writes. The original databricks code has
-    // partitioning enabled, but for our data sizes it does not help
-    // We can possibly add it back in for large scale factors.
-    // if (partitionColumns.isEmpty) {
-    //   tmp
-    // } else {
-    //   tmp.partitionBy(partitionColumns: _*)
-    // }
+    if (writePartitioning && partitionColumns.nonEmpty) {
+      tmp.partitionBy(partitionColumns: _*)
+    } else {
+      tmp
+    }
   }

-  def csvToParquet(spark: SparkSession, inputBase: String, outputBase: String): Unit =
-    setupWrite(spark, inputBase).parquet(path(outputBase))
+  def csvToParquet(
+      spark: SparkSession,
+      inputBase: String,
+      outputBase: String,
+      writePartitioning: Boolean): Unit =
+    setupWrite(spark, inputBase, writePartitioning).parquet(path(outputBase))

-  def csvToOrc(spark: SparkSession, inputBase: String, outputBase: String): Unit =
-    setupWrite(spark, inputBase).orc(path(outputBase))
+  def csvToOrc(
+      spark: SparkSession,
+      inputBase: String,
+      outputBase: String,
+      writePartitioning: Boolean): Unit =
+    setupWrite(spark, inputBase, writePartitioning).orc(path(outputBase))
 }

 case class Query(name: String, query: String) {
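For context on what the new writePartitioning flag controls: DataFrameWriter.partitionBy writes one subdirectory per distinct value of each partition column, which enables partition pruning at read time. A minimal standalone sketch of that behavior, not part of this commit; the demo table, column names, and output path are invented:

import org.apache.spark.sql.SparkSession

object PartitionByDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("PartitionByDemo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val sales = Seq((1, 20200817), (2, 20200818)).toDF("ss_item_sk", "ss_sold_date_sk")

    // With partitionBy, Parquet files land under ss_sold_date_sk=20200817/ and
    // ss_sold_date_sk=20200818/; without it, all files share a single directory.
    sales.write.mode("overwrite").partitionBy("ss_sold_date_sk").parquet("/tmp/partition_demo")

    spark.stop()
  }
}

As the removed comment above notes, partitioning did not help at the data sizes used here, so partitioned writes are now opt-in rather than commented out.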
@@ -79,12 +84,43 @@ case class Query(name: String, query: String) {
   */
 object TpcdsLikeSpark {

-  def csvToParquet(spark: SparkSession, baseInput: String, baseOutput: String): Unit = {
-    tables.foreach(_.csvToParquet(spark, baseInput, baseOutput))
+  /**
+   * Main method allows us to submit using spark-submit to perform conversions from CSV to
+   * Parquet or Orc.
+   */
+  def main(arg: Array[String]): Unit = {
+    val baseInput = arg(0)
+    val baseOutput = arg(1)
+    val targetFileType = arg(2)
+    val withPartitioning = if (arg.length > 3) {
+      arg(3).toBoolean
+    } else {
+      false
+    }
+
+    val spark = SparkSession.builder.appName("TPC-DS Like File Conversion").getOrCreate()
+
+    targetFileType match {
+      case "parquet" => csvToParquet(spark, baseInput, baseOutput, withPartitioning)
+      case "orc" => csvToOrc(spark, baseInput, baseOutput, withPartitioning)
+    }
+
+  }
+
+  def csvToParquet(
+      spark: SparkSession,
+      baseInput: String,
+      baseOutput: String,
+      writePartitioning: Boolean = false): Unit = {
+    tables.foreach(_.csvToParquet(spark, baseInput, baseOutput, writePartitioning))
   }

-  def csvToOrc(spark: SparkSession, baseInput: String, baseOutput: String): Unit = {
-    tables.foreach(_.csvToOrc(spark, baseInput, baseOutput))
+  def csvToOrc(
+      spark: SparkSession,
+      baseInput: String,
+      baseOutput: String,
+      writePartitioning: Boolean = false): Unit = {
+    tables.foreach(_.csvToOrc(spark, baseInput, baseOutput, writePartitioning))
   }

   def setupAllCSV(spark: SparkSession, basePath: String): Unit = {
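The same conversions can also be driven programmatically, for example from spark-shell, via the overloads above. A sketch with placeholder paths; the package containing TpcdsLikeSpark is an assumption, not shown on this page:

// In spark-shell a SparkSession is already bound to `spark`.
// Package path is an assumption; adjust to the actual location of TpcdsLikeSpark.
import com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeSpark

// Convert all TPC-DS-like tables from CSV to Parquet, unpartitioned (the default).
TpcdsLikeSpark.csvToParquet(spark, "/data/tpcds/csv", "/data/tpcds/parquet")

// Convert to ORC, partitioning the tables that declare partition columns.
TpcdsLikeSpark.csvToOrc(spark, "/data/tpcds/csv", "/data/tpcds/orc", writePartitioning = true)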
