Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-451] backport patches to 1.2 #448

Merged
merged 7 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arrow-data-source/standard/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {

test("file descriptor leak - v1") {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
spark.catalog.createTable("ptab2", path, "arrow")
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.arrow(path)
frame.createOrReplaceTempView("ptab2")

def getFdCount: Long = {
ManagementFactory.getOperatingSystemMXBean
Expand Down
2 changes: 1 addition & 1 deletion native-sql-engine/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
def getCpu(): Boolean = {
val source = scala.io.Source.fromFile("/proc/cpuinfo")
val lines = try source.mkString finally source.close()
return true
//TODO(): check CPU flags to enable/disable AVX512
if (lines.contains("GenuineIntel")) {
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ case class ColumnarHashAggregateExec(
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
var resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends BaseAggregateExec
with ColumnarCodegenSupport
Expand All @@ -76,14 +76,28 @@ case class ColumnarHashAggregateExec(
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
override def supportsColumnar = true

var resAttributes: Seq[Attribute] = resultExpressions.map(_.toAttribute)
if (aggregateExpressions != null && aggregateExpressions.nonEmpty) {
aggregateExpressions.head.mode match {
case Partial =>
// In a Partial aggregate, the expression ids in the result expressions can differ from
// those of inputAggBufferAttributes. To keep them consistent, the result attributes are
// recomputed here and also used as the result expressions.
resAttributes = groupingExpressions.map(_.toAttribute) ++
aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
resultExpressions = resAttributes
case _ =>
}
}

// Members declared in org.apache.spark.sql.execution.AliasAwareOutputPartitioning
override protected def outputExpressions: Seq[NamedExpression] = resultExpressions

// Members declared in org.apache.spark.sql.execution.CodegenSupport
protected def doProduce(ctx: CodegenContext): String = throw new UnsupportedOperationException()

// Members declared in org.apache.spark.sql.catalyst.plans.QueryPlan
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
override def output: Seq[Attribute] = resAttributes

// Members declared in org.apache.spark.sql.execution.SparkPlan
protected override def doExecute()
Expand Down Expand Up @@ -398,30 +412,7 @@ case class ColumnarHashAggregateExec(
expr.mode match {
case Final =>
val out_res = 0
resultColumnVectors(idx).dataType match {
case t: IntegerType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].intValue)
case t: LongType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].longValue)
case t: DoubleType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].doubleValue())
case t: FloatType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].floatValue())
case t: ByteType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].byteValue())
case t: ShortType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].shortValue())
case t: StringType =>
val values = (out_res :: Nil).map(_.toByte).toArray
resultColumnVectors(idx)
.putBytes(0, 1, values, 0)
}
putDataIntoVector(resultColumnVectors, out_res, idx)
idx += 1
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import scala.collection.mutable.ListBuffer
import scala.util.Random

import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils
import util.control.Breaks._

case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
partitionSpec: Seq[Expression],
Expand All @@ -60,6 +61,8 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],

override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute)

buildCheck()

