From 48ce9b9b497ac2346e8a2fc32d7a369da0f181fa Mon Sep 17 00:00:00 2001
From: sychen
Date: Fri, 2 Aug 2024 20:24:31 +0800
Subject: [PATCH] [CELEBORN-1544] ShuffleWriter needs to call close finally to avoid memory leaks

### What changes were proposed in this pull request?
This PR fixes a possible memory leak in ShuffleWriter.

### Why are the changes needed?
When `spark.speculation=true` is enabled, or when a running SQL query is killed, the task may be interrupted. In that case `ShuffleWriter` may never call close, so `DataPusher#idleQueue` keeps occupying memory (up to `celeborn.client.push.buffer.max.size` * `celeborn.client.push.queue.capacity`) and the instance is never released.

```java
Thread 537 (DataPusher-78931):
  State: TIMED_WAITING
  Blocked count: 0
  Waited count: 16337
  IsDaemon: true
  Stack:
    java.lang.Thread.sleep(Native Method)
    org.apache.celeborn.client.write.DataPushQueue.takePushTasks(DataPushQueue.java:135)
    org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:122)
```
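The following sketch is for illustration only and is not part of the patch: a minimal, self-contained Java program showing why a writer that only calls `close()` on the success path keeps its push buffers alive after an interrupted task, and why moving `close()` into a `finally` block releases them. `FakeShuffleWriter`, `FakeDataPusher`, and the 512 x 64 KiB sizing are hypothetical stand-ins, not Celeborn's actual classes or configured defaults.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class CloseInFinallySketch {

  // Hypothetical stand-in for DataPusher: pre-allocates push buffers that stay
  // referenced by idleQueue until close() is called.
  static class FakeDataPusher {
    final LinkedBlockingQueue<byte[]> idleQueue = new LinkedBlockingQueue<>();

    FakeDataPusher(int queueCapacity, int bufferSize) {
      for (int i = 0; i < queueCapacity; i++) {
        idleQueue.offer(new byte[bufferSize]);
      }
    }

    void close() {
      idleQueue.clear(); // release the buffers
    }
  }

  // Hypothetical stand-in for a shuffle writer; 512 buffers of 64 KiB each are
  // illustrative numbers, not Celeborn defaults.
  static class FakeShuffleWriter {
    final FakeDataPusher pusher = new FakeDataPusher(512, 64 * 1024);

    // Leaky shape: close() is skipped when doWrite throws (e.g. the task is killed).
    void writeLeaky(Runnable doWrite) {
      doWrite.run();
      pusher.close();
    }

    // Fixed shape, the pattern this PR applies: close() always runs.
    void writeSafe(Runnable doWrite) {
      try {
        doWrite.run();
      } finally {
        pusher.close();
      }
    }
  }

  public static void main(String[] args) {
    Runnable killedTask = () -> { throw new RuntimeException("task killed"); };

    FakeShuffleWriter leaky = new FakeShuffleWriter();
    try {
      leaky.writeLeaky(killedTask);
    } catch (RuntimeException ignored) {
    }
    // Still 512 buffers referenced for as long as the writer instance is reachable.
    System.out.println("leaky writer buffers still held: " + leaky.pusher.idleQueue.size());

    FakeShuffleWriter safe = new FakeShuffleWriter();
    try {
      safe.writeSafe(killedTask);
    } catch (RuntimeException ignored) {
    }
    // 0 buffers referenced: close() ran in finally even though the write was interrupted.
    System.out.println("safe writer buffers still held: " + safe.pusher.idleQueue.size());
  }
}
```

With the illustrative sizing above, each leaked writer would pin roughly 512 * 64 KiB = 32 MiB; the real retained size depends on the configured `celeborn.client.push.buffer.max.size` and `celeborn.client.push.queue.capacity`.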
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Production testing

#### Current image

#### PR image

Closes #2661 from cxzl25/CELEBORN-1544.

Authored-by: sychen
Signed-off-by: zky.zhoukeyong
---
 .../celeborn/HashBasedShuffleWriter.java      |  7 +++++-
 .../celeborn/SortBasedShuffleWriter.java      | 23 +++++++++++--------
 .../celeborn/HashBasedShuffleWriter.java      |  7 +++++-
 .../celeborn/SortBasedShuffleWriter.java      |  7 ++++--
 4 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 407b3284921..0986c23ef28 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -174,9 +174,14 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
       } else {
         write0(records);
       }
-      close();
     } catch (InterruptedException e) {
       TaskInterruptedHelper.throwTaskKillException();
+    } finally {
+      try {
+        close();
+      } catch (InterruptedException e) {
+        TaskInterruptedHelper.throwTaskKillException();
+      }
     }
   }
 
diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 2479d19cf85..5ecf1edba5c 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -143,18 +143,21 @@ public SortBasedShuffleWriter(
 
   @Override
   public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
-    if (canUseFastWrite()) {
-      fastWrite0(records);
-    } else if (dep.mapSideCombine()) {
-      if (dep.aggregator().isEmpty()) {
-        throw new UnsupportedOperationException(
-            "When using map side combine, an aggregator must be specified.");
+    try {
+      if (canUseFastWrite()) {
+        fastWrite0(records);
+      } else if (dep.mapSideCombine()) {
+        if (dep.aggregator().isEmpty()) {
+          throw new UnsupportedOperationException(
+              "When using map side combine, an aggregator must be specified.");
+        }
+        write0(dep.aggregator().get().combineValuesByKey(records, taskContext));
+      } else {
+        write0(records);
       }
-      write0(dep.aggregator().get().combineValuesByKey(records, taskContext));
-    } else {
-      write0(records);
+    } finally {
+      close();
     }
-    close();
   }
 
   @VisibleForTesting
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 34377ee4a44..5a9b0455293 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -170,9 +170,14 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
       } else {
         write0(records);
       }
-      close();
     } catch (InterruptedException e) {
       TaskInterruptedHelper.throwTaskKillException();
+    } finally {
+      try {
+        close();
+      } catch (InterruptedException e) {
+        TaskInterruptedHelper.throwTaskKillException();
+      }
     }
   }
 
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 2b810b190d7..f3b856394d5 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -226,8 +226,11 @@ void doWrite(scala.collection.Iterator<Product2<K, V>> records) throws IOExcepti
 
   @Override
   public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
-    doWrite(records);
-    close();
+    try {
+      doWrite(records);
+    } finally {
+      close();
+    }
   }
 
   @VisibleForTesting