[SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor #24136

Closed · wants to merge 2 commits
Changes from all commits
RuleExecutor.scala
@@ -113,7 +113,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             if (effective) {
               queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
               queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
-              planChangeLogger.log(rule.ruleName, plan, result)
+              planChangeLogger.logRule(rule.ruleName, plan, result)
             }
             queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
             queryExecutionMetrics.incNumExecution(rule.ruleName)
@@ -152,15 +152,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
         lastPlan = curPlan
       }

-      if (!batchStartPlan.fastEquals(curPlan)) {
-        logDebug(
-          s"""
-            |=== Result of Batch ${batch.name} ===
-            |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
-          """.stripMargin)
-      } else {
-        logTrace(s"Batch ${batch.name} has no effect.")
-      }
+      planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
Member: Do the same thing for line 148?

Contributor Author: The config spark.sql.optimizer.planChangeLog.level is used for logging plan changes after a rule or batch is applied. Line 148 logged even when there was no change in the plan, which conflicts with that configuration, so it is not required to log this.

Contributor Author: @gatorsmile please review

Member: Please address them in a follow-up if there are further comments.

     }

     curPlan
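Note: to make the behavior discussed in this thread concrete, here is a minimal spark-shell sketch of the two knobs working together. Only the two configuration keys come from this PR; the batch name and the query are illustrative placeholders.

```scala
// Illustrative sketch, not part of this PR: enable plan-change logging for a
// single optimizer batch at INFO level. The batch name and query are made up.
spark.conf.set("spark.sql.optimizer.planChangeLog.level", "INFO")
spark.conf.set("spark.sql.optimizer.planChangeLog.batches", "Operator Optimizations")

// When the named batch rewrites the plan, the optimizer logs
// "=== Result of Batch ... ===" with a side-by-side diff at INFO;
// when it does not, it logs "Batch ... has no effect."
spark.range(100).where("id > 10").select("id").collect()
```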
@@ -172,21 +164,46 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {

     private val logRules = SQLConf.get.optimizerPlanChangeRules.map(Utils.stringToSeq)

-    def log(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+    private val logBatches = SQLConf.get.optimizerPlanChangeBatches.map(Utils.stringToSeq)
+
+    def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
       if (logRules.isEmpty || logRules.get.contains(ruleName)) {
-        lazy val message =
+        def message(): String = {
           s"""
              |=== Applying Rule ${ruleName} ===
              |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
           """.stripMargin
-        logLevel match {
-          case "TRACE" => logTrace(message)
-          case "DEBUG" => logDebug(message)
-          case "INFO" => logInfo(message)
-          case "WARN" => logWarning(message)
-          case "ERROR" => logError(message)
-          case _ => logTrace(message)
-        }
+        }
+
+        logBasedOnLevel(message)
       }
     }
+
+    def logBatch(batchName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+      if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
+        def message(): String = {
+          if (!oldPlan.fastEquals(newPlan)) {
+            s"""
+               |=== Result of Batch ${batchName} ===
+               |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+             """.stripMargin
+          } else {
+            s"Batch ${batchName} has no effect."
+          }
+        }
+
+        logBasedOnLevel(message)
+      }
+    }
+
+    private def logBasedOnLevel(f: => String): Unit = {
+      logLevel match {
+        case "TRACE" => logTrace(f)
+        case "DEBUG" => logDebug(f)
+        case "INFO" => logInfo(f)
+        case "WARN" => logWarning(f)
+        case "ERROR" => logError(f)
+        case _ => logTrace(f)
+      }
+    }
   }
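A note on the design: `logBasedOnLevel` takes its message as a by-name parameter (`f: => String`), and the `logTrace`/`logDebug`/etc. helpers in Spark's `Logging` trait are also by-name, so the potentially expensive `sideBySide` tree rendering runs only when the configured level is actually enabled. A self-contained sketch of the pattern (plain Scala, not Spark code):

```scala
// Minimal sketch of the by-name logging pattern behind logBasedOnLevel.
// `msg` is passed unevaluated and only forced when the level check passes.
object ByNameLoggingDemo {
  var debugEnabled = false

  def logDebug(msg: => String): Unit =
    if (debugEnabled) println(s"DEBUG: $msg")

  def main(args: Array[String]): Unit = {
    def render(): String = { println("rendering plan diff..."); "plan diff" }

    logDebug(render())  // disabled: render() is never evaluated
    debugEnabled = true
    logDebug(render())  // enabled: render() runs and the message is logged
  }
}
```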
SQLConf.scala
@@ -184,8 +184,8 @@ object SQLConf {
   val OPTIMIZER_PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.optimizer.planChangeLog.level")
     .internal()
     .doc("Configures the log level for logging the change from the original plan to the new " +
-      "plan after a rule is applied. The value can be 'trace', 'debug', 'info', 'warn', or " +
-      "'error'. The default log level is 'trace'.")
+      "plan after a rule or batch is applied. The value can be 'trace', 'debug', 'info', " +
+      "'warn', or 'error'. The default log level is 'trace'.")
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
@@ -195,9 +195,15 @@ object SQLConf {

   val OPTIMIZER_PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.optimizer.planChangeLog.rules")
     .internal()
-    .doc("If this configuration is set, the optimizer will only log plan changes caused by " +
-      "applying the rules specified in this configuration. The value can be a list of rule " +
-      "names separated by comma.")
+    .doc("Configures a list of rules to be logged in the optimizer, in which the rules are " +
+      "specified by their rule names and separated by comma.")
     .stringConf
     .createOptional
+
+  val OPTIMIZER_PLAN_CHANGE_LOG_BATCHES = buildConf("spark.sql.optimizer.planChangeLog.batches")
+    .internal()
+    .doc("Configures a list of batches to be logged in the optimizer, in which the batches " +
+      "are specified by their batch names and separated by comma.")
+    .stringConf
+    .createOptional

@@ -1751,6 +1757,8 @@ class SQLConf extends Serializable with Logging {

   def optimizerPlanChangeRules: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_RULES)

+  def optimizerPlanChangeBatches: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_BATCHES)
+
   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)

   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
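For clarity on how the optional lists are consumed: `PlanChangeLogger` splits each conf value with `Utils.stringToSeq` and treats an unset conf as "log everything". A small standalone sketch of that filtering logic (the local `stringToSeq` mirrors `org.apache.spark.util.Utils.stringToSeq`; the conf value shown is an example):

```scala
// Sketch of the name filtering in PlanChangeLogger.logBatch / logRule.
// The local stringToSeq mirrors org.apache.spark.util.Utils.stringToSeq:
// split on commas, trim entries, drop empties.
def stringToSeq(str: String): Seq[String] =
  str.split(",").map(_.trim).filter(_.nonEmpty)

val conf: Option[String] = Some("Optimizer Batch, Batch Has No Effect")
val logBatches: Option[Seq[String]] = conf.map(stringToSeq)

def shouldLogBatch(batchName: String): Boolean =
  logBatches.isEmpty || logBatches.get.contains(batchName)

assert(shouldLogBatch("Optimizer Batch"))    // listed, so it is logged
assert(!shouldLogBatch("Some Other Batch"))  // not listed, so it is skipped
```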
OptimizerLoggingSuite.scala
@@ -32,17 +32,20 @@ import org.apache.spark.sql.internal.SQLConf
 class OptimizerLoggingSuite extends PlanTest {

   object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("Optimizer Batch", FixedPoint(100),
-      PushDownPredicate,
-      ColumnPruning,
-      CollapseProject) :: Nil
+    val batches =
+      Batch("Optimizer Batch", FixedPoint(100),
+        PushDownPredicate, ColumnPruning, CollapseProject) ::
+      Batch("Batch Has No Effect", Once,
+        ColumnPruning) :: Nil
   }

   class MockAppender extends AppenderSkeleton {
     val loggingEvents = new ArrayBuffer[LoggingEvent]()

     override def append(loggingEvent: LoggingEvent): Unit = {
-      if (loggingEvent.getRenderedMessage().contains("Applying Rule")) {
+      if (loggingEvent.getRenderedMessage().contains("Applying Rule") ||
+        loggingEvent.getRenderedMessage().contains("Result of Batch") ||
+        loggingEvent.getRenderedMessage().contains("has no effect")) {
         loggingEvents.append(loggingEvent)
       }
     }
@@ -51,7 +54,18 @@ class OptimizerLoggingSuite extends PlanTest {
     override def requiresLayout(): Boolean = false
   }

-  private def verifyLog(expectedLevel: Level, expectedRules: Seq[String]): Unit = {
+  private def withLogLevelAndAppender(level: Level, appender: Appender)(f: => Unit): Unit = {
+    val logger = Logger.getLogger(Optimize.getClass.getName.dropRight(1))
+    val restoreLevel = logger.getLevel
+    logger.setLevel(level)
+    logger.addAppender(appender)
+    try f finally {
+      logger.setLevel(restoreLevel)
+      logger.removeAppender(appender)
+    }
+  }
+
+  private def verifyLog(expectedLevel: Level, expectedRulesOrBatches: Seq[String]): Unit = {
     val logAppender = new MockAppender()
     withLogAppender(logAppender,
       loggerName = Some(Optimize.getClass.getName.dropRight(1)), level = Some(Level.TRACE)) {
@@ -61,7 +75,8 @@ class OptimizerLoggingSuite extends PlanTest {
       comparePlans(Optimize.execute(query), expected)
     }
     val logMessages = logAppender.loggingEvents.map(_.getRenderedMessage)
-    assert(expectedRules.forall(rule => logMessages.exists(_.contains(rule))))
+    assert(expectedRulesOrBatches.forall
+      (ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch))))
     assert(logAppender.loggingEvents.forall(_.getLevel == expectedLevel))
   }

@@ -135,4 +150,20 @@ class OptimizerLoggingSuite extends PlanTest {
       }
     }
   }
+
+  test("test log batches which change the plan") {
+    withSQLConf(
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Optimizer Batch",
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+      verifyLog(Level.INFO, Seq("Optimizer Batch"))
+    }
+  }
+
+  test("test log batches which do not change the plan") {
+    withSQLConf(
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Batch Has No Effect",
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+      verifyLog(Level.INFO, Seq("Batch Has No Effect"))
+    }
+  }
 }