Qualification tool: Add option to filter event logs for a maximum file system size #1275

Merged · 5 commits · Aug 12, 2024
@@ -36,6 +36,8 @@ sealed trait EventLogInfo {
def eventLog: Path
}

+case class EventLogFileSystemInfo(timestamp: Long, size: Long)
+
case class ApacheSparkEventLog(override val eventLog: Path) extends EventLogInfo
case class DatabricksEventLog(override val eventLog: Path) extends EventLogInfo

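The new case class simply pairs the two file-system attributes the tool now tracks per event log. As a minimal sketch (not part of this PR; the helper name fsInfo is made up), populating it from a Hadoop FileStatus looks like:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Hypothetical helper: capture the modification time and length of a path.
def fsInfo(path: Path, hadoopConf: Configuration): EventLogFileSystemInfo = {
  val fs: FileSystem = path.getFileSystem(hadoopConf)
  val status: FileStatus = fs.getFileStatus(path)
  // getLen is the length in bytes; for a directory it does not include the
  // files inside it (see the TODO comments in the diff below).
  EventLogFileSystemInfo(status.getModificationTime, status.getLen)
}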
@@ -137,7 +139,7 @@ object EventLogPathProcessor extends Logging {
}

def getEventLogInfo(pathString: String,
-      hadoopConf: Configuration): Map[EventLogInfo, Option[Long]] = {
+      hadoopConf: Configuration): Map[EventLogInfo, Option[EventLogFileSystemInfo]] = {
val inputPath = new Path(pathString)
try {
// Note that some cloud storage APIs may throw FileNotFoundException when the pathPrefix
@@ -161,15 +163,19 @@
logWarning(msg)
// Return an empty map as this is a skip due to unsupported file type, not an exception.
// Returning FailedEventLog would clutter the status report with unnecessary entries.
-        Map.empty[EventLogInfo, Option[Long]]
+        Map.empty[EventLogInfo, Option[EventLogFileSystemInfo]]
} else if (fileStatus.isDirectory && isEventLogDir(fileStatus)) {
// either event logDir v2 directory or regular event log
val info = ApacheSparkEventLog(fileStatus.getPath).asInstanceOf[EventLogInfo]
-        Map(info -> Some(fileStatus.getModificationTime))
+        // TODO - need to handle size of files in directory, for now document its not supported
+        Map(info ->
+          Some(EventLogFileSystemInfo(fileStatus.getModificationTime, fileStatus.getLen)))
} else if (fileStatus.isDirectory &&
isDatabricksEventLogDir(fileStatus, fs)) {
val dbinfo = DatabricksEventLog(fileStatus.getPath).asInstanceOf[EventLogInfo]
-        Map(dbinfo -> Some(fileStatus.getModificationTime))
+        // TODO - need to handle size of files in directory, for now document its not supported
+        Map(dbinfo ->
+          Some(EventLogFileSystemInfo(fileStatus.getModificationTime, fileStatus.getLen)))
} else {
// assume either single event log or directory with event logs in it, we don't
// support nested dirs, so if event log dir within another one we skip it
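The two TODO comments above note that for directory-style logs only the directory entry's own length is recorded. Purely for illustration, and assuming flat event-log directories, aggregating the contained file sizes might look like the following (this helper does not exist in the PR):

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch: sum the sizes of the files directly inside a
// directory. listStatus is non-recursive, which suffices for flat dirs.
def totalDirSize(fs: FileSystem, dir: Path): Long = {
  fs.listStatus(dir).filter(_.isFile).map(_.getLen).sum
}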
@@ -194,10 +200,10 @@ object EventLogPathProcessor extends Logging {
logsSupported.map { s =>
if (s.isFile || (s.isDirectory && isEventLogDir(s.getPath().getName()))) {
(ApacheSparkEventLog(s.getPath).asInstanceOf[EventLogInfo]
-            -> Some(s.getModificationTime))
+            -> Some(EventLogFileSystemInfo(s.getModificationTime, s.getLen)))
} else {
(DatabricksEventLog(s.getPath).asInstanceOf[EventLogInfo]
-            -> Some(s.getModificationTime))
+            -> Some(EventLogFileSystemInfo(s.getModificationTime, s.getLen)))
}
}.toMap
}
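Callers now receive a Map[EventLogInfo, Option[EventLogFileSystemInfo]], where None marks a log whose file-system status could not be read. A small consumption sketch, assuming a placeholder path:

import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
// "/data/eventlogs/app-123" is a placeholder, not a path from the PR.
val logs = EventLogPathProcessor.getEventLogInfo("/data/eventlogs/app-123", hadoopConf)
logs.foreach {
  case (info, Some(fsi)) =>
    println(s"${info.eventLog}: ${fsi.size} bytes, modified at ${fsi.timestamp}")
  case (info, None) =>
    println(s"${info.eventLog}: file system status unavailable")
}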
@@ -227,7 +233,8 @@ object EventLogPathProcessor extends Logging {
filterNLogs: Option[String],
matchlogs: Option[String],
eventLogsPaths: List[String],
-      hadoopConf: Configuration): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
+      hadoopConf: Configuration,
+      maxEventLogSize: Option[String] = None): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
val logsPathNoWildCards = processWildcardsLogs(eventLogsPaths, hadoopConf)
val logsWithTimestamp = logsPathNoWildCards.flatMap {
case (rawPath, processedPaths) if processedPaths.isEmpty =>
@@ -244,24 +251,44 @@
logsWithTimestamp.filterKeys(_.eventLog.getName.contains(strMatch))
}.getOrElse(logsWithTimestamp)

-    val filteredLogs = if (filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) {
-      val filteredInfo = filterNLogs.get.split("-")
-      val numberofEventLogs = filteredInfo(0).toInt
-      val criteria = filteredInfo(1)
-      // Before filtering based on user criteria, remove the failed event logs
-      // (i.e. logs without timestamp) from the list.
+    val filteredLogs = if ((filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) ||
+        maxEventLogSize.isDefined) {
       val validMatchedLogs = matchedLogs.collect {
         case (info, Some(ts)) => info -> ts
       }
-      val matched = if (criteria.equals("newest")) {
-        LinkedHashMap(validMatchedLogs.toSeq.sortWith(_._2 > _._2): _*)
-      } else if (criteria.equals("oldest")) {
-        LinkedHashMap(validMatchedLogs.toSeq.sortWith(_._2 < _._2): _*)
+      val filteredBySize = if (maxEventLogSize.isDefined) {
+        val maxSizeInBytes = if (StringUtils.isMemorySize(maxEventLogSize.get)) {
+          // if it is memory return the bytes unit
+          StringUtils.convertMemorySizeToBytes(maxEventLogSize.get)
+        } else {
+          // size is assumed to be mb
+          StringUtils.convertMemorySizeToBytes(maxEventLogSize.get + "m")
+        }
+        val (matched, filtered) = validMatchedLogs.partition(info => info._2.size <= maxSizeInBytes)
+        logInfo(s"Filtering eventlogs by size, max size is ${maxSizeInBytes}b. The logs filtered " +
+          s"out include: ${filtered.keys.map(_.eventLog.toString).mkString(",")}")
+        matched
+      } else {
+        validMatchedLogs
+      }
+      if (filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) {
+        val filteredInfo = filterNLogs.get.split("-")
+        val numberofEventLogs = filteredInfo(0).toInt
+        val criteria = filteredInfo(1)
+        // Before filtering based on user criteria, remove the failed event logs
+        // (i.e. logs without timestamp) from the list.
+        val matched = if (criteria.equals("newest")) {
+          LinkedHashMap(filteredBySize.toSeq.sortWith(_._2.timestamp > _._2.timestamp): _*)
+        } else if (criteria.equals("oldest")) {
+          LinkedHashMap(filteredBySize.toSeq.sortWith(_._2.timestamp < _._2.timestamp): _*)
+        } else {
+          logError("Criteria should be either newest-filesystem or oldest-filesystem")
+          Map.empty[EventLogInfo, Long]
+        }
+        matched.take(numberofEventLogs)
       } else {
-        logError("Criteria should be either newest-filesystem or oldest-filesystem")
-        Map.empty[EventLogInfo, Long]
+        filteredBySize
       }
-      matched.take(numberofEventLogs)
     } else {
       matchedLogs
     }
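The unit handling above delegates to the tools' internal StringUtils: a value with an explicit b/k/m/g suffix is converted directly, and a bare number is treated as megabytes by appending "m". A simplified, standalone approximation of that behavior (a sketch only; the real helpers may differ in detail):

// Hedged sketch; the actual parsing lives in the tools' own StringUtils.
def parseMaxEventLogSize(arg: String): Long = {
  val withUnit = "(?i)^(\\d+)([bkmg])$".r
  arg.trim match {
    case withUnit(num, unit) =>
      val mult = unit.toLowerCase match {
        case "b" => 1L
        case "k" => 1L << 10
        case "m" => 1L << 20
        case "g" => 1L << 30
      }
      num.toLong * mult
    case bare if bare.nonEmpty && bare.forall(_.isDigit) =>
      bare.toLong << 20 // no unit given: assume megabytes, as the CLI docs state
    case other =>
      throw new IllegalArgumentException(s"Cannot parse size value: $other")
  }
}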
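Because the new parameter defaults to None, existing callers of processAllPaths compile unchanged; QualificationMain below passes it explicitly. An illustrative call, with placeholder paths:

import org.apache.hadoop.conf.Configuration

val (fsFiltered, allLogs) = EventLogPathProcessor.processAllPaths(
  filterNLogs = None,
  matchlogs = None,
  eventLogsPaths = List("/data/eventlogs"), // placeholder location
  hadoopConf = new Configuration(),
  maxEventLogSize = Some("2g")) // keep only logs of at most 2 GiB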
@@ -88,6 +88,13 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "Filter event logs whose application start occurred within the past specified " +
"time period. Valid time periods are min(minute),h(hours),d(days),w(weeks)," +
"m(months). If a period is not specified it defaults to days.")
+  val maxEventLogSize: ScallopOption[String] =
+    opt[String](required = false,
+      descr = "Process only application event logs whose size is less than or equal to the size " +
+        "specified. Valid units of size are b(bytes),k(kilobytes),m(megabytes),g(gigabytes). " +
+        "If no units are specified, the size is assumed to be m. Note, this does not support " +
+        "event log rolling which puts multiple event logs for the same application into a " +
+        "single directory.")
val matchEventLogs: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose filenames contain the input string. Filesystem " +
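An illustrative invocation of the new option, modeled on the usage line above (the jar version, main class, size, and log path are placeholders; the kebab-case flag name matches the CLI flag list at the end of this diff):

java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
  com.nvidia.spark.rapids.tool.qualification.QualificationMain \
  --max-event-log-size 500m \
  /path/to/event_logs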
@@ -50,6 +50,7 @@ object QualificationMain extends Logging {

val eventlogPaths = appArgs.eventlog()
val filterN = appArgs.filterCriteria
+    val maxEventLogSize = appArgs.maxEventLogSize.toOption
val matchEventLogs = appArgs.matchEventLogs
val outputDirectory = appArgs.outputDirectory().stripSuffix("/")
val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
@@ -86,7 +87,7 @@ }
}

val (eventLogFsFiltered, allEventLogs) = EventLogPathProcessor.processAllPaths(
-      filterN.toOption, matchEventLogs.toOption, eventlogPaths, hadoopConf)
+      filterN.toOption, matchEventLogs.toOption, eventlogPaths, hadoopConf, maxEventLogSize)

val filteredLogs = if (argsContainsAppFilters(appArgs)) {
val appFilter = new AppFilterImpl(numOutputRows, hadoopConf, timeout, nThreads)
@@ -120,8 +121,10 @@

def argsContainsFSFilters(appArgs: QualificationArgs): Boolean = {
val filterCriteria = appArgs.filterCriteria.toOption
+    val maxEventLogSize = appArgs.maxEventLogSize.toOption
appArgs.matchEventLogs.isSupplied ||
-      (filterCriteria.isDefined && filterCriteria.get.endsWith("-filesystem"))
+      (filterCriteria.isDefined && filterCriteria.get.endsWith("-filesystem")) ||
+      maxEventLogSize.isDefined
}

def argsContainsAppFilters(appArgs: QualificationArgs): Boolean = {
8 changes: 5 additions & 3 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -513,9 +513,11 @@ def init_instance_descriptions(self) -> None:
Load instance description file from resources based on platform type.
"""
platform = CspEnv.pretty_print(self.cloud_ctxt['platformType'])
-        instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json')
-        self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
-        self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path)
+        if platform != CspEnv.ONPREM:
+            # we do not have instance descriptions for on-prem
+            instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json')
+            self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
+            self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path)

def describe_node_instance(self, instance_type: str) -> str:
instance_info = self.instance_descriptions.get_value(instance_type)
@@ -83,6 +83,7 @@ sparkRapids:
- no-html-report
- m
- match-event-logs
+- max-event-log-size
- max-sql-desc-length
- ml-functions
- n