Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qualification tool: Error handling while processing large event logs #3714

Merged
merged 9 commits into from
Oct 8, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadPoolExecuto

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor}
Expand Down Expand Up @@ -106,14 +107,32 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
}
}

private def errorHandler(error: Throwable, path: EventLogInfo) = {
error match {
case oom: OutOfMemoryError =>
logError(s"OOM error while processing large file: ${path.eventLog.toString}." +
s" Increase heap size. Exiting ...", oom)
sys.exit(1)
case NonFatal(e) =>
logWarning(s"Exception occurred processing file: ${path.eventLog.getName}", e)
case o =>
Copy link
Collaborator

@gerashegalov gerashegalov Oct 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: there is no warning in the function but if we ever move it back inside a catch will get a warning like #3743.

Suggested change
case o =>
case o: Throwable =>

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! updated it.

logError(s"Error occurred while processing file: ${path.eventLog.toString}. Exiting ...", o)
sys.exit(1)
}
}

private def createApps(allPaths: Seq[EventLogInfo]): Seq[ApplicationInfo] = {
var errorCodes = ArrayBuffer[Int]()
val allApps = new ConcurrentLinkedQueue[ApplicationInfo]()

class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
def run: Unit = {
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt.foreach(app => allApps.add(app))
try {
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt.foreach(app => allApps.add(app))
} catch {
case t: Throwable => errorHandler(t, path)
}
}
}

Expand Down Expand Up @@ -145,18 +164,21 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging

class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
def run: Unit = {
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt.foreach { app =>
val sum = try {
val (s, _) = processApps(Seq(app), false, profileOutputWriter)
Some(s)
} catch {
case e: Exception =>
logWarning(s"Unexpected exception thrown ${path.eventLog.toString}, skipping! ", e)
None

try {
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt.foreach { app =>
val sum = try {
val (s, _) = processApps(Seq(app), false, profileOutputWriter)
Some(s)
} catch {
case e: Exception =>
logWarning(s"Unexpected exception thrown ${path.eventLog.toString}, skipping! ", e)
None
}
sum.foreach(allApps.add(_))
}
sum.foreach(allApps.add(_))
} catch {
case t: Throwable => errorHandler(t, path)
}
}
}
Expand Down Expand Up @@ -186,7 +208,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
class ProfileProcessThread(path: EventLogInfo, index: Int) extends Runnable {
def run: Unit = {
try {
// we just skip apps that don't process cleanly
// we just skip apps that don't process cleanly and exit if heap is smaller
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt match {
case Some(app) =>
Expand All @@ -204,8 +226,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
logInfo("No application to process. Exiting")
}
} catch {
case e: Exception =>
logWarning(s"Exception occurred processing file: ${path.eventLog.getName}", e)
case t: Throwable => errorHandler(t, path)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,15 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
}
}
} catch {
case oom: OutOfMemoryError =>
logError(s"OOM error while processing large file: ${path.eventLog.toString}." +
s"Increase heap size.", oom)
System.exit(1)
case o: Error =>
logError(s"Error occured while processing file: ${path.eventLog.toString}", o)
System.exit(1)
case e: Exception =>
logError(s"Unexpected exception processing log ${path.eventLog.toString}, skipping!", e)
logWarning(s"Unexpected exception processing log ${path.eventLog.toString}, skipping!", e)
}
}
}