Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle SparkRapidsBuildInfoEvent in GPU event logs #1203

Merged
merged 18 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import org.apache.spark.scheduler.SparkListenerEvent

tgravescs marked this conversation as resolved.
Show resolved Hide resolved
/**
 * Listener event written by the RAPIDS Accelerator into GPU event logs carrying build
 * metadata for each RAPIDS component as free-form key/value property maps.
 *
 * Extends [[SparkListenerEvent]] so it round-trips through the standard Spark event-log
 * JSON machinery.
 *
 * @param sparkRapidsBuildInfo        build properties of the spark-rapids plugin
 *                                    (presumably version/branch/commit keys — exact keys
 *                                    are producer-defined; not visible here)
 * @param sparkRapidsJniBuildInfo     build properties of the spark-rapids-jni component
 * @param cudfBuildInfo               build properties of the cudf component
 * @param sparkRapidsPrivateBuildInfo build properties of the spark-rapids-private component
 */
case class SparkRapidsBuildInfoEvent(
sparkRapidsBuildInfo: Map[String, String],
sparkRapidsJniBuildInfo: Map[String, String],
cudfBuildInfo: Map[String, String],
sparkRapidsPrivateBuildInfo: Map[String, String]
) extends SparkListenerEvent
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent
import com.nvidia.spark.rapids.tool.AppSummaryInfoBaseProvider

case class ApplicationSummaryInfo(
Expand Down Expand Up @@ -45,7 +46,8 @@ case class ApplicationSummaryInfo(
appLogPath: Seq[AppLogPathProfileResults],
ioMetrics: Seq[IOAnalysisProfileResult],
sysProps: Seq[RapidsPropertyProfileResult],
sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult])
sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult],
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent])

trait AppInfoPropertyGetter {
// returns all the properties (i.e., spark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.nvidia.spark.rapids.tool.profiling

import scala.collection.mutable.{ArrayBuffer, HashMap}

import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent
import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import com.nvidia.spark.rapids.tool.views._

Expand Down Expand Up @@ -103,6 +104,10 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
// Returns the cleaned/aligned SQL-id rows for all apps by delegating to the
// ProfSQLPlanAlignedView raw view.
def getSQLCleanAndAligned: Seq[SQLCleanAndAlignIdsProfileResult] = {
ProfSQLPlanAlignedView.getRawView(apps)
}

// Collects the Spark RAPIDS build-info event recorded on each application,
// one entry per app, in the same order as `apps`.
def getSparkRapidsInfo: Seq[SparkRapidsBuildInfoEvent] = {
for (app <- apps) yield app.sparkRapidsBuildInfo
}
}

object CollectInformation extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
*/
package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent
import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import org.apache.commons.lang3.StringUtils
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization


class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows: Int,
outputCSV: Boolean = false) {

implicit val formats: DefaultFormats.type = DefaultFormats

private val textFileWriter = new ToolTextFileWriter(outputDir,
s"$filePrefix.log", "Profile summary")

Expand Down Expand Up @@ -50,6 +55,17 @@ class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows:
}
}

/**
 * Serializes the given Spark RAPIDS build-info events as pretty-printed JSON into a
 * dedicated output file whose name is derived from the header text
 * (e.g. "Spark Rapids Build Info" -> spark_rapids_build_info.json).
 *
 * @param headerText human-readable section title; also the source of the file name
 * @param sparkRapidsBuildInfo build-info events to serialize (one per application)
 */
def writeSparkRapidsBuildInfo(headerText: String,
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent]): Unit = {
// Lower-case and underscore-join the header to form the file base name.
val baseName = headerText.toLowerCase.replace(" ", "_")
val writer = new ToolTextFileWriter(outputDir, s"$baseName.json", s"$headerText JSON:")
try {
writer.write(Serialization.writePretty(sparkRapidsBuildInfo) + "\n")
} finally {
// Always release the underlying file handle, even if serialization throws.
writer.close()
}
}

def write(headerText: String, outRows: Seq[ProfileResult],
emptyTableText: Option[String] = None, tableDesc: Option[String] = None): Unit = {
writeTextTable(headerText, outRows, emptyTableText, tableDesc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.rapids.tool.util._
class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
extends ToolBase(appArgs.timeout.toOption) {

override val simpleName: String = "profileTool"
override val simpleName: String = "profileTool"
override val outputDir: String = appArgs.outputDirectory().stripSuffix("/") +
s"/${Profiler.SUBDIR}"
private val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
Expand Down Expand Up @@ -324,6 +324,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val dsInfo = collect.getDataSourceInfo(sqlMetrics)
val stageMetrics = collect.getStageLevelMetrics
val wholeStage = collect.getWholeStageCodeGenMapping
val sparkRapidsBuildInfo = collect.getSparkRapidsInfo
// for compare mode we just add in extra tables for matching across applications
// the rest of the tables simply list all applications specified
val compareRes = if (appArgs.compare()) {
Expand Down Expand Up @@ -390,7 +391,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
appLogPath, analysis.ioAggs, systemProps, sqlIdAlign), compareRes)
appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo), compareRes)
}

