Skip to content

Commit

Permalink
[CELEBORN-875][FOLLOWUP] Enhance DataPushQueueSuiteJ for thread safety and prevent `NullPointerException`
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

1. Replaced `HashMap` with `ConcurrentHashMap` for `partitionBatchIdMap` to ensure thread safety during parallel data processing.
2. Put the partition id and batch id into `partitionBatchIdMap` before adding the task to the `DataPusher`, so that the entry already exists when the task is pushed; this prevents a possible `NullPointerException` (NPE).

### Why are the changes needed?

To fix the `NullPointerException` thrown from `DataPushQueueSuiteJ`, as seen in the following CI run:

https://github.com/apache/incubator-celeborn/actions/runs/5734532048/job/15540863715?pr=1785

```
Exception in thread "DataPusher-0" java.lang.NullPointerException
	at org.apache.celeborn.client.write.DataPushQueueSuiteJ$1.pushData(DataPushQueueSuiteJ.java:121)
	at org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:125)
Error: The operation was canceled.
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Passes GA (GitHub Actions CI).

Closes apache#1789 from cfmcgrady/celeborn-875-followup.

Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
cfmcgrady authored and waitinfuture committed Aug 2, 2023
1 parent 2b79c37 commit 39ab731
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

import org.junit.AfterClass;
Expand All @@ -45,7 +45,7 @@ public class DataPushQueueSuiteJ {
private static File tempDir = null;

private final int shuffleId = 0;
private final int numPartitions = 10;
private final int numPartitions = 1000000;

@BeforeClass
public static void beforeAll() {
Expand All @@ -63,7 +63,7 @@ public static void afterAll() {

@Test
public void testDataPushQueue() throws Exception {
final int numWorker = 3;
final int numWorker = 30;
List<List<Integer>> workerData = new ArrayList<>();
for (int i = 0; i < numWorker; i++) {
workerData.add(new ArrayList<>());
Expand All @@ -76,7 +76,7 @@ public void testDataPushQueue() throws Exception {
tarWorkerData.add(new ArrayList<>());
}

Map<Integer, Integer> partitionBatchIdMap = new HashMap<>();
Map<Integer, Integer> partitionBatchIdMap = new ConcurrentHashMap<>();

CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER().key(), "2");
Expand All @@ -85,7 +85,6 @@ public void testDataPushQueue() throws Exception {
int mapId = 0;
int attemptId = 0;
int numMappers = 10;
int numPartitions = 10;
final File tempFile = new File(tempDir, UUID.randomUUID().toString());
DummyShuffleClient client = new DummyShuffleClient(conf, tempFile);
client.initReducePartitionMap(shuffleId, numPartitions, numWorker);
Expand Down Expand Up @@ -125,10 +124,10 @@ protected void pushData(PushTask task) throws IOException {

for (int i = 0; i < numPartitions; i++) {
byte[] b = intToBytes(workerData.get(i % numWorker).get(i / numWorker));
dataPusher.addTask(i, b, b.length);
int batchId = pushState.nextBatchId();
pushState.addBatch(batchId, reducePartitionMap.get(i).hostAndPushPort());
partitionBatchIdMap.put(i, batchId);
dataPusher.addTask(i, b, b.length);
}

dataPusher.waitOnTermination();
Expand Down

0 comments on commit 39ab731

Please sign in to comment.