Skip to content

Commit

Permalink
Aligns the check for checkpointing being enabled with how it's done f…
Browse files Browse the repository at this point in the history
…or the other Checkpoint-related components
  • Loading branch information
XComp committed Jun 26, 2024
1 parent 5d14fb9 commit 032ca46
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public SchedulerBase(

final CheckpointStatsTracker checkpointStatsTracker =
SchedulerUtils.createCheckpointStatsTrackerIfCheckpointingIsEnabled(
jobGraph.getCheckpointingSettings(),
jobGraph,
() ->
new DefaultCheckpointStatsTracker(
jobMasterConfiguration.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.state.SharedStateRegistry;

import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.util.concurrent.Executor;
import java.util.function.Supplier;

Expand Down Expand Up @@ -114,12 +111,9 @@ public static CheckpointIDCounter createCheckpointIDCounterIfCheckpointingIsEnab
}

public static CheckpointStatsTracker createCheckpointStatsTrackerIfCheckpointingIsEnabled(
@Nullable JobCheckpointingSettings checkpointingSettings,
JobGraph checkpointingSettings,
Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory) {
if (checkpointingSettings != null
&& checkpointingSettings
.getCheckpointCoordinatorConfiguration()
.isCheckpointingEnabled()) {
if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(checkpointingSettings)) {
return checkpointStatsTrackerFactory.get();
} else {
return NoOpCheckpointStatsTracker.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ public AdaptiveScheduler(
jobGraph, checkpointRecoveryFactory);
this.checkpointStatsTracker =
SchedulerUtils.createCheckpointStatsTrackerIfCheckpointingIsEnabled(
jobGraph.getCheckpointingSettings(),
jobGraph,
() ->
checkpointStatsTrackerFactory.apply(
jobManagerJobMetricGroup, createCheckpointStatsListener()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.apache.flink.runtime.scheduler.DefaultSchedulerTest;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
Expand Down Expand Up @@ -2228,6 +2229,7 @@ private AdaptiveScheduler createSchedulerThatReachesExecutingState(
.build(),
null))
.build();
SchedulerTestingUtils.enableCheckpointing(jobGraph);

// testing SlotPool instance that would allow for the scheduler to transition to Executing
// state
Expand Down

0 comments on commit 032ca46

Please sign in to comment.