/**
Expand Down Expand Up @@ -485,7 +486,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
appsSum.flatMap(_.appLogPath).sortBy(_.appIndex),
appsSum.flatMap(_.ioMetrics).sortBy(_.appIndex),
combineProps("system", appsSum).sortBy(_.key),
appsSum.flatMap(_.sqlCleanedAlignedIds).sortBy(_.appIndex)
appsSum.flatMap(_.sqlCleanedAlignedIds).sortBy(_.appIndex),
appsSum.flatMap(_.sparkRapidsBuildInfo)
)
Seq(reduced)
} else {
Expand Down Expand Up @@ -550,6 +552,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
profileOutputWriter.writeText("\n### D. Recommended Configuration ###\n")
profileOutputWriter.writeText(Profiler.getAutoTunerResultsAsString(properties, comments))
}

profileOutputWriter.writeSparkRapidsBuildInfo("Spark Rapids Build Info",
app.sparkRapidsBuildInfo)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package org.apache.spark.sql.rapids.tool
import java.io.InputStream
import java.util.zip.GZIPInputStream

import scala.collection.immutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet, Map, SortedMap}

import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent
import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo}
import com.nvidia.spark.rapids.tool.planparser.{HiveParseHelper, ReadParser}
import com.nvidia.spark.rapids.tool.planparser.HiveParseHelper.isHiveTableScanNode
Expand Down Expand Up @@ -89,6 +91,9 @@ abstract class AppBase(
var driverAccumMap: HashMap[Long, ArrayBuffer[DriverAccumCase]] =
HashMap[Long, ArrayBuffer[DriverAccumCase]]()

var sparkRapidsBuildInfo: SparkRapidsBuildInfoEvent = SparkRapidsBuildInfoEvent(immutable.Map(),
immutable.Map(), immutable.Map(), immutable.Map())

// Returns the String value of the eventlog or empty if it is not defined. Note that the eventlog
// won't be defined for running applications
def getEventLogPath: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.rapids.tool
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent
import com.nvidia.spark.rapids.tool.profiling.{BlockManagerRemovedCase, DriverAccumCase, JobInfoClass, ProfileUtils, ResourceProfileInfoCase, SQLExecutionInfoClass, SQLPlanMetricsCase}

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -96,6 +97,9 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
case _: StreamingQueryListener.QueryTerminatedEvent =>
doSparkListenerStreamingQuery(app,
event.asInstanceOf[StreamingQueryListener.QueryTerminatedEvent])
case _: SparkRapidsBuildInfoEvent =>
doSparkRapidsBuildInfoEvent(app,
event.asInstanceOf[SparkRapidsBuildInfoEvent])
case _ =>
val wasResourceProfileAddedEvent = doSparkListenerResourceProfileAddedReflect(app, event)
if (!wasResourceProfileAddedEvent) doOtherEvent(app, event)
Expand Down Expand Up @@ -194,6 +198,13 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
app.sqlPlanMetricsAdaptive ++= metrics
}

/**
 * Records a Spark RAPIDS build-info event on the application being processed.
 * The app keeps a single snapshot, so a later event overwrites an earlier one.
 *
 * @param app   the application whose build info is updated
 * @param event the build-info event read from the event log
 */
def doSparkRapidsBuildInfoEvent(app: T, event: SparkRapidsBuildInfoEvent): Unit = {
logDebug(s"Processing event: ${event.getClass}")
app.sparkRapidsBuildInfo = event
}

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case e: SparkListenerSQLExecutionStart =>
doSparkListenerSQLExecutionStart(app, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ object ToolUtils extends Logging {
org.apache.spark.SPARK_VERSION
}

var sparkRapidsRuntimeVersion: Option[String] = None
tgravescs marked this conversation as resolved.
Show resolved Hide resolved

var jniRuntimeVersion: Option[String] = None

var cudfRuntimeVersion: Option[String] = None

def compareVersions(verA: String, verB: String): Int = {
Try {
val verObjA = new ComparableVersion(verA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ object EventUtils extends Logging {
(line: String) => Try {
runtimeEventFromJsonMethod.apply(line)
} match {
case Success(i) => Some(i)
case Success(i) =>
Some(i)
case Failure(e) =>
e match {
case i: InvocationTargetException =>
Expand All @@ -212,7 +213,7 @@ object EventUtils extends Logging {
// malformed
handleEventJsonParseEx(k)
case z: ClassNotFoundException if z.getMessage != null =>
// Avoid reporting missing classes more than once to reduce the noise in the logs
// Avoid reporting missing classes more than once to reduce noise in the logs
reportMissingEventClass(z.getMessage)
case t: Throwable =>
// We do not want to swallow unknown exceptions so that we can handle later
Expand Down