diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index c4913a86c97e70..1e9103ee560e99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -225,7 +225,7 @@ public SchedulerBase( final CheckpointStatsTracker checkpointStatsTracker = SchedulerUtils.createCheckpointStatsTrackerIfCheckpointingIsEnabled( - jobGraph.getCheckpointingSettings(), + jobGraph, () -> new DefaultCheckpointStatsTracker( jobMasterConfiguration.get( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java index 564e9b0e660a6c..f3f812d3943c89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java @@ -33,13 +33,10 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.state.SharedStateRegistry; import org.slf4j.Logger; -import javax.annotation.Nullable; - import java.util.concurrent.Executor; import java.util.function.Supplier; @@ -114,12 +111,9 @@ public static CheckpointIDCounter createCheckpointIDCounterIfCheckpointingIsEnab } public static CheckpointStatsTracker createCheckpointStatsTrackerIfCheckpointingIsEnabled( - @Nullable JobCheckpointingSettings checkpointingSettings, + JobGraph checkpointingSettings, Supplier checkpointStatsTrackerFactory) { - if (checkpointingSettings != null - && checkpointingSettings - .getCheckpointCoordinatorConfiguration() - .isCheckpointingEnabled()) { + if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(checkpointingSettings)) { return checkpointStatsTrackerFactory.get(); } else { return NoOpCheckpointStatsTracker.INSTANCE; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index a98e90e92dc4e7..fb6344b31dfcf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -512,7 +512,7 @@ public AdaptiveScheduler( jobGraph, checkpointRecoveryFactory); this.checkpointStatsTracker = SchedulerUtils.createCheckpointStatsTrackerIfCheckpointingIsEnabled( - jobGraph.getCheckpointingSettings(), + jobGraph, () -> checkpointStatsTrackerFactory.apply( jobManagerJobMetricGroup, createCheckpointStatsListener())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 3c8a81f056315b..d7fba370f8c807 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -94,6 +94,7 @@ import org.apache.flink.runtime.scheduler.DefaultSchedulerTest; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; @@ -2228,6 +2229,7 @@ private AdaptiveScheduler createSchedulerThatReachesExecutingState( .build(), null)) .build(); + SchedulerTestingUtils.enableCheckpointing(jobGraph); // testing SlotPool instance that would allow for the scheduler to transition to Executing // state