This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-285] ColumnarWindow: Support Date input in MAX/MIN #286

Merged (5 commits, May 8, 2021)
arrow-data-source/pom.xml: 8 additions & 48 deletions

@@ -100,26 +100,6 @@
     <version>${scala.version}</version>
     <scope>provided</scope>
   </dependency>
-  <dependency>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-sql_2.12</artifactId>
-    <version>${spark.version}</version>
-    <exclusions>
-      <exclusion>
-        <groupId>org.apache.arrow</groupId>
-        <artifactId>arrow-vector</artifactId>
-      </exclusion>
-      <exclusion>
-        <groupId>org.slf4j</groupId>
-        <artifactId>slf4j-log4j12</artifactId>
-      </exclusion>
-      <exclusion>
-        <groupId>log4j</groupId>
-        <artifactId>log4j</artifactId>
-      </exclusion>
-    </exclusions>
-    <scope>provided</scope>
-  </dependency>
   <dependency>
     <groupId>junit</groupId>
     <artifactId>junit</artifactId>
@@ -128,44 +108,24 @@
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-core_2.12</artifactId>
-    <version>${spark.version}</version>
+    <artifactId>spark-sql_${scala.binary.version}</artifactId>
     <scope>provided</scope>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_${scala.binary.version}</artifactId>
     <type>test-jar</type>
     <scope>test</scope>
-    <exclusions>
-      <exclusion>
-        <groupId>org.slf4j</groupId>
-        <artifactId>slf4j-log4j12</artifactId>
-      </exclusion>
-      <exclusion>
-        <groupId>log4j</groupId>
-        <artifactId>log4j</artifactId>
-      </exclusion>
-    </exclusions>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-catalyst_2.12</artifactId>
-    <version>${spark.version}</version>
-    <exclusions>
-      <exclusion>
-        <groupId>org.apache.arrow</groupId>
-        <artifactId>arrow-vector</artifactId>
-      </exclusion>
-    </exclusions>
+    <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
     <type>test-jar</type>
     <scope>test</scope>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-sql_2.12</artifactId>
-    <version>${spark.version}</version>
-    <exclusions>
-      <exclusion>
-        <groupId>org.apache.arrow</groupId>
-        <artifactId>arrow-vector</artifactId>
-      </exclusion>
-    </exclusions>
+    <artifactId>spark-sql_${scala.binary.version}</artifactId>
     <type>test-jar</type>
     <scope>test</scope>
   </dependency>
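The rewritten entries drop the hard-coded `_2.12` suffix and the per-dependency `<version>` tags. For those to resolve, a parent POM presumably supplies the `scala.binary.version` property and pins the Spark artifacts through `dependencyManagement`. A minimal sketch of that assumption (the property and version values are illustrative, not taken from this PR):

```xml
<!-- Hypothetical parent-POM excerpt: defines the property interpolated into
     the artifactIds above and centralizes the Spark version so child POMs
     can omit <version> entirely. -->
<properties>
  <scala.binary.version>2.12</scala.binary.version>
  <spark.version>3.1.1</spark.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```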
native-sql-engine/core/pom.xml: 1 addition & 27 deletions

@@ -48,50 +48,24 @@
 <!-- Prevent our dummy JAR from being included in Spark distributions or uploaded to YARN -->
 <dependency>
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-core_${scala.binary.version}</artifactId>
-  <version>${spark.version}</version>
+  <artifactId>spark-sql_${scala.binary.version}</artifactId>
   <scope>provided</scope>
-  <exclusions>
-    <exclusion>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-    </exclusion>
-    <exclusion>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-    </exclusion>
-  </exclusions>
 </dependency>
 <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_${scala.binary.version}</artifactId>
-  <version>${spark.version}</version>
   <type>test-jar</type>
   <scope>test</scope>
 </dependency>
-<dependency>
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
-  <version>${spark.version}</version>
-  <scope>provided</scope>
-</dependency>
 <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
-  <version>${spark.version}</version>
   <type>test-jar</type>
   <scope>test</scope>
 </dependency>
-<dependency>
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-sql_${scala.binary.version}</artifactId>
-  <version>${spark.version}</version>
-  <scope>provided</scope>
-</dependency>
 <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_${scala.binary.version}</artifactId>
-  <version>${spark.version}</version>
   <type>test-jar</type>
   <scope>test</scope>
 </dependency>
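One detail explains why only some duplicates could go: Maven resolves ordinary compile-scope jars transitively (spark-sql already pulls in spark-core and spark-catalyst), but it never resolves `test-jar` artifacts transitively, so those must stay declared per module. The surviving shape, shown here as a generic sketch rather than a specific line from this diff:

```xml
<!-- test-jars are not inherited through spark-sql's dependency graph;
     each module that needs Spark's test utilities declares them itself -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_${scala.binary.version}</artifactId>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
```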
@@ -170,7 +170,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
           plan.isSkewJoin)
       case plan: WindowExec =>
         if (!enableColumnarWindow) return false
-        val window = ColumnarWindowExec.create(
+        val window = ColumnarWindowExec.createWithOptimizations(
          plan.windowExpression,
          plan.partitionSpec,
          plan.orderSpec,
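The guard rule treats construction as a support probe: if building the columnar operator throws, the plan stays row-based. A condensed sketch of that pattern, assuming the project's `ColumnarWindowExec` is on the classpath; `windowIsSupported` and the simplified shape are illustrative, not the project's exact code:

```scala
import org.apache.spark.sql.execution.window.WindowExec

// createWithOptimizations is assumed to validate its window expressions
// eagerly (now including MAX/MIN over Date input) and throw on anything
// it cannot handle, which is what makes it usable as a probe.
def windowIsSupported(plan: WindowExec): Boolean =
  try {
    ColumnarWindowExec.createWithOptimizations(
      plan.windowExpression,
      plan.partitionSpec,
      plan.orderSpec,
      plan.child)
    true // construction succeeded; let the columnar rules transform this plan
  } catch {
    case _: Throwable => false // unsupported input; keep the row-based WindowExec
  }
```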
@@ -221,36 +221,17 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
       }

     case plan: WindowExec =>
-      if (columnarConf.enableColumnarWindow) {
-        val sortRemoved = plan.child match {
-          case sort: SortExec => // remove ordering requirements
-            replaceWithColumnarPlan(sort.child)
-          case _ =>
-            replaceWithColumnarPlan(plan.child)
-        }
-        // disable CoalesceBatchesExec to reduce Netty direct memory usage
-        val coalesceBatchRemoved = sortRemoved match {
-          case s: CoalesceBatchesExec =>
-            s.child
-          case _ => sortRemoved
-        }
-        logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-        try {
-          val window = ColumnarWindowExec.create(
-            plan.windowExpression,
-            plan.partitionSpec,
-            plan.orderSpec,
-            coalesceBatchRemoved)
-          return window
-        } catch {
-          case _: Throwable =>
-            logInfo("Columnar Window: Falling back to regular Window...")
-        }
-      }
-      logDebug(s"Columnar Processing for ${plan.getClass} is not currently supported.")
-      val children = plan.children.map(replaceWithColumnarPlan)
-      plan.withNewChildren(children)
+      try {
+        ColumnarWindowExec.createWithOptimizations(
+          plan.windowExpression,
+          plan.partitionSpec,
+          plan.orderSpec,
+          replaceWithColumnarPlan(plan.child))
+      } catch {
+        case _: Throwable =>
+          logInfo("Columnar Window: Falling back to regular Window...")
+          plan
+      }

     case p =>
       val children = plan.children.map(replaceWithColumnarPlan)
       logDebug(s"Columnar Processing for ${p.getClass} is currently not supported.")
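The net effect of this hunk: the sort-removal and CoalesceBatchesExec-removal rewrites deleted above move behind `createWithOptimizations`, so the guard rule and this transformation rule share one entry point and one failure mode. A hedged sketch of what such a factory can fold in, reconstructed from the deleted logic rather than from the PR's actual implementation:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.execution.{SortExec, SparkPlan}

// Sketch only: ColumnarWindowExec's real factory, and any extra rewrites it
// performs (such as the Date-typed MAX/MIN support this PR adds), live in
// the project code; CoalesceBatchesExec is the project's batch-coalescing
// operator referenced in the deleted lines above.
object ColumnarWindowExecSketch {
  def createWithOptimizations(
      windowExpression: Seq[NamedExpression],
      partitionSpec: Seq[Expression],
      orderSpec: Seq[SortOrder],
      child: SparkPlan): SparkPlan = {
    // drop the Sort inserted to satisfy the row-based operator's
    // ordering requirement
    val sortRemoved = child match {
      case s: SortExec => s.child
      case other       => other
    }
    // skip batch coalescing to reduce Netty direct memory usage
    val coalesceRemoved = sortRemoved match {
      case c: CoalesceBatchesExec => c.child
      case other                  => other
    }
    // constructing the operator validates the window expressions; callers
    // catch the exception and fall back to the row-based plan
    ColumnarWindowExec(windowExpression, partitionSpec, orderSpec, coalesceRemoved)
  }
}
```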