Skip to content

Commit

Permalink
[CELEBORN-1544] ShuffleWriter needs to call close finally to avoid me…
Browse files Browse the repository at this point in the history
…mory leaks

### What changes were proposed in this pull request?
This PR aims to fix a possible memory leak in ShuffleWriter.

### Why are the changes needed?
When we turn on `spark.speculation=true` or we kill the executing SQL, the task may be interrupted. At this time, `ShuffleWriter` may not call close.
At this time, `DataPusher#idleQueue` will occupy some memory capacity ( `celeborn.client.push.buffer.max.size` * `celeborn.client.push.queue.capacity` ) and the instance will not be released.

```java
Thread 537 (DataPusher-78931):
  State: TIMED_WAITING
  Blocked count: 0
  Waited count: 16337
  IsDaemon: true
  Stack:
    java.lang.Thread.sleep(Native Method)
    org.apache.celeborn.client.write.DataPushQueue.takePushTasks(DataPushQueue.java:135)
    org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:122)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Production testing

#### Current
<img width="547" alt="image" src="https://github.com/user-attachments/assets/d6f64257-144e-4139-96c6-518ca5f1bfd2">

#### PR
<img width="479" alt="image" src="https://github.com/user-attachments/assets/e4ff62ec-5b9d-47a4-a36c-1d13bf378cbc">

Closes apache#2661 from cxzl25/CELEBORN-1544.

Authored-by: sychen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
cxzl25 authored and waitinfuture committed Aug 2, 2024
1 parent dceef47 commit 48ce9b9
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,14 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
} else {
write0(records);
}
close();
} catch (InterruptedException e) {
TaskInterruptedHelper.throwTaskKillException();
} finally {
try {
close();
} catch (InterruptedException e) {
TaskInterruptedHelper.throwTaskKillException();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,21 @@ public SortBasedShuffleWriter(

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
if (canUseFastWrite()) {
fastWrite0(records);
} else if (dep.mapSideCombine()) {
if (dep.aggregator().isEmpty()) {
throw new UnsupportedOperationException(
"When using map side combine, an aggregator must be specified.");
try {
if (canUseFastWrite()) {
fastWrite0(records);
} else if (dep.mapSideCombine()) {
if (dep.aggregator().isEmpty()) {
throw new UnsupportedOperationException(
"When using map side combine, an aggregator must be specified.");
}
write0(dep.aggregator().get().combineValuesByKey(records, taskContext));
} else {
write0(records);
}
write0(dep.aggregator().get().combineValuesByKey(records, taskContext));
} else {
write0(records);
} finally {
close();
}
close();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,14 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
} else {
write0(records);
}
close();
} catch (InterruptedException e) {
TaskInterruptedHelper.throwTaskKillException();
} finally {
try {
close();
} catch (InterruptedException e) {
TaskInterruptedHelper.throwTaskKillException();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,11 @@ void doWrite(scala.collection.Iterator<Product2<K, V>> records) throws IOExcepti

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
doWrite(records);
close();
try {
doWrite(records);
} finally {
close();
}
}

@VisibleForTesting
Expand Down

0 comments on commit 48ce9b9

Please sign in to comment.