[BUG] NDS query 16 hangs at SF30K #8278

Closed
mattahrens opened this issue May 11, 2023 · 17 comments · Fixed by #8659
Labels: bug (Something isn't working) · reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin)

Comments

@mattahrens
Collaborator

NDS query 16 was hanging (still running after more than 1.5 hours) at SF30K on an on-prem 8-node A100 cluster.

The query appears to be hanging here:

ai.rapids.cudf.Table.mixedLeftSemiJoinGatherMap(Native Method)
ai.rapids.cudf.Table.mixedLeftSemiJoinGatherMap(Table.java:3180)
org.apache.spark.sql.rapids.execution.ConditionalHashJoinIterator.$anonfun$joinGathererLeftRight$6(GpuHashJoin.scala:519)
org.apache.spark.sql.rapids.execution.ConditionalHashJoinIterator$$Lambda$4492/1820099669.apply(Unknown Source)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
org.apache.spark.sql.rapids.execution.ConditionalHashJoinIterator.$anonfun$joinGathererLeftRight$5(GpuHashJoin.scala:504)
org.apache.spark.sql.rapids.execution.ConditionalHashJoinIterator$$Lambda$4491/1140376598.apply(Unknown Source)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
org.apache.spark.sql.rapids.execution.ConditionalHashJoinIterator.$anonfun$joinGathererLeftRight$4(GpuHashJoin.scala:503)
org.apache.spark.sql.rapids.execution.ConditionalHashJoinIterator$$Lambda$4490/1598377815.apply(Unknown Source)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
org.apache.spark.sql.rapids.execution.ConditionalHashJoinIterator.joinGathererLeftRight(GpuHashJoin.scala:502)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.$anonfun$joinGathererLeftRight$2(GpuHashJoin.scala:355)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator$$Lambda$1890/962623661.apply(Unknown Source)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.$anonfun$joinGathererLeftRight$1(GpuHashJoin.scala:354)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator$$Lambda$1887/516624053.apply(Unknown Source)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.joinGathererLeftRight(GpuHashJoin.scala:353)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.joinGatherer(GpuHashJoin.scala:369)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.$anonfun$joinGatherer$1(GpuHashJoin.scala:378)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator$$Lambda$1883/558162977.apply(Unknown Source)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.joinGatherer(GpuHashJoin.scala:377)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.$anonfun$createGatherer$5(GpuHashJoin.scala:310)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator$$Lambda$1880/1485116549.apply(Unknown Source)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.$anonfun$createGatherer$4(GpuHashJoin.scala:309)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator$$Lambda$1878/96947938.apply(Unknown Source)
com.nvidia.spark.rapids.Arm$.closeOnExcept(Arm.scala:88)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.$anonfun$createGatherer$3(GpuHashJoin.scala:308)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator$$Lambda$1873/1893580726.apply(Unknown Source)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRestoreOnRetry(RmmRapidsRetryIterator.scala:256)
org.apache.spark.sql.rapids.execution.BaseHashJoinIterator.$anonfun$createGatherer$2(GpuHashJoin.scala:307)

A task failed with this error as well:

Previous exception in task: GPU OutOfMemory: could not split inputs and retry
	com.nvidia.spark.rapids.RmmRapidsRetryIterator$AutoCloseableAttemptSpliterator.split(RmmRapidsRetryIterator.scala:410)
	com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:516)
	com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:458)
	com.nvidia.spark.rapids.RmmRapidsRetryIterator$.drainSingleWithVerification(RmmRapidsRetryIterator.scala:275)
	com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRetryNoSplit(RmmRapidsRetryIterator.scala:160)
	org.apache.spark.sql.rapids.execution.GpuSubPartitionHashJoin$.concatSpillBatchesAndClose(GpuSubPartitionHashJoin.scala:44)
	org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.$anonfun$tryPullNextPair$5(GpuSubPartitionHashJoin.scala:423)
	com.nvidia.spark.rapids.Arm$.closeOnExcept(Arm.scala:130)
	org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.tryPullNextPair(GpuSubPartitionHashJoin.scala:414)
	org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.hasNext(GpuSubPartitionHashJoin.scala:362)
	org.apache.spark.sql.rapids.execution.BaseSubHashJoinIterator.$anonfun$hasNext$5(GpuSubPartitionHashJoin.scala:506)
	scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
mattahrens added the bug and reliability labels and removed the "? - Needs Triage" label on May 11, 2023
@mattahrens
Collaborator Author

Confirmed that the issue started after the subpartition join was introduced in the 23.04 release cycle: #7794.

@firestarman
Collaborator

Do we have a full call stack for the hanging task?

@revans2
Collaborator

revans2 commented Jun 28, 2023

@firestarman I am not 100% sure that it is a hang. It could be a really big performance regression, or even a live lock. The key thing to understand is that there is a lot of skew in one of the tasks that is hanging, but the skew appears to be on the stream side of the join, not necessarily the build side.

@mattahrens
Collaborator Author

Here is sample stderr output for the skewed task that continues to spill (and doesn't complete):

23/06/28 14:02:12:367 WARN RapidsBufferCatalog: Targeting a host memory size of 34358965376. Current total 33815441920. Current spillable 33815441920
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:12:367 WARN RapidsBufferCatalog: Targeting a host memory size of 34358970752. Current total 33816214912. Current spillable 33816214912
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:12:367 WARN RapidsBufferCatalog: Targeting a host memory size of 34358560128. Current total 33816982528. Current spillable 33816982528
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:12:367 WARN RapidsBufferCatalog: Targeting a host memory size of 34358542336. Current total 33818160768. Current spillable 33818160768
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:12:367 WARN RapidsBufferCatalog: Targeting a host memory size of 34358545152. Current total 33819356800. Current spillable 33819356800
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:12:367 WARN RapidsBufferCatalog: Targeting a host memory size of 33441763072. Current total 33820550016. Current spillable 33820550016
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:12:367 WARN RapidsBufferCatalog: host memory store spilling to reduce usage from 33820550016 total (33820550016 spillable) to 33441763072 bytes
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:12:865 INFO RapidsBufferCatalog: Spilled 946349376 bytes from the host memory store
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:12:901 INFO DeviceMemoryEventHandler: Spilled 925787904 bytes from the device store
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:13:774 INFO DeviceMemoryEventHandler: Device allocation of 446257616 bytes failed, device store has 37840067776 total and 36919661248 spillable bytes. First attempt. Total RMM allocated is 39787275520 bytes. 
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:13:774 WARN RapidsBufferCatalog: Targeting a device memory size of 36473403632. Current total 37840067776. Current spillable 36919661248
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:13:774 WARN RapidsBufferCatalog: device memory store spilling to reduce usage from 37840067776 total (36919661248 spillable) to 36473403632 bytes
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:13:774 WARN RapidsBufferCatalog: Targeting a host memory size of 34358553088. Current total 33792175936. Current spillable 33792175936
Executor task launch worker for task 42.0 in stage 658.0 (TID 53439) 23/06/28 14:02:13:777 WARN RapidsBufferCatalog: Targeting a host memory size of 34359629184. Current total 33793361216. Current spillable 33793361216

@mattahrens
Collaborator Author

Note that when I disable the subpartition join by setting spark.rapids.sql.test.subPartitioning.enabled=false, I get an OOM exception.
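For reference, a minimal sketch of applying this setting at the session level, assuming the conf can be set at runtime:

spark.conf.set("spark.rapids.sql.test.subPartitioning.enabled", "false")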

Here is exception sample:

	Suppressed: com.nvidia.spark.rapids.jni.SplitAndRetryOOM: GPU OutOfMemory: could not split inputs and retry
		at com.nvidia.spark.rapids.RmmRapidsRetryIterator$NoInputSpliterator.split(RmmRapidsRetryIterator.scala:359)
		at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:520)
		at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:458)
		at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.drainSingleWithVerification(RmmRapidsRetryIterator.scala:275)
		at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRetryNoSplit(RmmRapidsRetryIterator.scala:181)
		at com.nvidia.spark.rapids.cudf_utils.HostConcatResultUtil$.getColumnarBatch(HostConcatResultUtil.scala:54)
		at com.nvidia.spark.rapids.GpuShuffleCoalesceIterator.$anonfun$next$4(GpuShuffleCoalesceExec.scala:222)
		at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
		at com.nvidia.spark.rapids.GpuShuffleCoalesceIterator.$anonfun$next$3(GpuShuffleCoalesceExec.scala:221)
		at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
		at com.nvidia.spark.rapids.GpuShuffleCoalesceIterator.$anonfun$next$1(GpuShuffleCoalesceExec.scala:214)
		at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
		at com.nvidia.spark.rapids.GpuShuffleCoalesceIterator.next(GpuShuffleCoalesceExec.scala:206)
		at com.nvidia.spark.rapids.GpuShuffleCoalesceIterator.next(GpuShuffleCoalesceExec.scala:192)
		at com.nvidia.spark.rapids.CollectTimeIterator.$anonfun$next$1(GpuExec.scala:183)
		at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
		at com.nvidia.spark.rapids.CollectTimeIterator.next(GpuExec.scala:182)
		at com.nvidia.spark.rapids.CollectTimeIterator.next(GpuExec.scala:171)
		at scala.collection.Iterator$$anon$22.next(Iterator.scala:1095)
		at scala.collection.Iterator$$anon$22.head(Iterator.scala:1082)
		at scala.collection.BufferedIterator.headOption(BufferedIterator.scala:32)
		at scala.collection.BufferedIterator.headOption$(BufferedIterator.scala:32)
		at scala.collection.Iterator$$anon$22.headOption(Iterator.scala:1076)
		at com.nvidia.spark.rapids.CloseableBufferedIterator.headOption(CloseableBufferedIterator.scala:36)
		at com.nvidia.spark.rapids.CloseableBufferedIterator.close(CloseableBufferedIterator.scala:41)
		at com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableColumn.safeClose(implicits.scala:51)
		at com.nvidia.spark.rapids.Arm$.closeOnExcept(Arm.scala:94)

@revans2
Collaborator

revans2 commented Jun 28, 2023

The OOM error looks really odd to me. How in the world did we need to split data on a 40 GiB GPU when we just read in data from a shuffle that obviously fit on the GPU? Something is off here, and I am not sure what it is.

@firestarman
Collaborator

firestarman commented Jun 29, 2023

Based on the spilling logs, I am wondering whether this is due to a potential live lock in the spilling framework.

According to the code here, after spilling it will always retry the allocation as long as there are spillable buffers, even when no buffer was actually spilled. So when it re-enters the onAllocFailure method, storeSpillableSize is still the same as in the previous loop, which keeps the allocation retrying while nothing ever gets spilled.
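To make the hypothesis concrete, here is a simplified sketch of the loop shape being described; SpillableStore, spillableBytes, and spill are placeholders, not the actual plugin APIs:

// Hypothetical sketch of the suspected live lock, not the real DeviceMemoryEventHandler code.
// If spill() releases nothing, spillableBytes never shrinks, the condition below stays true,
// and the failing allocation keeps being retried forever.
trait SpillableStore {
  def spillableBytes: Long            // stands in for storeSpillableSize
  def spill(targetBytes: Long): Long  // returns the bytes actually spilled (may be 0)
}

def onAllocFailure(store: SpillableStore, allocSize: Long, totalAllocated: Long): Boolean = {
  if (store.spillableBytes > 0) {
    store.spill(totalAllocated - allocSize) // may spill 0 bytes; the return value is never checked
    true                                    // tell the allocator to retry
  } else {
    false                                   // give up and surface the OOM
  }
}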

@abellina
Collaborator

Based on the spilling logs, I am wondering whether this is due to a potential live lock in the spilling framework.

According to the code here, after spilling it will always retry the allocation as long as there are spillable buffers, even when no buffer was actually spilled. So when it re-enters the onAllocFailure method, storeSpillableSize is still the same as in the previous loop, which keeps the allocation retrying while nothing ever gets spilled.

storeSpillableSize should be the number of bytes that are actually spillable right now. This code assumes that storeSpillableSize will continue to decrease each time. Note that we print this number here https://github.com/NVIDIA/spark-rapids/blob/branch-23.08/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala#L133C19-L133C39, so we should be able to verify that the store is getting depleted. If it is not, there is a bug somewhere.

I will try to run this today and see if I can repro the hang/spill situation.

@abellina
Collaborator

Here's what I see after adding some debug logging. I am not as familiar with this code yet, so I am mostly recording what I see so far:

I let the query run for ~1 hour and it eventually failed with OOM here https://github.com/NVIDIA/spark-rapids/blob/branch-23.08/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala#L45. So my attempt had loaded and partitioned the build and stream batches as spillable, and it was trying to exit tryPullNextPair https://github.com/NVIDIA/spark-rapids/blob/branch-23.08/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala#L423 with a full buildBatch.

There's a single task running at this stage, and it depleted the DeviceMemoryStore after spilling 1.6 TiB to host memory and 1.6 TiB to disk (I should mention the disk was close to filling up, which would lead to bad behavior if we ran this query alongside all the other queries... I ran q16 by itself).

This single task is trying to concatenate more than 40 GB of memory (see the RMM allocated number) for the build side. It is actually trying to materialize 197 GiB from the build side (https://github.com/NVIDIA/spark-rapids/blob/branch-23.08/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala#L423):

23/06/29 20:12:06 INFO GpuSubPartitionPairIterator: NEEDS repartitioning? true 197083141888

If I understand correctly, this table likely needs to be held whole in memory as the code is currently written.

Rest of the OOM part of the log:

23/06/29 20:12:31 INFO DeviceMemoryEventHandler: Device allocation of 908468352 bytes failed, device store has 12046253440 total and 0 spillable bytes. Attempt 1. Total RMM allocated is 40383898880 bytes.
23/06/29 20:12:31 WARN DeviceMemoryEventHandler: [RETRY 1] Retrying allocation of 908468352 after a synchronize. Total RMM allocated is 40383898880 bytes.
23/06/29 20:12:31 INFO DeviceMemoryEventHandler: Device allocation of 908468352 bytes failed, device store has 12046253440 total and 0 spillable bytes. Attempt 2. Total RMM allocated is 40383898880 bytes.
23/06/29 20:12:31 WARN DeviceMemoryEventHandler: [RETRY 2] Retrying allocation of 908468352 after a synchronize. Total RMM allocated is 40383898880 bytes.
23/06/29 20:12:31 INFO DeviceMemoryEventHandler: Device allocation of 908468352 bytes failed, device store has 12046253440 total and 0 spillable bytes. Attempt 3. Total RMM allocated is 40383898880 bytes.
23/06/29 20:12:31 WARN DeviceMemoryEventHandler: Device store exhausted, unable to allocate 908468352 bytes. Total RMM allocated is 40383898880 bytes.
23/06/29 20:12:31 WARN GpuSemaphore: Dumping stack traces. The semaphore sees 1 tasks, 1 are holding onto the semaphore.
Semaphore held. Stack trace for task attempt id 15978:
    java.lang.Thread.getStackTrace(Thread.java:1559)
    com.nvidia.spark.rapids.GpuSemaphore.$anonfun$dumpActiveStackTracesToLog$1(GpuSemaphore.scala:194)
    com.nvidia.spark.rapids.GpuSemaphore.$anonfun$dumpActiveStackTracesToLog$1$adapted(GpuSemaphore.scala:191)
    java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597)
    com.nvidia.spark.rapids.GpuSemaphore.dumpActiveStackTracesToLog(GpuSemaphore.scala:191)
    com.nvidia.spark.rapids.GpuSemaphore$.dumpActiveStackTracesToLog(GpuSemaphore.scala:88)
    com.nvidia.spark.rapids.DeviceMemoryEventHandler.$anonfun$onAllocFailure$3(DeviceMemoryEventHandler.scala:149)
    scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    org.apache.spark.sql.rapids.GpuTaskMetrics.$anonfun$timeIt$1(GpuTaskMetrics.scala:125)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
    org.apache.spark.sql.rapids.GpuTaskMetrics.timeIt(GpuTaskMetrics.scala:123)
    org.apache.spark.sql.rapids.GpuTaskMetrics.spillTime(GpuTaskMetrics.scala:134)
    com.nvidia.spark.rapids.DeviceMemoryEventHandler.onAllocFailure(DeviceMemoryEventHandler.scala:120)
    ai.rapids.cudf.Rmm.allocInternal(Native Method)
    ai.rapids.cudf.Rmm.alloc(Rmm.java:506)
    ai.rapids.cudf.DeviceMemoryBuffer.allocate(DeviceMemoryBuffer.java:143)
    ai.rapids.cudf.DeviceMemoryBuffer.allocate(DeviceMemoryBuffer.java:133)
    com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.$anonfun$getDeviceMemoryBuffer$7(RapidsBufferStore.scala:355)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
    com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.$anonfun$getDeviceMemoryBuffer$1(RapidsBufferStore.scala:354)
    org.apache.spark.sql.rapids.GpuTaskMetrics.$anonfun$timeIt$1(GpuTaskMetrics.scala:125)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
    org.apache.spark.sql.rapids.GpuTaskMetrics.timeIt(GpuTaskMetrics.scala:123)
    org.apache.spark.sql.rapids.GpuTaskMetrics.readSpillTime(GpuTaskMetrics.scala:136)
    com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.getDeviceMemoryBuffer(RapidsBufferStore.scala:328)
    com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.getColumnarBatch(RapidsBufferStore.scala:291)
    com.nvidia.spark.rapids.SpillableColumnarBatchImpl.$anonfun$getColumnarBatch$1(SpillableColumnarBatch.scala:112)
    com.nvidia.spark.rapids.SpillableColumnarBatchImpl.$anonfun$withRapidsBuffer$1(SpillableColumnarBatch.scala:95)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
    com.nvidia.spark.rapids.SpillableColumnarBatchImpl.withRapidsBuffer(SpillableColumnarBatch.scala:94)
    com.nvidia.spark.rapids.SpillableColumnarBatchImpl.getColumnarBatch(SpillableColumnarBatch.scala:110)
    org.apache.spark.sql.rapids.execution.GpuSubPartitionHashJoin$.$anonfun$concatSpillBatchesAndClose$2(GpuSubPartitionHashJoin.scala:45)
    com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:216)
    com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:213)
    scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:213)
    com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:248)
    org.apache.spark.sql.rapids.execution.GpuSubPartitionHashJoin$.$anonfun$concatSpillBatchesAndClose$1(GpuSubPartitionHashJoin.scala:45)
    com.nvidia.spark.rapids.RmmRapidsRetryIterator$AutoCloseableAttemptSpliterator.next(RmmRapidsRetryIterator.scala:431)

Additionally, we spend almost all of the time reading from the build/stream iterators, splitting, and making things spillable. This causes spilling because we do it for all of the input in one pass, and in this case the input is massive.

I didn't see a live lock situation with the spill framework, but perhaps I have missed it (the code could have race conditions, so I am happy to be wrong; @firestarman, let me know if you have evidence to the contrary).

After discussing with @revans2 and @jlowe, we need to change the algorithm here. We will have to document what was discussed, but essentially the idea is to move away from pulling everything into memory and instead perform several small joins against the build table round robin, merging the individual results.
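As a hedged, CPU-side illustration of the combining step only (none of these names are plugin APIs): for a left-semi join, a stream row survives if it matches any of the small build chunks, so the per-chunk results can simply be OR-ed together.

// Illustration of merging per-chunk left-semi results; not the plugin implementation.
def leftSemiOverChunks[Row, Key](
    streamBatch: IndexedSeq[Row],
    buildChunks: Iterator[Set[Key]],  // small, GPU-sized pieces of the build-side keys
    keyOf: Row => Key): IndexedSeq[Row] = {
  val keep = Array.fill(streamBatch.size)(false)
  buildChunks.foreach { chunkKeys =>
    var i = 0
    while (i < streamBatch.size) {
      if (!keep(i) && chunkKeys.contains(keyOf(streamBatch(i)))) keep(i) = true
      i += 1
    }
  }
  streamBatch.zipWithIndex.collect { case (row, i) if keep(i) => row }
}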

@revans2
Collaborator

revans2 commented Jun 29, 2023

@abellina do you know how big the build side is in this case? I just don't see how we end up spilling 1.6 TiB for a build-side table when the input for the entire shuffle is not that big. I think there has to be some kind of bug in the internal code, especially because the entire join previously worked without any errors, so the entire build side fit into GPU memory.

@abellina
Collaborator

@abellina do you know how big the build side is in this case? I just don't see how we end up spilling 1.6 TiB for a build-side table when the input for the entire shuffle is not that big. I think there has to be some kind of bug in the internal code, especially because the entire join previously worked without any errors, so the entire build side fit into GPU memory.

The build batch was ~190 GiB total in this particular case. Running the query again, I actually see SQL metrics with a stream side that is even bigger (630 GiB):

data read size total (min, med, max (stageId: taskId))
1130.0 GiB (2.5 GiB, 2.5 GiB, 628.3 GiB (stage 42.0: task 16010))

And that build side:

build side size total (min, med, max (stageId: taskId))
332.0 GiB (759.8 MiB, 760.3 MiB, 184.3 GiB (stage 42.0: task 16010))

What appears to happen is that both the build and stream sides are pulled in fully, and then we get the sub-partition pairs that we call doJoin on. It looks like we can do this join for at least one pair, but then we hit the huge build/stream sides that we need to repartition:

23/06/30 03:57:35 INFO GpuSubPartitionPairIterator: Build batches 206: size 197132660352 B and stream batches 899: size 672094546112 B

Then we attempt to repartition the build side 184 ways, and it looks like we set this aside in the "big batches" area. Then I see we pop it again:

23/06/30 04:22:53 INFO GpuSubPartitionPairIterator: Build batches 206: size 197083141504 B and stream batches 899: size 671925724160 B

And the size is slightly different (I do not know why). But we do blow up because we treat this as already partitioned, so the code tries to concatenate all of it. This is going to be bad. I do not know whether all 197 GB of the build side share the same key.

By the way, I see really odd calls to contiguousSplit in partitionBatches for the partitioning:

23/06/30 04:17:49 INFO GpuBatchSubPartitioner: Partitioning batch 2936 bytes, 0,0,0,0,0,117,117,117,117,117,117,117,117,117,117,117 parts.

Which produces: 4 contiguous tables of 0 rows, then 1 table with 117 rows, then 10 0-row tables. I need to understand this better. I also see this:

 INFO GpuBatchSubPartitioner: Partitioning batch 109560 bytes, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 parts.

Code in question: https://github.com/NVIDIA/spark-rapids/blob/branch-23.08/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala#L182

I am not sure yet, but I believe this could be the stream side, where a particular streamed batch didn't have any rows matching the key, but I would like to get confirmation from @firestarman that this is the intention.
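As a hedged aside, one way to read the logged numbers is as the cumulative row offsets handed to contiguousSplit, one per sub-partition; under that assumption the per-partition row counts are the successive differences, which would mean a line like 0,0,0,0,0,117,... put all of its rows into a single sub-partition. This is an interpretation of the log output, not code from the plugin.

// Assumes the logged values are the start row offsets of each sub-partition.
def partRowCounts(startOffsets: Seq[Int], totalRows: Int): Seq[Int] = {
  val ends = startOffsets.drop(1) :+ totalRows
  startOffsets.zip(ends).map { case (start, end) => end - start }
}

// partRowCounts(Seq(0, 0, 0, 0, 0, 117, 117, 117, 117, 117, 117, 117, 117, 117, 117, 117), 117)
// => Seq(0, 0, 0, 0, 117, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0): one sub-partition gets every row.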

@firestarman
Collaborator

firestarman commented Jun 30, 2023

Which produces: 4 contiguous tables of 0 rows, then 1 table with 117 rows, then 10 0-row tables. I need to understand this better. I also see this:

INFO GpuBatchSubPartitioner: Partitioning batch 109560 bytes, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 parts.

This should not happen if the input batch is not empty; we expect at least one sub-partition to end up with rows. It seems something is wrong here, but I am not sure what it is yet. The indices for contiguousSplit are generated by our own hash partitioning, GpuHashPartitioningBase.hashPartitionAndClose.

And the size is slightly different (I do not know why).

This may be due to the aligned sizes in contiguousSplit.

@firestarman
Collaborator

I didn't see a live lock situation with the spill framework, but perhaps I have missed it (the code could have race conditions, so I am happy to be wrong; @firestarman, let me know if you have evidence to the contrary).

Sorry for the confusion, this was just a guess from reading the code.

@firestarman
Collaborator

firestarman commented Jun 30, 2023

I did some investigation and here is what I have found so far.

After loading and repartitioning all of the build-side data, sub-partition 4 (shown below) is much bigger (about 190 GB) than the other sub-partitions. It looks like the data is highly skewed.

initBuild subpartitions: 
    Part Id        Part Size        Batch number
    0        49864448        154
    1        49751808        154
    2        49859584        154
    3        49892352        154
    4        197132661248        206
    5        49698944        154
    6        49802240        154
    7        49913984        154
    8        49870464        154
    9        49866112        154
    10        49883392        154
    11        49802496        154
    12        49837952        154
    13        49921024        154
    14        49875968        154
    15        49839488        154

I will dig more next.

@revans2
Collaborator

revans2 commented Jun 30, 2023

I think we understand what is happening now. Query 16 has some issues with nulls on the right hand side of the join.

In the old code we used a RequireSingleBatchWithFilter CoalesceSizeGoal for left-semi joins.

case class RequireSingleBatchWithFilter(filterExpression: GpuExpression)
    extends CoalesceSizeGoal with RequireSingleBatchLike {
  override val targetSizeBytes: Long = Long.MaxValue
  /** Override toString to improve readability of Spark explain output */
  override def toString: String = "RequireSingleBatchWithFilter"
}

// Goal to be used for the coalescing the build side. Note that this is internal to
// the join and not used for planning purposes. The two valid choices are `RequireSingleBatch` or
// `RequireSingleBatchWithFilter`
private lazy val buildGoal: CoalesceSizeGoal = joinType match {
  case _: InnerLike | LeftSemi | LeftAnti =>
    val nullFilteringMask = boundBuildKeys.map { bk =>
      // coalesce(key1, false) or coalesce(key2, false) ... or ... coalesce(keyN, false)
      // For any row with a key that is null, this filter mask will remove those rows.
      GpuCoalesce(Seq(GpuCast(bk, BooleanType), GpuLiteral(false)))
    }.reduce(GpuOr)
    RequireSingleBatchWithFilter(nullFilteringMask)
  case _ => RequireSingleBatch
}

This was used to remove nulls from the right side of these joins because there are some cases where Spark does not do this. LeftSemi and LeftAnti are the big ones that we need to worry about.
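A standalone Spark example (not taken from the issue) of why this null filtering is safe for a LeftSemi equi-join: a null key never compares equal to anything, so dropping null-keyed rows from the build (right) side cannot change the result.

import org.apache.spark.sql.SparkSession

object NullKeySemiJoinDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("null-key-semi").getOrCreate()
    import spark.implicits._

    val left  = Seq(Some(1), Some(2), None).toDF("k")
    val right = Seq(Some(1), None, None).toDF("k")   // the null keys here can never match
    val rightNoNulls = right.where($"k".isNotNull)

    val full     = left.join(right, left("k") === right("k"), "left_semi")
    val filtered = left.join(rightNoNulls, left("k") === rightNoNulls("k"), "left_semi")

    // Both results keep only the row with k = 1, so filtering nulls from the build side is safe.
    assert(full.collect().toSet == filtered.collect().toSet)
    spark.stop()
  }
}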

This was missed in the new code. It is only used if we need to get a single batch; it is never used if we think we have to partition the data.

So I think we need to change a few things here. Sadly there is no simple/good way to filter out nulls on the CPU when trying to get the build side table. It would be awesome, but I don't think we can do it.

So I would propose that we start out the same way we do it today: if there is a single batch with no overflow, etc., we call getBuildBatchOptimizedAndClose, but then optionally filter the result on the constraint in the CoalesceGoal if it is there.

For the other cases, once we get the gpuBuildIter we are going to have to filter it and possibly redo our size estimation, because we could have multiple batches. I think we want to take the iterator and pull in filtered batches, making them spillable and tracking their sizes, until we either overflow or reach the end of the stream.

If we hit the end of the stream, we concat them all together and return the result.

If we overflowed, then we need to do the safe iterator trick again with everything we buffered up and send it to be partitioned.
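A rough sketch of that control flow, with placeholder helpers (filterNullKeys and sizeOf are stand-ins, not real plugin APIs); the real version would also make each buffered batch spillable as it is pulled:

// Pull filtered build batches until we either reach the end of the stream or exceed the
// size budget. Right = small enough to concatenate into a single build batch,
// Left = too big, so hand everything off to be sub-partitioned. Placeholder sketch only.
def gatherBuildSide[B](
    gpuBuildIter: Iterator[B],
    filterNullKeys: B => B,
    sizeOf: B => Long,
    targetSizeBytes: Long): Either[Iterator[B], Seq[B]] = {
  val buffered = scala.collection.mutable.ArrayBuffer.empty[B]
  var total = 0L
  var overflowed = false
  while (gpuBuildIter.hasNext && !overflowed) {
    val filtered = filterNullKeys(gpuBuildIter.next())  // drop null-keyed build rows up front
    buffered += filtered
    total += sizeOf(filtered)
    overflowed = total > targetSizeBytes
  }
  if (!overflowed) Right(buffered.toSeq)                             // concat into one build batch
  else Left(buffered.iterator ++ gpuBuildIter.map(filterNullKeys))   // fall back to sub-partitioning
}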

firestarman self-assigned this Jul 3, 2023
@firestarman
Collaborator

firestarman commented Jul 3, 2023

Really appreciate you finding the root cause. I will work on the fix next.

@firestarman
Collaborator

firestarman commented Jul 5, 2023

With the linked PR, query 16 at the 30K dataset passed in my verification after running for 26 minutes, without any spilling.

[Update] I still need to verify the latest version of the PR, but that is pending because the Spark2a disks are out of space right now.

[Update] The latest version also passes query 16.
