-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-7026] [SQL] fix left semi join with equi key and non-equi condition #5643
Conversation
Test build #30798 has finished for PR 5643 at commit
|
I am not sure it is suitable to broadcast a hashmap contains key and related rows, this maybe much bigger than the old hashset, may cause OOM issue. |
@scwf Of course we can go the old way when there's no additional conditions. |
override def execute(): RDD[Row] = { | ||
val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator | ||
val hashSet = new java.util.HashSet[Row]() | ||
val hashMap = new java.util.HashMap[Row, scala.collection.mutable.ArrayBuffer[Row]]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why changed to arraybuffer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a consideration for performance. Anyway I'm changing it back to HashSet.
Test build #30800 has finished for PR 5643 at commit
|
Yes, i understand that when no additional condition it go the old way. i mean when there are additional conditions, your broadcasting hashmap may be much bigger since you also kept the related rows, which may leads to OOM. |
Test build #30805 has finished for PR 5643 at commit
|
Test build #30808 has finished for PR 5643 at commit
|
@@ -298,6 +298,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { | |||
) | |||
} | |||
|
|||
test("left semi greater than predicate and equal operator") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adrian-wang i suggest you add the case chenghao described in my PR to the unit test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create a pr for your branch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closed since you have added the test
Test build #30812 has finished for PR 5643 at commit
|
This LGTM |
Test build #30818 has finished for PR 5643 at commit
|
retest this please. |
Test build #30822 has finished for PR 5643 at commit
|
Test build #30834 has finished for PR 5643 at commit
|
} | ||
} | ||
} | ||
case Some(_) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if this would be simpler if we use the HashedRelation
instead.
Test build #30905 has finished for PR 5643 at commit
|
|
||
override val buildSide: BuildSide = BuildRight | ||
|
||
override def output: Seq[Attribute] = left.output | ||
|
||
@transient private lazy val boundCondition = | ||
condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
?
Test build #30924 has finished for PR 5643 at commit
|
retest this please. |
Test build #30988 has started for PR 5643 at commit |
retest this please. |
Test build #31091 has finished for PR 5643 at commit
|
ping can you update this? |
Test build #33063 has finished for PR 5643 at commit
|
val joinedRow = new JoinedRow | ||
|
||
streamIter.filter(current => { | ||
val rowBuffer = broadcastedRelation.value.get(joinKeys.currentValue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to apply first before we get currentValue, or will get null for the first row.
Mind bringing this up to date? |
Test build #35076 has finished for PR 5643 at commit
|
Test build #35084 has finished for PR 5643 at commit
|
Test build #35086 has finished for PR 5643 at commit
|
Test build #35090 has finished for PR 5643 at commit
|
Test build #35241 has finished for PR 5643 at commit
|
!joinKeys(current).anyNull && broadcastedRelation.value.contains(joinKeys.currentValue) | ||
}) | ||
} | ||
case _ => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using pattern matching here makes this a little hard to understand as I don't think its very obvious that case _ =>
implies there is a non equijoin condition and thus we need to build a full hashtable instead of a hash set. Perhaps name the variable nonEquiJoinCondition
and use isDefined
in an if
statement. Some more comments would also be helpful.
Thanks for working on this and sorry for the delay reviewing. Would be great if this could be updated in time for Spark 1.5! |
Test build #37346 has finished for PR 5643 at commit
|
Thanks! Merging to master. |
|
||
protected def buildKeyHashSet( | ||
buildIter: Iterator[InternalRow], | ||
copy: Boolean): java.util.Set[InternalRow] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Long term I wonder if its actually a win for us to build just a set instead of using hashed relation everywhere. We have done a bunch optimization on HashedRelation to make it serialize faster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we need to implement a version of HashedRelation which only stores the keys.
When the
condition
extracted byExtractEquiJoinKeys
contain join Predicate for left semi join, we can not plan it as semiJoin. Such asCondition
x.a >= y.a + 2
can not evaluate on tablex
, so it throw errors