Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23914][SQL] Add array_union function #21061

Closed
wants to merge 34 commits into from

Conversation

kiszk
Copy link
Member

@kiszk kiszk commented Apr 13, 2018

What changes were proposed in this pull request?

The PR adds the SQL function array_union. The behavior of the function is based on Presto's one.

This function returns returns an array of the elements in the union of array1 and array2.

Note: The order of elements in the result is not defined.

How was this patch tested?

Added UTs

@SparkQA
Copy link

SparkQA commented Apr 13, 2018

Test build #89317 has finished for PR 21061 at commit 6604271.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

cc @ueshin

@SparkQA
Copy link

SparkQA commented Apr 13, 2018

Test build #89320 has finished for PR 21061 at commit 29c9b92.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 13, 2018

Test build #89335 has finished for PR 21061 at commit 2fb87fa.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

""",
since = "2.4.0")
case class ArrayUnion(left: Expression, right: Expression)
extends BinaryExpression with ExpectsInputTypes with CodegenFallback {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should implement codegen version instead of using CodegenFallback.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to let us know why we should implement codegen version? From the performance view, the codegen may not have advantage since union method takes longer time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, do we want to keep the chain length of the whole-stage codegen as possible?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wholestage codegen doesn't support CodegenFallback. So even this expression codegen has no performance advantage itself, it still can makes a difference because it breaks a query to non wholestage codegen and wholestage codegen parts.

Copy link
Member Author

@kiszk kiszk Apr 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thank you for your clarification

i += 1
}
i = 0
while (i < ary2.numElements()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also support array_union and array_except by changing this 2nd loop with small other changes. This is why we introduced ArraySetUtils in this PR.

Other PRs will update ArraySetUtils appropriately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an instance of the usage of ArraySetUtils.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the final version of ArraySetUtils that supports three functions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this abstraction is good or not. The final version seems complex because of a bunch of if-else.
I'd rather introduce abstract methods for the difference and override them in the subclasses.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your good suggestion.
I will create a new abstract method for this part which will be overridden by each of three subclasses

@SparkQA
Copy link

SparkQA commented Apr 17, 2018

Test build #89466 has finished for PR 21061 at commit 26c30b9.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 17, 2018

Test build #89468 has finished for PR 21061 at commit 809621b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 17, 2018

Test build #89470 has finished for PR 21061 at commit cf65616.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes
  • case class ArrayUnion(left: Expression, right: Expression) extends ArraySetUtils

@kiszk
Copy link
Member Author

kiszk commented Apr 18, 2018

retest this please

@SparkQA
Copy link

SparkQA commented Apr 18, 2018

Test build #89474 has finished for PR 21061 at commit cf65616.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes
  • case class ArrayUnion(left: Expression, right: Expression) extends ArraySetUtils

@SparkQA
Copy link

SparkQA commented Apr 18, 2018

Test build #89487 has finished for PR 21061 at commit bbbc865.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Copy link
Member Author

kiszk commented Apr 18, 2018

retest this please

@SparkQA
Copy link

SparkQA commented Apr 18, 2018

Test build #89493 has finished for PR 21061 at commit bbbc865.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 18, 2018

Test build #89510 has finished for PR 21061 at commit 76561f1.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 18, 2018

Test build #89515 has finished for PR 21061 at commit 01ec6c2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 18, 2018

Test build #89526 has finished for PR 21061 at commit b1a0f7f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 19, 2018

Test build #89539 has finished for PR 21061 at commit 233f056.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

|for (int $i = 0; $i < $ary2.numElements(); $i++) {
| $hs.add$postFix($ary2.$getter);
|}
|${ev.value} = $arrayBuilder(($castType[]) $hs.iterator().toArray($classTag));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we shouldn't use iterator() to avoid box/unbox. Iterator is not specialized.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, great catch. I confirmed there is not iterator(), which is specialized, in OpenHashSet$mcI$sp`. I will rewrite this.

checkEvaluation(ArrayUnion(a20, a22), Seq("b", "a", "c", null, "g"))
checkEvaluation(ArrayUnion(a23, a24), Seq("b", "c", "d", "a", "f"))

checkEvaluation(ArrayUnion(a30, a30), Seq(null))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if one of the two arguments is null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I cannot see such a test case in Presto.
Let me think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm, when only one of the arguments is null, unexpected TreeNodeException occurs.

checkEvaluation(ArrayUnion(a20, a30), Seq("b", "a", "c", null))
After applying rule org.apache.spark.sql.catalyst.optimizer.EliminateDistinct in batch Eliminate Distinct, the structural integrity of the plan is broken., tree:
'Project [array_union([b,a,c], [null,null]) AS Optimized(array_union([b,a,c], [null,null]))#71]
+- OneRowRelation

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: After applying rule org.apache.spark.sql.catalyst.optimizer.EliminateDistinct in batch Eliminate Distinct, the structural integrity of the plan is broken., tree:
'Project [array_union([b,a,c], [null,null]) AS Optimized(array_union([b,a,c], [null,null]))#71]
+- OneRowRelation


	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:106)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
	at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
	at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
	at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
	at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper$class.checkEvaluationWithOptimization(ExpressionEvalHelper.scala:252)
...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure its reason (maybe because the element types are different?), but I meant something like:

checkEvaluation(ArrayUnion(a20, Literal.create(null, ArrayType(StringType))), ...?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Thanks. Your example returns null. Since the following test throws an exception, I think that it makes sense that your example returns null. WDYT?

    val df8 = Seq((null, Array("a"))).toDF("a", "b")
    intercept[AnalysisException] {
      df8.select(array_union($"a", $"b"))
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning null sounds good, but what do you mean by "Since the following test throws an exception"? What exception is the test throwing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following error occurs. When I looked at other tests, it does not look strange. This is because null has no type information.

cannot resolve 'array_union(NULL, `b`)' due to data type mismatch: Element type in both arrays must be the same;;
'Project [array_union(null, b#118) AS array_union(a, b)#121]
+- AnalysisBarrier
      +- Project [_1#114 AS a#117, _2#115 AS b#118]
         +- LocalRelation [_1#114, _2#115]

org.apache.spark.sql.AnalysisException: cannot resolve 'array_union(NULL, `b`)' due to data type mismatch: Element type in both arrays must be the same;;
'Project [array_union(null, b#118) AS array_union(a, b)#121]
+- AnalysisBarrier
      +- Project [_1#114 AS a#117, _2#115 AS b#118]
         +- LocalRelation [_1#114, _2#115]

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Maybe the purpose of the test is not what I thought.
Seems like what I wanted is included in the latest updates.

@@ -505,3 +506,150 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast

override def prettyName: String = "array_max"
}

abstract class ArraySetUtils extends BinaryExpression with ExpectsInputTypes {
val kindUnion = 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in ArraySetUtils object?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

i += 1
}
i = 0
while (i < ary2.numElements()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this abstraction is good or not. The final version seems complex because of a bunch of if-else.
I'd rather introduce abstract methods for the difference and override them in the subclasses.

@SparkQA
Copy link

SparkQA commented Apr 20, 2018

Test build #89621 has finished for PR 21061 at commit eb325ca.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 20, 2018

Test build #89624 has finished for PR 21061 at commit c04b166.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 20, 2018

Test build #89625 has finished for PR 21061 at commit 02d6d9e.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 8, 2018

Test build #92716 has finished for PR 21061 at commit 0c0d3ba.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Copy link
Member Author

kiszk commented Jul 8, 2018

retest this please

@SparkQA
Copy link

SparkQA commented Jul 8, 2018

Test build #92717 has finished for PR 21061 at commit 0c0d3ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 8, 2018

Test build #92718 has finished for PR 21061 at commit 4a217bc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 8, 2018

Test build #92720 has finished for PR 21061 at commit f5ebbe8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@ueshin ueshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except for a few comments.

ArrayType(et, dataTypes.exists(_.asInstanceOf[ArrayType].containsNull))
case dt => dt
}.getOrElse(StringType)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

override def dataType: DataType = {
  val dataTypes = children.map(_.dataType.asInstanceOf[ArrayType])
  ArrayType(elementType, dataTypes.exists(_.containsNull))
}

should work?

// store elements into resultArray
var nullElementSize = 0
var pos = 0
Seq(array1, array2).foreach(array => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: foreach { array =>?

val a30 = Literal.create(Seq(null, null), ArrayType(IntegerType))
val a31 = Literal.create(null, ArrayType(StringType))

checkEvaluation(ArrayUnion(a00, a01), UnsafeArrayData.fromPrimitiveArray(Array(1, 2, 3, 4)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't need to use UnsafeArrayData here. Seq(1, 2, 3, 4) should work.

}
}

private def cn = left.dataType.asInstanceOf[ArrayType].containsNull ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

containsNull instead of cn?

return fromPrimitiveArray(null, offset, length, elementSize);
}

public static boolean useGenericArrayData(int elementSize, int length) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: canUseGenericArrayData

@since(2.4)
def array_union(col1, col2):
"""
Collection function: returns an array of the elements in the union of col1 and col2,
Copy link
Member

