Skip to content

Commit

Permalink
Remove ThreadPool (opensource4you#513)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Jul 28, 2022
1 parent 5dc5de3 commit 9a7731e
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 466 deletions.
37 changes: 0 additions & 37 deletions app/src/main/java/org/astraea/app/concurrent/Executor.java

This file was deleted.

24 changes: 0 additions & 24 deletions app/src/main/java/org/astraea/app/concurrent/State.java

This file was deleted.

136 changes: 0 additions & 136 deletions app/src/main/java/org/astraea/app/concurrent/ThreadPool.java

This file was deleted.

137 changes: 64 additions & 73 deletions app/src/test/java/org/astraea/app/admin/AdminTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
Expand All @@ -37,8 +38,6 @@
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.astraea.app.common.DataRate;
import org.astraea.app.common.Utils;
import org.astraea.app.concurrent.State;
import org.astraea.app.concurrent.ThreadPool;
import org.astraea.app.consumer.Consumer;
import org.astraea.app.consumer.Deserializer;
import org.astraea.app.producer.Producer;
Expand Down Expand Up @@ -976,32 +975,29 @@ void testReassignmentWhenMovingPartitionToAnotherBroker() {
try (var producer = Producer.of(bootstrapServers())) {
var done = new AtomicBoolean(false);
var data = new byte[1000];
try (var pool =
ThreadPool.builder()
.executor(
() -> {
producer.sender().topic(topicName).key(data).value(data).run();
return done.get() ? State.DONE : State.RUNNING;
})
.build()) {

try {
admin.migrator().topic(topicName).moveTo(List.of(nextBroker));
var reassignment =
admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0));

// Don't verify the result if the migration is done
if (reassignment != null) {
Assertions.assertEquals(1, reassignment.from().size());
var from = reassignment.from().iterator().next();
Assertions.assertEquals(currentBroker, from.broker());
Assertions.assertEquals(1, reassignment.to().size());
var to = reassignment.to().iterator().next();
Assertions.assertEquals(nextBroker, to.broker());
}
} finally {
done.set(true);
var f =
CompletableFuture.runAsync(
() -> {
while (!done.get())
producer.sender().topic(topicName).key(data).value(data).run();
});
try {
admin.migrator().topic(topicName).moveTo(List.of(nextBroker));
var reassignment =
admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0));

// Don't verify the result if the migration is done
if (reassignment != null) {
Assertions.assertEquals(1, reassignment.from().size());
var from = reassignment.from().iterator().next();
Assertions.assertEquals(currentBroker, from.broker());
Assertions.assertEquals(1, reassignment.to().size());
var to = reassignment.to().iterator().next();
Assertions.assertEquals(nextBroker, to.broker());
}
} finally {
done.set(true);
Utils.swallowException(f::get);
}
}
}
Expand All @@ -1027,33 +1023,31 @@ void testReassignmentWhenMovingPartitionToAnotherPath() {
try (var producer = Producer.of(bootstrapServers())) {
var done = new AtomicBoolean(false);
var data = new byte[1000];
try (var pool =
ThreadPool.builder()
.executor(
() -> {
producer.sender().topic(topicName).key(data).value(data).run();
return done.get() ? State.DONE : State.RUNNING;
})
.build()) {

try {
admin.migrator().topic(topicName).moveTo(Map.of(currentReplica.broker(), nextPath));
var reassignment =
admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0));
// Don't verify the result if the migration is done
if (reassignment != null) {
Assertions.assertEquals(1, reassignment.from().size());
var from = reassignment.from().iterator().next();
Assertions.assertEquals(currentBroker, from.broker());
Assertions.assertEquals(currentPath, from.path());
Assertions.assertEquals(1, reassignment.to().size());
var to = reassignment.to().iterator().next();
Assertions.assertEquals(currentBroker, to.broker());
Assertions.assertEquals(nextPath, to.path());
}
} finally {
done.set(true);
var f =
CompletableFuture.runAsync(
() -> {
while (!done.get())
producer.sender().topic(topicName).key(data).value(data).run();
});

try {
admin.migrator().topic(topicName).moveTo(Map.of(currentReplica.broker(), nextPath));
var reassignment =
admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0));
// Don't verify the result if the migration is done
if (reassignment != null) {
Assertions.assertEquals(1, reassignment.from().size());
var from = reassignment.from().iterator().next();
Assertions.assertEquals(currentBroker, from.broker());
Assertions.assertEquals(currentPath, from.path());
Assertions.assertEquals(1, reassignment.to().size());
var to = reassignment.to().iterator().next();
Assertions.assertEquals(currentBroker, to.broker());
Assertions.assertEquals(nextPath, to.path());
}
} finally {
done.set(true);
Utils.swallowException(f::get);
}
}
}
Expand All @@ -1070,27 +1064,24 @@ void testMultiReassignments() {
try (var producer = Producer.of(bootstrapServers())) {
var done = new AtomicBoolean(false);
var data = new byte[1000];
try (var pool =
ThreadPool.builder()
.executor(
() -> {
producer.sender().topic(topicName).key(data).value(data).run();
return done.get() ? State.DONE : State.RUNNING;
})
.build()) {

try {
admin.migrator().topic(topicName).moveTo(brokers);
var reassignment =
admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0));
// Don't verify the result if the migration is done
if (reassignment != null) {
Assertions.assertEquals(3, reassignment.from().size());
Assertions.assertEquals(2, reassignment.to().size());
}
} finally {
done.set(true);
var f =
CompletableFuture.runAsync(
() -> {
while (!done.get())
producer.sender().topic(topicName).key(data).value(data).run();
});
try {
admin.migrator().topic(topicName).moveTo(brokers);
var reassignment =
admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0));
// Don't verify the result if the migration is done
if (reassignment != null) {
Assertions.assertEquals(3, reassignment.from().size());
Assertions.assertEquals(2, reassignment.to().size());
}
} finally {
done.set(true);
Utils.swallowException(f::get);
}
}
}
Expand Down
Loading

0 comments on commit 9a7731e

Please sign in to comment.