Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-705] Fallback R2C on unsupported cases (#706)
Browse files Browse the repository at this point in the history
* fallback on unsupported case

* add back ArrayType
  • Loading branch information
rui-mo authored Jan 18, 2022
1 parent b571919 commit ccb1e71
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child =
case d: DecimalType =>
case d: TimestampType =>
case d: BinaryType =>
case d: ArrayType =>
case _ =>
throw new UnsupportedOperationException(s"${field.dataType} " +
s"is not supported in ArrowColumnarToRowExec.")
s"is not supported in ArrowRowToColumnarExec.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,17 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {

def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
case plan: RowToColumnarExec =>
val child = replaceWithColumnarPlan(plan.child)
if (columnarConf.enableArrowRowToColumnar) {
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"ColumnarPostOverrides ArrowRowToColumnarExec(${child.getClass})")
new ArrowRowToColumnarExec(child)
try {
new ArrowRowToColumnarExec(child)
} catch {
case _: Throwable =>
logInfo("ArrowRowToColumnar: Falling back to RowToColumnar...")
RowToArrowColumnarExec(child)
}
} else {
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"ColumnarPostOverrides RowToArrowColumnarExec(${child.getClass})")
RowToArrowColumnarExec(child)
}
Expand All @@ -365,25 +370,25 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
case r: SparkPlan
if !r.isInstanceOf[QueryStageExec] && !r.supportsColumnar && r.children.exists(c =>
c.isInstanceOf[ColumnarToRowExec]) =>
// This is a fix for when DPP and AQE both enabled, ColumnarExchange maybe child as a Row SparkPlan
val children = r.children.map(c =>
c match {
case c: ColumnarToRowExec =>
if (columnarConf.enableArrowColumnarToRow) {
try {
val child = replaceWithColumnarPlan(c.child)
new ArrowColumnarToRowExec(child)
} catch {
case _: Throwable =>
logInfo("ArrowColumnarToRow : Falling back to ColumnarToRow...")
c.withNewChildren(c.children.map(replaceWithColumnarPlan))
}
} else {
c.withNewChildren(c.children.map(replaceWithColumnarPlan))
// This is a fix for when DPP and AQE both enabled,
// ColumnarExchange maybe child as a Row SparkPlan.
val children = r.children.map {
case c: ColumnarToRowExec =>
if (columnarConf.enableArrowColumnarToRow) {
try {
val child = replaceWithColumnarPlan(c.child)
new ArrowColumnarToRowExec(child)
} catch {
case _: Throwable =>
logInfo("ArrowColumnarToRow : Falling back to ColumnarToRow...")
c.withNewChildren(c.children.map(replaceWithColumnarPlan))
}
case other =>
replaceWithColumnarPlan(other)
})
} else {
c.withNewChildren(c.children.map(replaceWithColumnarPlan))
}
case other =>
replaceWithColumnarPlan(other)
}
r.withNewChildren(children)
case p =>
val children = p.children.map(replaceWithColumnarPlan)
Expand Down

0 comments on commit ccb1e71

Please sign in to comment.