Skip to content

Commit

Permalink
Revert "[hotfix][core] Generalizes FutureUtils#runAsync"
Browse files Browse the repository at this point in the history
This reverts commit e64b2fd.
  • Loading branch information
XComp committed Nov 27, 2024
1 parent ff60c9d commit d9d09b6
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -923,14 +922,14 @@ public static <T> CompletableFuture<T> supplyAsync(
}

/**
* Returns a future which is completed when {@code runnable} is finished.
* Returns a future which is completed when {@link RunnableWithException} is finished.
*
* @param runnable represents the task
* @param executor to execute the runnable
* @return Future which is completed when runnable is finished
*/
public static CompletableFuture<Void> runAsync(
ThrowingRunnable<? extends Throwable> runnable, Executor executor) {
RunnableWithException runnable, Executor executor) {
return CompletableFuture.runAsync(
() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,13 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
@Override
public CompletableFuture<Void> runAsLeader(
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback) {
return FutureUtils.runAsync(
() -> EmbeddedLeaderService.this.runAsLeader(this, leaderSessionId, callback),
Executors.directExecutor());
try {
EmbeddedLeaderService.this.runAsLeader(this, leaderSessionId, callback);
} catch (LeadershipLostException e) {
return FutureUtils.completedExceptionally(e);
}

return FutureUtils.completedVoidFuture();
}

void shutdown(Exception cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -367,14 +368,18 @@ CompletableFuture<Void> runAsLeader(
String componentId,
UUID leaderSessionId,
ThrowingRunnable<? extends Throwable> callback) {
return FutureUtils.runAsync(
return CompletableFuture.runAsync(
() -> {
synchronized (lock) {
if (!hasLeadership(componentId, leaderSessionId)) {
throw new LeadershipLostException(leaderSessionId);
throw new CompletionException(new LeadershipLostException(leaderSessionId));
}

callback.run();
try {
callback.run();
} catch (Throwable e) {
throw new CompletionException(e);
}
}
},
leadershipOperationExecutor);
Expand Down

0 comments on commit d9d09b6

Please sign in to comment.