Skip to content

Commit

Permalink
[#416] feat(hdfs): lazy initialization of hdfsShuffleWriteHandler whe…
Browse files Browse the repository at this point in the history
…n per-partition concurrent write is enabled (#816)

### What changes were proposed in this pull request?
Lazy initialization of `hdfsShuffleWriteHandler` when per-partition concurrent write is enabled.

### Why are the changes needed?
Without this PR, too many unnecessary `writerHandlers` are created when per-partition
concurrent write is enabled but no race condition actually occurs.

This PR is to reduce unnecessary handler creation.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
1. UTs
  • Loading branch information
zuston authored Apr 17, 2023
1 parent c6cde5d commit 23e0a51
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
Expand All @@ -45,6 +46,8 @@ public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
private final LinkedBlockingDeque<ShuffleWriteHandler> queue;
private final int maxConcurrency;
private final String basePath;
private Function<Integer, ShuffleWriteHandler> createWriterFunc;
private volatile int initializedHandlerCnt = 0;

// Only for tests
@VisibleForTesting
Expand All @@ -54,6 +57,17 @@ public PooledHdfsShuffleWriteHandler(LinkedBlockingDeque<ShuffleWriteHandler> qu
this.basePath = StringUtils.EMPTY;
}

@VisibleForTesting
public PooledHdfsShuffleWriteHandler(
LinkedBlockingDeque<ShuffleWriteHandler> queue,
int maxConcurrency,
Function<Integer, ShuffleWriteHandler> createWriterFunc) {
this.queue = queue;
this.maxConcurrency = maxConcurrency;
this.basePath = StringUtils.EMPTY;
this.createWriterFunc = createWriterFunc;
}

public PooledHdfsShuffleWriteHandler(
String appId,
int shuffleId,
Expand All @@ -70,31 +84,34 @@ public PooledHdfsShuffleWriteHandler(
this.basePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, startPartition, endPartition));

// todo: support init lazily
try {
for (int i = 0; i < maxConcurrency; i++) {
// Use add() here because we are sure the capacity will not be exceeded.
// Note: add() throws IllegalStateException when queue is full.
queue.add(
new HdfsShuffleWriteHandler(
appId,
shuffleId,
startPartition,
endPartition,
storageBasePath,
fileNamePrefix + "_" + i,
hadoopConf,
user
)
this.createWriterFunc = index -> {
try {
return new HdfsShuffleWriteHandler(
appId,
shuffleId,
startPartition,
endPartition,
storageBasePath,
fileNamePrefix + "_" + index,
hadoopConf,
user
);
} catch (Exception e) {
throw new RssException("Errors on initializing Hdfs writer handler.", e);
}
} catch (Exception e) {
throw new RssException("Errors on initializing Hdfs writer handler.", e);
}
};
}

@Override
public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception {
if (queue.isEmpty() && initializedHandlerCnt < maxConcurrency) {
synchronized (this) {
if (initializedHandlerCnt < maxConcurrency) {
queue.add(createWriterFunc.apply(initializedHandlerCnt++));
}
}
}

if (queue.isEmpty()) {
LOGGER.warn("No free hdfs writer handler, it will wait. storage path: {}", basePath);
}
Expand All @@ -107,4 +124,9 @@ public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception
queue.addFirst(writeHandler);
}
}

@VisibleForTesting
protected int getInitializedHandlerCnt() {
return initializedHandlerCnt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -46,13 +47,66 @@ static class FakedShuffleWriteHandler implements ShuffleWriteHandler {
this.execution = runnable;
}

FakedShuffleWriteHandler(List<Integer> initializedList, List<Integer> invokedList, int index, Runnable runnable) {
initializedList.add(index);
this.invokedList = invokedList;
this.index = index;
this.execution = runnable;
}

@Override
public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception {
execution.run();
invokedList.add(index);
}
}

@Test
public void lazyInitializeWriterHandlerTest() throws Exception {
int maxConcurrency = 5;
LinkedBlockingDeque deque = new LinkedBlockingDeque(maxConcurrency);

CopyOnWriteArrayList<Integer> invokedList = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList<Integer> initializedList = new CopyOnWriteArrayList<>();

PooledHdfsShuffleWriteHandler handler = new PooledHdfsShuffleWriteHandler(
deque,
maxConcurrency,
index -> new FakedShuffleWriteHandler(initializedList, invokedList, index, () -> {
try {
Thread.sleep(10);
} catch (Exception e) {
// ignore
}
})
);

// case1: no race condition
for (int i = 0; i < 10; i++) {
handler.write(Collections.emptyList());
assertEquals(1, initializedList.size());
}

// case2: initialized by multi threads
invokedList.clear();
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
handler.write(Collections.emptyList());
} catch (Exception e) {
// ignore
} finally {
latch.countDown();
}
}).start();
}
latch.await();
assertEquals(100, invokedList.size());
assertEquals(5, initializedList.size());
assertEquals(5, handler.getInitializedHandlerCnt());
}

@Test
public void writeSameFileWhenNoRaceCondition() throws Exception {
int concurrency = 5;
Expand Down

0 comments on commit 23e0a51

Please sign in to comment.