override def requiredChildDistribution: Seq[Distribution] = {
if (partitionSpec.isEmpty) {
// Only show warning when the number of bytes is larger than 100 MiB?
Expand Down Expand Up @@ -91,6 +94,29 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
val sparkConf = sparkContext.getConf
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo

/**
 * Rejects this operator when none of the window functions returned by
 * `validateWindowFunctions()` has a non-`Literal` child.
 *
 * Note that the check is vacuously true (and therefore throws) when there are no
 * window functions, or when none of them has any children.
 *
 * @throws UnsupportedOperationException if `validateWindowFunctions()` fails with a
 *         non-fatal error (the original error is kept as the cause), or if every child
 *         of every window function is a `Literal`.
 */
def buildCheck(): Unit = {
  import scala.util.control.NonFatal

  val allLiteral: Boolean =
    try {
      // True only if every child of every validated window function is a Literal;
      // `forall` stops at the first non-Literal child, like the former `break`.
      validateWindowFunctions().forall { func =>
        func._2.children.forall(_.isInstanceOf[Literal])
      }
    } catch {
      // Only non-fatal failures are translated; fatal JVM errors (e.g. OutOfMemoryError)
      // must propagate unchanged instead of being reported as "unsupported".
      case NonFatal(e) =>
        throw new UnsupportedOperationException(s"${e.getMessage}", e)
    }

  if (allLiteral) {
    throw new UnsupportedOperationException(
      "Window functions' children all being Literal is not supported.")
  }
}

def checkAggFunctionSpec(windowSpec: WindowSpecDefinition): Unit = {
if (windowSpec.orderSpec.nonEmpty) {
throw new UnsupportedOperationException("unsupported operation for " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
}

// FIXME ZONE issue
ignore("date type - cast from timestamp") {
test("date type - cast from timestamp") {
withTempView("dates") {
val dates = (0L to 3L).map(i => i * 24 * 1000 * 3600)
.map(i => Tuple1(new Timestamp(i)))
Expand Down Expand Up @@ -248,7 +248,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
}

// todo: fix field/literal implicit conversion in ColumnarExpressionConverter
ignore("date type - join on, bhj") {
test("date type - join on, bhj") {
withTempView("dates1", "dates2") {
val dates1 = (0L to 3L).map(i => i * 1000 * 3600 * 24)
.map(i => Tuple1(new Date(i))).toDF("time1")
Expand Down Expand Up @@ -750,7 +750,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
}
}

ignore("datetime function - to_date with format") { // todo GetTimestamp IS PRIVATE ?
test("datetime function - to_date with format") { // todo GetTimestamp IS PRIVATE ?
withTempView("dates") {

val dates = Seq("2009-07-30", "2009-07-31", "2009-08-01")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
super.afterAll()
}

ignore("window queries") {
test("window queries") {
runner.runTPCQuery("q12", 1, true)
runner.runTPCQuery("q20", 1, true)
runner.runTPCQuery("q36", 1, true)
Expand Down Expand Up @@ -103,7 +103,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
df.show()
}

ignore("window function with decimal input") {
test("window function with decimal input") {
val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_current_price)" +
" OVER (PARTITION BY i_class_id) FROM item LIMIT 1000")
df.explain()
Expand All @@ -118,7 +118,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
df.show()
}

ignore("window function with decimal input 2") {
test("window function with decimal input 2") {
val df = spark.sql("SELECT i_item_sk, i_class_id, RANK()" +
" OVER (PARTITION BY i_class_id ORDER BY i_current_price) FROM item LIMIT 1000")
df.explain()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ArrowColumnarBatchSerializerSuite extends SparkFunSuite with SharedSparkSe
SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer number of output rows")
}

ignore("deserialize all null") {
test("deserialize all null") {
val input = getTestResourcePath("test-data/native-splitter-output-all-null")
val serializer =
new ArrowColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance()
Expand Down Expand Up @@ -64,7 +64,7 @@ class ArrowColumnarBatchSerializerSuite extends SparkFunSuite with SharedSparkSe
deserializedStream.close()
}

ignore("deserialize nullable string") {
test("deserialize nullable string") {
val input = getTestResourcePath("test-data/native-splitter-output-nullable-string")
val serializer =
new ArrowColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ class DataFrameAggregateSuite extends QueryTest
}

Seq(true, false).foreach { value =>
ignore(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
test(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) {
withTempView("t1", "t2") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
Row("b", 2, 4, 8)))
}

ignore("null inputs") {
test("null inputs") {
val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), ("b", 2))
.toDF("key", "value")
val window = Window.orderBy()
Expand Down Expand Up @@ -908,7 +908,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}
}

ignore("NaN and -0.0 in window partition keys") {
test("NaN and -0.0 in window partition keys") {
val df = Seq(
(Float.NaN, Double.NaN),
(0.0f/0.0f, 0.0/0.0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
}

ignore("function current_timestamp and now") {
test("function current_timestamp and now") {
val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ import org.apache.spark.sql.internal.SQLConf
* }}}
*/
// scalastyle:on line.size.limit
@deprecated("This test suite is not suitable for native sql engine.", "Mo Rui")
// This test suite is not suitable for native sql engine. (Mo Rui)
/*
trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {

private val originalMaxToStringFields = conf.maxToStringFields
Expand Down Expand Up @@ -338,3 +339,4 @@ class TPCDSModifiedPlanStabilityWithStatsSuite extends PlanStabilitySuite {
}
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class ReuseExchangeSuite extends RepartitionSuite {

override lazy val input = spark.read.parquet(filePath)

ignore("columnar exchange same result") {
test("columnar exchange same result") {
val df1 = input.groupBy("n_regionkey").agg(Map("n_nationkey" -> "sum"))
val hashAgg1 = df1.queryExecution.executedPlan.collectFirst {
case agg: ColumnarHashAggregateExec => agg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ class NativeDataFrameAggregateSuite extends QueryTest
}

Seq(true, false).foreach { value =>
ignore(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
test(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) {
withTempView("t1", "t2") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class NativeTPCHTableRepartitionSuite extends NativeRepartitionSuite {

override lazy val input = spark.read.format("arrow").load(filePath)

/*
ignore("tpch table round robin partitioning") {
withRepartition(df => df.repartition(2))
}
Expand All @@ -95,6 +96,7 @@ class NativeTPCHTableRepartitionSuite extends NativeRepartitionSuite {
df => df.groupBy("n_regionkey").agg(Map("n_nationkey" -> "sum")),
df => df.repartition(2))
}
*/
}

class NativeDisableColumnarShuffleSuite extends NativeRepartitionSuite {
Expand Down
Loading