@viirya viirya Jul 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the array of col1 contains duplicate elements itself, what it does? de-duplicate them too?

E.g.,

df = spark.createDataFrame([Row(c1=["b", "a", "c", "c"], c2=["c", "d", "a", "f"])])
df.select(array_union(df.c1, df.c2)).collect()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading the code, seems it de-duplicates all elements from two arrays. Is this behavior the same as Presto?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add the tests for duplication.
Yes, this will de-duplicate. I think that it is the same behavior as Presto.

@@ -450,7 +450,7 @@ public UnsafeArrayData copy() {
return values;
}

private static UnsafeArrayData fromPrimitiveArray(
public static UnsafeArrayData fromPrimitiveArray(
Object arr, int offset, int length, int elementSize) {
final long headerInBytes = calculateHeaderPortionInBytes(length);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this logic extracted to useGenericArrayData? If so, can we re-use it by calling the method here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this thread an answer to this question?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

}


abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Describe what ArraySetLike is intended for by adding comment?

// calculate result array size
hsLong = new OpenHashSet[Long]
val elements = evalIntLongPrimitiveType(array1, array2, null, true)
hsLong = new OpenHashSet[Long]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we obtain unique elements of two arrays in the hash set, can't we get final array elements from it directly instead of scanning two arrays again?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be. Originally, I took that approach.
After discussed with @ueshin, I decided to generate a result array from the original arrays instead of the hash. This is because we generate a result array in a unique deterministic order among the different paths in array_union.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, though I think there will be some performance issue.

var i = 0
while (i < array.numElements()) {
if (array.isNullAt(i)) {
if (!foundNullElement) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This two if can be combined?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is not easy since we want to do nothing if array.isNullAt(i) && foundNullElement is true.

@since(2.4)
def array_union(col1, col2):
"""
Collection function: returns an array of the elements in the union of col1 and col2,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading the code, seems it de-duplicates all elements from two arrays. Is this behavior the same as Presto?

@SparkQA
Copy link

SparkQA commented Jul 9, 2018

Test build #92769 has finished for PR 21061 at commit 763a1f8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Copy link
Member Author

kiszk commented Jul 10, 2018

retest this please

@SparkQA
Copy link

SparkQA commented Jul 10, 2018

Test build #92779 has finished for PR 21061 at commit 763a1f8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Copy link
Member Author

kiszk commented Jul 10, 2018

retest this please

@SparkQA
Copy link

SparkQA commented Jul 10, 2018

Test build #92789 has finished for PR 21061 at commit 763a1f8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Copy link
Member

ueshin commented Jul 10, 2018

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Jul 10, 2018

Test build #92807 has finished for PR 21061 at commit 763a1f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Copy link
Member

ueshin commented Jul 11, 2018

@viirya Do you have any other comments on this? Thanks!

return fromPrimitiveArray(null, offset, length, elementSize);
}

public static boolean canUseGenericArrayData(int elementSize, int length) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, I've suggested this naming, but seems it is should be something like shouldUseGenericArrayData.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both make sense. I will follow your suggestion.

s"get$ptName($i)", s"set$ptName($pos, $value)", CodeGenerator.javaType(elementType),
if (elementType == LongType) "(long)" else "(int)",
s"""
|${ctx.createUnsafeArray(unsafeArray, size, elementType, s" $prettyName failed.")}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we don't automatically choose to use GenericArrayData as the same as interpreted path?

Copy link
Member Author

@kiszk kiszk Jul 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your comment is correct. It would be good to address this choice in another PR to update ctx.createUnsafeArray.
cc: @ueshin

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean a refactoring around the usage of createUnsafeArray through new collection functions in another PR? If so, I'm okay with doing it in another PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I mean a refactoring the usage of createUnsafeArray thru new collection functions.

@viirya
Copy link
Member

viirya commented Jul 11, 2018

Few minor comments. otherwise LGTM.

@SparkQA
Copy link

SparkQA commented Jul 11, 2018

Test build #92861 has finished for PR 21061 at commit 7b51564.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member

viirya commented Jul 12, 2018 via email

@ueshin
Copy link
Member

ueshin commented Jul 12, 2018

Thanks! merging to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants