Skip to content

Commit

Permalink
[hotfix][core] Makes ExecutorUtils#gracefulShutdown return any outsta…
Browse files Browse the repository at this point in the history
…nding tasks
  • Loading branch information
XComp committed Feb 5, 2024
1 parent 59044ca commit bbae036
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,8 +40,9 @@ public class ExecutorUtils {
* @param timeout to wait for the termination of all ExecutorServices
* @param unit of the timeout
* @param executorServices to shut down
* @return Tasks that were not executed prior to a {@link ExecutorService#shutdownNow()}.
*/
public static void gracefulShutdown(
public static List<Runnable> gracefulShutdown(
long timeout, TimeUnit unit, ExecutorService... executorServices) {
for (ExecutorService executorService : executorServices) {
executorService.shutdown();
Expand All @@ -50,22 +53,23 @@ public static void gracefulShutdown(
long timeLeft = unit.toMillis(timeout);
boolean hasTimeLeft = timeLeft > 0L;

final List<Runnable> outstandingTasks = new ArrayList<>();
for (ExecutorService executorService : executorServices) {
if (wasInterrupted || !hasTimeLeft) {
executorService.shutdownNow();
outstandingTasks.addAll(executorService.shutdownNow());
} else {
try {
if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
LOG.warn(
"ExecutorService did not terminate in time. Shutting it down now.");
executorService.shutdownNow();
outstandingTasks.addAll(executorService.shutdownNow());
}
} catch (InterruptedException e) {
LOG.warn(
"Interrupted while shutting down executor services. Shutting all "
+ "remaining ExecutorServices down now.",
e);
executorService.shutdownNow();
outstandingTasks.addAll(executorService.shutdownNow());

wasInterrupted = true;

Expand All @@ -76,6 +80,8 @@ public static void gracefulShutdown(
hasTimeLeft = timeLeft > 0L;
}
}

return outstandingTasks;
}

/**
Expand Down

0 comments on commit bbae036

Please sign in to comment.