Skip to content

Commit

Permalink
[CELEBORN-1639] Fix SlotsAllocatorRackAwareSuiteJ UT
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

In the offerSlotsRoundRobinWithRackAware and offerSlotsRoundRobinWithRackAwareWithoutMappingFile methods of the SlotsAllocatorRackAwareSuiteJ UT, the resulting slots map is empty, so these tests cannot actually verify slot allocation when rack awareness is enabled.

### Why are the changes needed?

To fix the existing UT.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Fixed the UT methods `offerSlotsRoundRobinWithRackAware()` and `offerSlotsRoundRobinWithRackAwareWithoutMappingFile()` so they now assert that the expected number of partition locations is allocated.

Closes apache#2800 from wankunde/slots_allocator2.

Authored-by: wankunde <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
wankunde authored and FMX committed Oct 12, 2024
1 parent d14e9bb commit 991f1c2
Showing 1 changed file with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.stream.IntStream;

import scala.Tuple2;
import scala.collection.mutable.ArrayBuffer;

import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.TableMapping;
Expand Down Expand Up @@ -91,6 +92,9 @@ public void accept(PartitionLocation location) {
resolver.resolve(location.getPeer().getHost()).getNetworkLocation());
}
};
Integer locationsCount =
slots.values().stream().map(tup -> tup._1.size() + tup._2.size()).reduce(0, Integer::sum);
Assert.assertEquals((long) locationsCount, partitionIds.size() * 2);
slots.values().stream().map(Tuple2::_1).flatMap(Collection::stream).forEach(assertCustomer);
}

Expand Down Expand Up @@ -131,17 +135,26 @@ public void accept(PartitionLocation location) {
resolver.resolve(location.getPeer().getHost()).getNetworkLocation());
}
};
Integer locationsCount =
slots.values().stream().map(tup -> tup._1.size() + tup._2.size()).reduce(0, Integer::sum);
Assert.assertEquals((long) locationsCount, partitionIds.size() * 2);
slots.values().stream().map(Tuple2::_1).flatMap(Collection::stream).forEach(assertConsumer);
}

private List<WorkerInfo> prepareWorkers(CelebornRackResolver resolver) {
ArrayList<WorkerInfo> workers = new ArrayList<>(3);
workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 212, new HashMap<>(), null));
workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 212, new HashMap<>(), null));
workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 212, new HashMap<>(), null));
workers.add(new WorkerInfo("host4", 9, 10, 110, 113, 212, new HashMap<>(), null));
workers.add(new WorkerInfo("host5", 9, 11, 111, 114, 212, new HashMap<>(), null));
workers.add(new WorkerInfo("host6", 9, 12, 112, 115, 212, new HashMap<>(), null));
ArrayBuffer<File> files = new ArrayBuffer<>();
files.$plus$eq(new File("/mnt/disk/1"));
files.$plus$eq(new File("/mnt/disk/2"));
HashMap<String, DiskInfo> diskInfos = new HashMap<>();
diskInfos.put(
"disk1", new DiskInfo("/mnt/disk/0", 1000, 1000, 1000, 1000, files.toList(), null));
workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 212, diskInfos, null));
workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 212, diskInfos, null));
workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 212, diskInfos, null));
workers.add(new WorkerInfo("host4", 9, 10, 110, 113, 212, diskInfos, null));
workers.add(new WorkerInfo("host5", 9, 11, 111, 114, 212, diskInfos, null));
workers.add(new WorkerInfo("host6", 9, 12, 112, 115, 212, diskInfos, null));

workers.forEach(
new Consumer<WorkerInfo>() {
Expand Down Expand Up @@ -217,7 +230,6 @@ public void testRackAwareRoundRobinReplicaDistribution() {
List<SlotReplicaAllocatorTestCase> allTests = getSlotReplicaAllocatorTestCases();

for (final SlotReplicaAllocatorTestCase test : allTests) {

final int numPartitions = test.getNumPartitions();
long maxValue = Long.MIN_VALUE;
List<WorkerInfo> maxValueWorkers = null;
Expand Down

0 comments on commit 991f1c2

Please sign in to comment.