callback,
+ String callbackLabel) {
+ AdaptiveScheduler.this
+ .getMainThreadExecutor()
+ .execute(
+ () ->
+ state.tryRun(
+ CheckpointStatsListener.class,
+ callback,
+ logger ->
+ logger.debug(
+ "{} is not supported by {}.",
+ callbackLabel,
+ state.getClass().getName())));
+ }
+ };
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
index 69c11f1b8129bb..0b0fc013357e09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
@@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
import java.time.Duration;
import java.time.Instant;
@@ -47,8 +48,12 @@
* available (its lower threshold is defined by {@code scalingIntervalMax}).
*
*
+ * <p>Thread-safety: This class is not thread-safe. It relies on all method calls happening
+ * within a single thread.
+ *
* @see Executing
*/
+@NotThreadSafe
public class DefaultRescaleManager implements RescaleManager {
private static final Logger LOG = LoggerFactory.getLogger(DefaultRescaleManager.class);
@@ -116,37 +121,21 @@ public class DefaultRescaleManager implements RescaleManager {
@Override
public void onChange() {
- runInContextMainThread(
- () -> {
- if (this.triggerFuture.isDone()) {
- this.triggerFuture =
- scheduleOperationWithTrigger(this::evaluateChangeEvent);
- }
- });
+ if (this.triggerFuture.isDone()) {
+ this.triggerFuture = scheduleOperationWithTrigger(this::evaluateChangeEvent);
+ }
}
@Override
public void onTrigger() {
- runInContextMainThread(
- () -> {
- if (!this.triggerFuture.isDone()) {
- this.triggerFuture.complete(null);
- LOG.debug(
- "A rescale trigger event was observed causing the rescale verification logic to be initiated.");
- } else {
- LOG.debug(
- "A rescale trigger event was observed outside of a rescale cycle. No action taken.");
- }
- });
- }
-
- /**
- * Runs the {@code callback} in the context's main thread by scheduling the operation with no
- * delay. This method should be used for internal state changes that might be triggered from
- * outside the context's main thread.
- */
- private void runInContextMainThread(Runnable callback) {
- rescaleContext.scheduleOperation(callback, Duration.ZERO);
+ if (!this.triggerFuture.isDone()) {
+ this.triggerFuture.complete(null);
+ LOG.debug(
+ "A rescale trigger event was observed causing the rescale verification logic to be initiated.");
+ } else {
+ LOG.debug(
+ "A rescale trigger event was observed outside of a rescale cycle. No action taken.");
+ }
}
private void evaluateChangeEvent() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 80ee29af6ad92b..1fcd23884f5a1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -22,6 +22,7 @@
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -52,17 +53,21 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */
class Executing extends StateWithExecutionGraph
- implements ResourceListener, RescaleManager.Context {
+ implements ResourceListener, RescaleManager.Context, CheckpointStatsListener {
private final Context context;
private final RescalingController sufficientResourcesController;
private final RescalingController desiredResourcesController;
private final RescaleManager rescaleManager;
+ private final int rescaleOnFailedCheckpointCount;
+ // null indicates that no change event has been observed yet
+ @Nullable private AtomicInteger failedCheckpointCountdown;
Executing(
ExecutionGraph executionGraph,
@@ -74,6 +79,7 @@ class Executing extends StateWithExecutionGraph
List<ExceptionHistoryEntry> failureCollection,
RescaleManager.Factory rescaleManagerFactory,
int minParallelismChangeForRescale,
+ int rescaleOnFailedCheckpointCount,
Instant lastRescale) {
super(
context,
@@ -92,10 +98,22 @@ class Executing extends StateWithExecutionGraph
new EnforceMinimalIncreaseRescalingController(minParallelismChangeForRescale);
this.rescaleManager = rescaleManagerFactory.create(this, lastRescale);
+ Preconditions.checkArgument(
+ rescaleOnFailedCheckpointCount > 0,
+ "The rescaleOnFailedCheckpointCount should be larger than 0.");
+ this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
+ this.failedCheckpointCountdown = null;
+
deploy();
// check if new resources have come available in the meantime
- context.runIfState(this, this::evaluateRescaling, Duration.ZERO);
+ context.runIfState(
+ this,
+ () -> {
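+ // register the change and trigger the evaluation right away instead of
+ // waiting for a checkpoint event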
+ rescaleManager.onChange();
+ rescaleManager.onTrigger();
+ },
+ Duration.ZERO);
}
@Override
@@ -194,17 +212,38 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
@Override
public void onNewResourcesAvailable() {
- evaluateRescaling();
+ rescaleManager.onChange();
+ initializeFailedCheckpointCountdownIfUnset();
}
@Override
public void onNewResourceRequirements() {
- evaluateRescaling();
+ rescaleManager.onChange();
+ initializeFailedCheckpointCountdownIfUnset();
}
- private void evaluateRescaling() {
- rescaleManager.onChange();
+ @Override
+ public void onCompletedCheckpoint() {
+ triggerPotentialRescale();
+ }
+
+ @Override
+ public void onFailedCheckpoint() {
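+ // failed checkpoints are only counted while the countdown is armed (i.e. after a
+ // change event was observed); reaching zero triggers the rescale evaluation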
+ if (this.failedCheckpointCountdown != null
+ && this.failedCheckpointCountdown.decrementAndGet() <= 0) {
+ triggerPotentialRescale();
+ }
+ }
+
+ private void triggerPotentialRescale() {
rescaleManager.onTrigger();
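+ // disarm the countdown; counting restarts with the next change event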
+ this.failedCheckpointCountdown = null;
+ }
+
+ private void initializeFailedCheckpointCountdownIfUnset() {
+ if (failedCheckpointCountdown == null) {
+ this.failedCheckpointCountdown = new AtomicInteger(this.rescaleOnFailedCheckpointCount);
+ }
}
CompletableFuture<String> stopWithSavepoint(
@@ -285,6 +324,7 @@ static class Factory implements StateFactory {
private final List<ExceptionHistoryEntry> failureCollection;
private final RescaleManager.Factory rescaleManagerFactory;
private final int minParallelismChangeForRescale;
+ private final int rescaleOnFailedCheckpointCount;
Factory(
ExecutionGraph executionGraph,
@@ -295,7 +335,8 @@ static class Factory implements StateFactory {
ClassLoader userCodeClassLoader,
List<ExceptionHistoryEntry> failureCollection,
RescaleManager.Factory rescaleManagerFactory,
- int minParallelismChangeForRescale) {
+ int minParallelismChangeForRescale,
+ int rescaleOnFailedCheckpointCount) {
this.context = context;
this.log = log;
this.executionGraph = executionGraph;
@@ -305,6 +346,7 @@ static class Factory implements StateFactory {
this.failureCollection = failureCollection;
this.rescaleManagerFactory = rescaleManagerFactory;
this.minParallelismChangeForRescale = minParallelismChangeForRescale;
+ this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
}
public Class<Executing> getStateClass() {
@@ -322,6 +364,7 @@ public Executing getState() {
failureCollection,
rescaleManagerFactory,
minParallelismChangeForRescale,
+ rescaleOnFailedCheckpointCount,
Instant.now());
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
index 4a815642d1c5cf..2a4bb9660421b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
@@ -26,6 +26,7 @@
import org.slf4j.Logger;
import java.util.Optional;
+import java.util.function.Consumer;
/**
* State abstraction of the {@link AdaptiveScheduler}. This interface contains all methods every
@@ -100,17 +101,38 @@ default Optional as(Class extends T> clazz) {
*/
default <T, E extends Exception> void tryRun(
Class<? extends T> clazz, ThrowingConsumer<T, E> action, String debugMessage) throws E {
+ tryRun(
+ clazz,
+ action,
+ logger ->
+ logger.debug(
+ "Cannot run '{}' because the actual state is {} and not {}.",
+ debugMessage,
+ this.getClass().getSimpleName(),
+ clazz.getSimpleName()));
+ }
+
+ /**
+ * Tries to run the action if this state is of type {@code clazz}.
+ *
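+ * <p>A hypothetical usage sketch (the listener type and log message are illustrative only;
+ * exception handling is omitted):
+ *
+ * <pre>{@code
+ * state.tryRun(
+ *         CheckpointStatsListener.class,
+ *         CheckpointStatsListener::onCompletedCheckpoint,
+ *         logger -> logger.debug("Not supported by {}.", state.getClass().getName()));
+ * }</pre>
+ *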
+ * @param clazz clazz describes the target type
+ * @param action action to run if this state is of the target type
+ * @param invalidStateCallback callback that is called with the state's logger if the state
+ *     does not match the expected type
+ * @param <T> target type
+ * @param <E> error type
+ * @throws E an exception if the action fails
+ */
+ default <T, E extends Exception> void tryRun(
+ Class<? extends T> clazz,
+ ThrowingConsumer<T, E> action,
+ Consumer<Logger> invalidStateCallback)
+ throws E {
+ final Optional<? extends T> asOptional = as(clazz);
if (asOptional.isPresent()) {
action.accept(asOptional.get());
} else {
- getLogger()
- .debug(
- "Cannot run '{}' because the actual state is {} and not {}.",
- debugMessage,
- this.getClass().getSimpleName(),
- clazz.getSimpleName());
+ invalidStateCallback.accept(getLogger());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
index 42555be1701239..ad1571925db777 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
@@ -46,6 +46,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static java.util.Collections.singletonMap;
@@ -239,6 +241,71 @@ void testCheckpointTracking() throws Exception {
assertThat(snapshot.getLatestRestoredCheckpoint()).isEqualTo(restored);
}
+ @Test
+ void testCheckpointStatsListenerOnCompletedCheckpoint() {
+ testCheckpointStatsListener(
+ (checkpointStatsTracker, pendingCheckpointStats) ->
+ checkpointStatsTracker.reportCompletedCheckpoint(
+ pendingCheckpointStats.toCompletedCheckpointStats(
+ "random-external-pointer")),
+ 1,
+ 0);
+ }
+
+ @Test
+ void testCheckpointStatsListenerOnFailedCheckpoint() {
+ testCheckpointStatsListener(
+ (checkpointStatsTracker, pendingCheckpointStats) ->
+ checkpointStatsTracker.reportFailedCheckpoint(
+ pendingCheckpointStats.toFailedCheckpoint(
+ System.currentTimeMillis(), null)),
+ 0,
+ 1);
+ }
+
+ private void testCheckpointStatsListener(
+ BiConsumer<CheckpointStatsTracker, PendingCheckpointStats> testCodeCallback,
+ int expectedOnCompletedCheckpointCount,
+ int expectedOnFailedCheckpointCount) {
+ final AtomicInteger onCompletedCheckpointCount = new AtomicInteger();
+ final AtomicInteger onFailedCheckpointCount = new AtomicInteger();
+ final CheckpointStatsListener listener =
+ new CheckpointStatsListener() {
+ @Override
+ public void onCompletedCheckpoint() {
+ onCompletedCheckpointCount.incrementAndGet();
+ }
+
+ @Override
+ public void onFailedCheckpoint() {
+ onFailedCheckpointCount.incrementAndGet();
+ }
+ };
+
+ final CheckpointStatsTracker statsTracker =
+ new DefaultCheckpointStatsTracker(
+ 10,
+ UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+ listener);
+
+ // "factory" code to enable the instantiation of test data based on a PendingCheckpointStats
+ // instance
+ final JobVertexID jobVertexID = new JobVertexID();
+ final PendingCheckpointStats pending =
+ statsTracker.reportPendingCheckpoint(
+ 0,
+ 1,
+ CheckpointProperties.forCheckpoint(
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ singletonMap(jobVertexID, 1));
+ pending.reportSubtaskStats(jobVertexID, createSubtaskStats(0));
+
+ testCodeCallback.accept(statsTracker, pending);
+
+ assertThat(onCompletedCheckpointCount).hasValue(expectedOnCompletedCheckpointCount);
+ assertThat(onFailedCheckpointCount).hasValue(expectedOnFailedCheckpointCount);
+ }
+
/** Tests that snapshots are only created if a new snapshot has been reported or updated. */
@Test
void testCreateSnapshot() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index fca3c7a854833d..42f50616490b2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -23,7 +23,10 @@
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -52,6 +55,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiFunction;
import java.util.function.Function;
/** Builder for {@link AdaptiveScheduler}. */
@@ -89,6 +93,17 @@ public class AdaptiveSchedulerBuilder {
@Nullable private SlotAllocator slotAllocator;
+ /**
+ * {@code null} indicates that the default factory will be used based on the set configuration.
+ */
+ @Nullable private RescaleManager.Factory rescaleManagerFactory = null;
+
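+ /**
+ * Factory that creates the {@link CheckpointStatsTracker} and receives the {@link
+ * CheckpointStatsListener} registered by the scheduler, which allows tests to intercept it.
+ */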
+ private BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener, CheckpointStatsTracker>
+ checkpointStatsTrackerFactory =
+ (metricGroup, checkpointStatsListener) ->
+ new DefaultCheckpointStatsTracker(
+ 10, metricGroup, checkpointStatsListener);
+
public AdaptiveSchedulerBuilder(
final JobGraph jobGraph,
ComponentMainThreadExecutor mainThreadExecutor,
@@ -206,6 +221,23 @@ public AdaptiveSchedulerBuilder setSlotAllocator(SlotAllocator slotAllocator) {
return this;
}
+ public AdaptiveSchedulerBuilder setRescaleManagerFactory(
+ @Nullable RescaleManager.Factory rescaleManagerFactory) {
+ this.rescaleManagerFactory = rescaleManagerFactory;
+ return this;
+ }
+
+ public AdaptiveSchedulerBuilder setCheckpointStatsTrackerFactory(
+ @Nullable
+ BiFunction<
+ JobManagerJobMetricGroup,
+ CheckpointStatsListener,
+ CheckpointStatsTracker>
+ checkpointStatsTrackerFactory) {
+ this.checkpointStatsTrackerFactory = checkpointStatsTrackerFactory;
+ return this;
+ }
+
public AdaptiveScheduler build() throws Exception {
final ExecutionGraphFactory executionGraphFactory =
new DefaultExecutionGraphFactory(
@@ -220,8 +252,14 @@ public AdaptiveScheduler build() throws Exception {
shuffleMaster,
partitionTracker);
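+ // create the settings once so the default rescale-manager factory can be derived from them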
+ final AdaptiveScheduler.Settings settings =
+ AdaptiveScheduler.Settings.of(jobMasterConfiguration);
return new AdaptiveScheduler(
- AdaptiveScheduler.Settings.of(jobMasterConfiguration),
+ settings,
+ rescaleManagerFactory == null
+ ? DefaultRescaleManager.Factory.fromSettings(settings)
+ : rescaleManagerFactory,
+ checkpointStatsTrackerFactory,
jobGraph,
jobResourceRequirements,
jobMasterConfiguration,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 6fb3d0e017f85c..d1bde31da6c2f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -33,8 +33,10 @@
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
@@ -64,6 +66,7 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
@@ -75,6 +78,8 @@
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotInfoTracker;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.MetricNames;
@@ -89,6 +94,8 @@
import org.apache.flink.runtime.scheduler.DefaultSchedulerTest;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestSlotInfo;
@@ -106,6 +113,7 @@
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
@@ -141,6 +149,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -150,6 +159,7 @@
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link AdaptiveScheduler}. */
@@ -1280,7 +1290,8 @@ private static Configuration createConfigurationWithNoTimeouts() {
return new Configuration()
.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(-1L))
.set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(1L))
- .set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(1L));
+ .set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(1L))
+ .set(JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER, Duration.ZERO);
}
private AdaptiveSchedulerBuilder prepareSchedulerWithNoTimeouts(
@@ -2112,7 +2123,7 @@ void testRequestUpdatedResourceRequirements() throws Exception {
}
@Test
- public void testScalingIntervalConfigurationIsRespected() {
+ void testScalingIntervalConfigurationIsRespected() throws ConfigurationException {
final Duration scalingIntervalMin = Duration.ofMillis(1337);
final Duration scalingIntervalMax = Duration.ofMillis(7331);
final Configuration configuration = createConfigurationWithNoTimeouts();
@@ -2124,10 +2135,151 @@ public void testScalingIntervalConfigurationIsRespected() {
assertThat(settings.getScalingIntervalMax()).isEqualTo(scalingIntervalMax);
}
+ @Test
+ void testOnCompletedCheckpointIsHandledInMainThread() throws Exception {
+ testCheckpointStatsEventBeingExecutedInTheMainThread(
+ CheckpointStatsListener::onCompletedCheckpoint, 1, Integer.MAX_VALUE);
+ }
+
+ @Test
+ void testOnFailedCheckpointIsHandledInMainThread() throws Exception {
+ testCheckpointStatsEventBeingExecutedInTheMainThread(
+ CheckpointStatsListener::onFailedCheckpoint, 2, 2);
+ }
+
+ private void testCheckpointStatsEventBeingExecutedInTheMainThread(
+ Consumer<CheckpointStatsListener> eventCallback,
+ int eventRepetitions,
+ int triggerOnFailedCheckpointCount)
+ throws Exception {
+
+ final CompletableFuture<CheckpointStatsListener> statsListenerInstantiatedFuture =
+ new CompletableFuture<>();
+ final BlockingQueue<Integer> eventQueue = new ArrayBlockingQueue<>(1);
+
+ final AdaptiveScheduler testInstance =
+ createSchedulerThatReachesExecutingState(
+ PARALLELISM,
+ triggerOnFailedCheckpointCount,
+ eventQueue,
+ statsListenerInstantiatedFuture);
+
+ try {
+ // start scheduling to reach Executing state
+ singleThreadMainThreadExecutor.execute(testInstance::startScheduling);
+
+ final CheckpointStatsListener statsListener = statsListenerInstantiatedFuture.get();
+ assertThat(statsListener)
+ .as("The CheckpointStatsListener should have been instantiated.")
+ .isNotNull();
+
+ // the first trigger happens in the Executing initialization - let's wait for that event
+ // to pass
+ assertThat(eventQueue.take())
+ .as(
+ "The first event should have been appeared during Executing state initialization and should be ignored.")
+ .isEqualTo(0);
+
+ // counting the failed checkpoints only starts on a change event
+ testInstance.updateJobResourceRequirements(
+ JobResourceRequirements.newBuilder()
+ .setParallelismForJobVertex(JOB_VERTEX.getID(), 1, PARALLELISM - 1)
+ .build());
+
+ for (int i = 0; i < eventRepetitions; i++) {
+ assertThatNoException()
+ .as(
+ "Triggering the event from outside the main thread should not have caused an error.")
+ .isThrownBy(() -> eventCallback.accept(statsListener));
+ }
+
+ assertThat(eventQueue.take())
+ .as("Only one event should have been observed.")
+ .isEqualTo(1);
+ } finally {
+ final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ singleThreadMainThreadExecutor.execute(
+ () -> FutureUtils.forward(testInstance.closeAsync(), closeFuture));
+ assertThatFuture(closeFuture).eventuallySucceeds();
+ }
+ }
+
// ---------------------------------------------------------------------------------------------
// Utils
// ---------------------------------------------------------------------------------------------
+ private AdaptiveScheduler createSchedulerThatReachesExecutingState(
+ int parallelism,
+ int onFailedCheckpointCount,
+ BlockingQueue<Integer> eventQueue,
+ CompletableFuture<CheckpointStatsListener> statsListenerInstantiatedFuture)
+ throws Exception {
+ final Configuration config = new Configuration();
+ config.set(
+ JobManagerOptions.SCHEDULER_SCALE_ON_FAILED_CHECKPOINTS_COUNT,
+ onFailedCheckpointCount);
+
+ final JobGraph jobGraph =
+ JobGraphBuilder.newStreamingJobGraphBuilder()
+ .addJobVertices(Collections.singletonList(JOB_VERTEX))
+ .setJobCheckpointingSettings(
+ new JobCheckpointingSettings(
+ new CheckpointCoordinatorConfiguration
+ .CheckpointCoordinatorConfigurationBuilder()
+ .build(),
+ null))
+ .build();
+ SchedulerTestingUtils.enableCheckpointing(jobGraph);
+
+ // testing SlotPool instance that would allow for the scheduler to transition to Executing
+ // state
+ final DeclarativeSlotPool slotPool =
+ new TestingDeclarativeSlotPoolBuilder()
+ .setContainsFreeSlotFunction(allocationID -> true)
+ .setReserveFreeSlotFunction(
+ (allocationId, resourceProfile) ->
+ TestingPhysicalSlot.builder()
+ .withAllocationID(allocationId)
+ .build())
+ .setGetFreeSlotInfoTrackerSupplier(
+ () ->
+ TestingFreeSlotInfoTracker.newBuilder()
+ .setGetFreeSlotsInformationSupplier(
+ () ->
+ IntStream.range(0, parallelism)
+ .mapToObj(
+ v ->
+ new TestSlotInfo())
+ .collect(
+ Collectors.toSet()))
+ .build())
+ .build();
+
+ final AtomicInteger eventCounter = new AtomicInteger();
+ return new AdaptiveSchedulerBuilder(
+ jobGraph, singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
+ .setJobMasterConfiguration(config)
+ .setDeclarativeSlotPool(slotPool)
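+ // the trigger callback of the TestingRescaleManager records each event and verifies
+ // that it is executed in the scheduler's main thread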
+ .setRescaleManagerFactory(
+ new TestingRescaleManager.Factory(
+ () -> {},
+ () -> {
+ singleThreadMainThreadExecutor.assertRunningInMainThread();
+
+ eventQueue.offer(eventCounter.getAndIncrement());
+ }))
+ .setCheckpointStatsTrackerFactory(
+ (metricGroup, listener) -> {
+ assertThat(statsListenerInstantiatedFuture)
+ .as(
+ "The CheckpointStatsListener should be only instantiated once.")
+ .isNotCompleted();
+ statsListenerInstantiatedFuture.complete(listener);
+ return NoOpCheckpointStatsTracker.INSTANCE;
+ })
+ .build();
+ }
+
private CompletableFuture<ArchivedExecutionGraph> getArchivedExecutionGraphForRunningJob(
SchedulerNG scheduler) {
return CompletableFuture.supplyAsync(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java
index c24f5ed8c80b67..94e6a57ea093ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java
@@ -20,6 +20,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.ConfigurationException;
import org.junit.jupiter.api.Test;
@@ -39,7 +40,7 @@
class DefaultRescaleManagerTest {
@Test
- void testProperConfiguration() {
+ void testProperConfiguration() throws ConfigurationException {
final Duration scalingIntervalMin = Duration.ofMillis(1337);
final Duration scalingIntervalMax = Duration.ofMillis(7331);
final Duration maximumDelayForRescaleTrigger = Duration.ofMillis(4242);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index e3a9870f9328d3..48ee70bfd297f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -97,9 +97,12 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.IntStream;
import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
import static org.assertj.core.api.Assertions.assertThat;
@@ -154,6 +157,7 @@ void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception {
new ArrayList<>(),
TestingRescaleManager.Factory.noOpFactory(),
1,
+ 1,
Instant.now());
assertThat(mockExecutionVertex.isDeployCalled()).isFalse();
}
@@ -181,12 +185,124 @@ void testIllegalStateExceptionOnNotRunningExecutionGraph() {
new ArrayList<>(),
TestingRescaleManager.Factory.noOpFactory(),
1,
+ 1,
Instant.now());
}
})
.isInstanceOf(IllegalStateException.class);
}
+ @Test
+ public void testTriggerRescaleOnCompletedCheckpoint() throws Exception {
+ final AtomicBoolean rescaleTriggered = new AtomicBoolean();
+ final RescaleManager.Factory rescaleManagerFactory =
+ new TestingRescaleManager.Factory(() -> {}, () -> rescaleTriggered.set(true));
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final Executing testInstance =
+ new ExecutingStateBuilder()
+ .setRescaleManagerFactory(rescaleManagerFactory)
+ .build(ctx);
+
+ assertThat(rescaleTriggered).isFalse();
+ testInstance.onCompletedCheckpoint();
+ assertThat(rescaleTriggered).isTrue();
+ }
+ }
+
+ @Test
+ public void testTriggerRescaleOnFailedCheckpoint() throws Exception {
+ final AtomicInteger rescaleTriggerCount = new AtomicInteger();
+ final RescaleManager.Factory rescaleManagerFactory =
+ new TestingRescaleManager.Factory(() -> {}, rescaleTriggerCount::incrementAndGet);
+ final int rescaleOnFailedCheckpointsCount = 3;
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final Executing testInstance =
+ new ExecutingStateBuilder()
+ .setRescaleManagerFactory(rescaleManagerFactory)
+ .setRescaleOnFailedCheckpointCount(rescaleOnFailedCheckpointsCount)
+ .build(ctx);
+
+ // do multiple rescale iterations to verify that subsequent failed checkpoints after a
+ // rescale result in the expected behavior
+ for (int rescaleIteration = 1; rescaleIteration <= 3; rescaleIteration++) {
+
+ // trigger an initial failed checkpoint event to show that the counting only starts
+ // with the subsequent change event
+ testInstance.onFailedCheckpoint();
+
+ // trigger change
+ testInstance.onNewResourceRequirements();
+
+ for (int i = 0; i < rescaleOnFailedCheckpointsCount; i++) {
+ assertThat(rescaleTriggerCount)
+ .as(
+ "No rescale operation should have been triggered for iteration #%d, yet.",
+ rescaleIteration)
+ .hasValue(rescaleIteration - 1);
+ testInstance.onFailedCheckpoint();
+ }
+
+ assertThat(rescaleTriggerCount)
+ .as(
+ "The rescale operation for iteration #%d should have been properly triggered.",
+ rescaleIteration)
+ .hasValue(rescaleIteration);
+ }
+ }
+ }
+
+ @Test
+ public void testOnCompletedCheckpointResetsFailedCheckpointCount() throws Exception {
+ final AtomicInteger rescaleTriggeredCount = new AtomicInteger();
+ final RescaleManager.Factory rescaleManagerFactory =
+ new TestingRescaleManager.Factory(() -> {}, rescaleTriggeredCount::incrementAndGet);
+ final int rescaleOnFailedCheckpointsCount = 3;
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final Executing testInstance =
+ new ExecutingStateBuilder()
+ .setRescaleManagerFactory(rescaleManagerFactory)
+ .setRescaleOnFailedCheckpointCount(rescaleOnFailedCheckpointsCount)
+ .build(ctx);
+
+ // trigger an initial failed checkpoint event to show that the counting only starts with
+ // the subsequent change event
+ testInstance.onFailedCheckpoint();
+
+ // trigger change
+ testInstance.onNewResourcesAvailable();
+
+ IntStream.range(0, rescaleOnFailedCheckpointsCount - 1)
+ .forEach(ignored -> testInstance.onFailedCheckpoint());
+
+ assertThat(rescaleTriggeredCount)
+ .as("No rescaling should have been trigger, yet.")
+ .hasValue(0);
+
+ testInstance.onCompletedCheckpoint();
+
+ // trigger change
+ testInstance.onNewResourceRequirements();
+
+ assertThat(rescaleTriggeredCount)
+ .as("The completed checkpoint should have triggered a rescale.")
+ .hasValue(1);
+
+ IntStream.range(0, rescaleOnFailedCheckpointsCount - 1)
+ .forEach(ignored -> testInstance.onFailedCheckpoint());
+
+ assertThat(rescaleTriggeredCount)
+ .as(
+ "No additional rescaling should have been trigger by any subsequent failed checkpoint, yet.")
+ .hasValue(1);
+
+ testInstance.onFailedCheckpoint();
+
+ assertThat(rescaleTriggeredCount)
+ .as("The previous failed checkpoint should have triggered the rescale.")
+ .hasValue(2);
+ }
+ }
+
@Test
void testDisposalOfOperatorCoordinatorsOnLeaveOfStateWithExecutionGraph() throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
@@ -490,8 +606,9 @@ private final class ExecutingStateBuilder {
TestingDefaultExecutionGraphBuilder.newBuilder()
.build(EXECUTOR_EXTENSION.getExecutor());
private OperatorCoordinatorHandler operatorCoordinatorHandler;
- private TestingRescaleManager.Factory rescaleManagerFactory =
+ private RescaleManager.Factory rescaleManagerFactory =
TestingRescaleManager.Factory.noOpFactory();
+ private int rescaleOnFailedCheckpointCount = 1;
private ExecutingStateBuilder() throws JobException, JobExecutionException {
operatorCoordinatorHandler = new TestingOperatorCoordinatorHandler();
@@ -509,11 +626,17 @@ public ExecutingStateBuilder setOperatorCoordinatorHandler(
}
public ExecutingStateBuilder setRescaleManagerFactory(
- TestingRescaleManager.Factory rescaleManagerFactory) {
+ RescaleManager.Factory rescaleManagerFactory) {
this.rescaleManagerFactory = rescaleManagerFactory;
return this;
}
+ public ExecutingStateBuilder setRescaleOnFailedCheckpointCount(
+ int rescaleOnFailedCheckpointCount) {
+ this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
+ return this;
+ }
+
private Executing build(MockExecutingContext ctx) {
executionGraph.transitionToRunning();
@@ -528,6 +651,7 @@ private Executing build(MockExecutingContext ctx) {
new ArrayList<>(),
rescaleManagerFactory,
1,
+ rescaleOnFailedCheckpointCount,
// will be ignored by the TestingRescaleManager.Factory
Instant.now());
} finally {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
new file mode 100644
index 00000000000000..60c2b19cca1056
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Iterator;
+
+import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForAvailableSlots;
+import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForRunningTasks;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(TestLoggerExtension.class)
+class RescaleOnCheckpointITCase {
+
+ // Scaling down is used here because scaling up is not supported by the NumberSequenceSource
+ // that's used in this test.
+ private static final int NUMBER_OF_SLOTS = 4;
+ private static final int BEFORE_RESCALE_PARALLELISM = NUMBER_OF_SLOTS;
+ private static final int AFTER_RESCALE_PARALLELISM = NUMBER_OF_SLOTS - 1;
+
+ // This timeout is used to wait for any possible rescale after the JobResourceRequirements
+ // update (which shouldn't happen). A longer gap makes the test more reliable (it's hard to test
+ // that something didn't happen) but also increases the runtime of the test.
+ private static final Duration REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP = Duration.ofSeconds(2);
+
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(createConfiguration())
+ .setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS)
+ .build());
+
+ private static Configuration createConfiguration() {
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+
+ // speed the test suite up
+ // - lower refresh interval -> controls how fast we invalidate ExecutionGraphCache
+ // - lower slot idle timeout -> controls how fast we return idle slots to TM
+ configuration.set(WebOptions.REFRESH_INTERVAL, Duration.ofMillis(50L));
+ configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, Duration.ofMillis(50L));
+
+ // no checkpoints shall be triggered by Flink itself
+ configuration.set(
+ CheckpointingOptions.CHECKPOINTING_INTERVAL, TestingUtils.infiniteDuration());
+
+ // the rescale should not be triggered by the maximum-delay timeout (only by the checkpoint)
+ configuration.set(
+ JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER, TestingUtils.infiniteDuration());
+
+ // no cooldown to avoid delaying the test even more
+ configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ZERO);
+
+ return configuration;
+ }
+
+ @Test
+ void testRescaleOnCheckpoint(
+ @InjectMiniCluster MiniCluster miniCluster,
+ @InjectClusterClient RestClusterClient<?> restClusterClient)
+ throws Exception {
+ final Configuration config = new Configuration();
+
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.setParallelism(BEFORE_RESCALE_PARALLELISM);
+ env.fromSequence(0, Integer.MAX_VALUE).sinkTo(new DiscardingSink<>());
+
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ final Iterator<JobVertex> jobVertexIterator = jobGraph.getVertices().iterator();
+ assertThat(jobVertexIterator.hasNext())
+ .as("There needs to be at least one JobVertex.")
+ .isTrue();
+ final JobResourceRequirements jobResourceRequirements =
+ JobResourceRequirements.newBuilder()
+ .setParallelismForJobVertex(
+ jobVertexIterator.next().getID(), 1, AFTER_RESCALE_PARALLELISM)
+ .build();
+ assertThat(jobVertexIterator.hasNext())
+ .as("This test expects to have only one JobVertex.")
+ .isFalse();
+
+ restClusterClient.submitJob(jobGraph).join();
+ try {
+ final JobID jobId = jobGraph.getJobID();
+
+ waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);
+
+ restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join();
+
+ // timeout to allow any unexpected rescaling to happen anyway
+ Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis());
+
+ // verify that the previous timeout didn't result in a change of parallelism
+ waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);
+
+ miniCluster.triggerCheckpoint(jobId);
+
+ waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM);
+
+ waitForAvailableSlots(restClusterClient, NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM);
+ } finally {
+ restClusterClient.cancel(jobGraph.getJobID()).join();
+ }
+ }
+}