Skip to content

Commit

Permalink
[FLINK-35552][runtime] Rework how CheckpointStatsTracker is constructed.
Browse files Browse the repository at this point in the history
The checkpoint tracker doesn't live in the DefaultExecutionGraphFactory anymore but is moved into the AdaptiveScheduler.
  • Loading branch information
XComp committed Jun 7, 2024
1 parent c3d425b commit fba2ac2
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -81,6 +80,7 @@ public static DefaultExecutionGraph buildGraph(
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
CheckpointStatsTracker checkpointStatsTracker,
Time rpcTimeout,
BlobWriter blobWriter,
Logger log,
Expand All @@ -92,7 +92,6 @@ public static DefaultExecutionGraph buildGraph(
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore,
VertexParallelismStore vertexParallelismStore,
Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory,
boolean isDynamicGraph,
ExecutionJobVertex.Factory executionJobVertexFactory,
MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
Expand Down Expand Up @@ -342,7 +341,7 @@ public static DefaultExecutionGraph buildGraph(
completedCheckpointStore,
rootBackend,
rootStorage,
checkpointStatsTrackerFactory.get(),
checkpointStatsTracker,
checkpointsCleaner,
jobManagerConfig.get(STATE_CHANGE_LOG_STORAGE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
Expand All @@ -42,14 +41,12 @@
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.function.CachingSupplier;

import org.slf4j.Logger;

import java.util.HashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -66,7 +63,6 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
private final BlobWriter blobWriter;
private final ShuffleMaster<?> shuffleMaster;
private final JobMasterPartitionTracker jobMasterPartitionTracker;
private final Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory;
private final boolean isDynamicGraph;
private final ExecutionJobVertex.Factory executionJobVertexFactory;

Expand Down Expand Up @@ -123,12 +119,6 @@ public DefaultExecutionGraphFactory(
this.blobWriter = blobWriter;
this.shuffleMaster = shuffleMaster;
this.jobMasterPartitionTracker = jobMasterPartitionTracker;
this.checkpointStatsTrackerFactory =
new CachingSupplier<>(
() ->
new CheckpointStatsTracker(
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
jobManagerJobMetricGroup));
this.isDynamicGraph = isDynamicGraph;
this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory);
this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown;
Expand All @@ -140,6 +130,7 @@ public ExecutionGraph createAndRestoreExecutionGraph(
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
CheckpointStatsTracker checkpointStatsTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore,
Expand Down Expand Up @@ -168,6 +159,7 @@ public ExecutionGraph createAndRestoreExecutionGraph(
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
checkpointStatsTracker,
rpcTimeout,
blobWriter,
log,
Expand All @@ -179,7 +171,6 @@ public ExecutionGraph createAndRestoreExecutionGraph(
initializationTimestamp,
vertexAttemptNumberStore,
vertexParallelismStore,
checkpointStatsTrackerFactory,
isDynamicGraph,
executionJobVertexFactory,
markPartitionFinishedStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
Expand All @@ -40,6 +41,7 @@ public interface ExecutionGraphFactory {
* @param completedCheckpointStore completedCheckpointStore to pass to the CheckpointCoordinator
* @param checkpointsCleaner checkpointsCleaner to pass to the CheckpointCoordinator
* @param checkpointIdCounter checkpointIdCounter to pass to the CheckpointCoordinator
* @param checkpointStatsTracker checkpointStatsTracker used for tracking checkpoints
* @param partitionLocationConstraint partitionLocationConstraint for this job
* @param initializationTimestamp initializationTimestamp when the ExecutionGraph was created
* @param vertexAttemptNumberStore vertexAttemptNumberStore keeping information about the vertex
Expand All @@ -57,6 +59,7 @@ ExecutionGraph createAndRestoreExecutionGraph(
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
CheckpointStatsTracker checkpointStatsTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
Expand Down Expand Up @@ -226,6 +227,9 @@ public SchedulerBase(
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
new CheckpointStatsTracker(
jobMasterConfiguration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
jobManagerJobMetricGroup),
initializationTimestamp,
mainThreadExecutor,
jobStatusListener,
Expand Down Expand Up @@ -372,6 +376,7 @@ private ExecutionGraph createAndRestoreExecutionGraph(
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
CheckpointStatsTracker checkpointStatsTracker,
long initializationTimestamp,
ComponentMainThreadExecutor mainThreadExecutor,
JobStatusListener jobStatusListener,
Expand All @@ -384,6 +389,7 @@ private ExecutionGraph createAndRestoreExecutionGraph(
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
checkpointStatsTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
jobGraph.getJobType()),
initializationTimestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
Expand Down Expand Up @@ -327,6 +328,7 @@ public Duration getMaximumDelayForTriggeringRescale() {
private final CheckpointsCleaner checkpointsCleaner;
private final CompletedCheckpointStore completedCheckpointStore;
private final CheckpointIDCounter checkpointIdCounter;
private final CheckpointStatsTracker checkpointStatsTracker;

private final CompletableFuture<JobStatus> jobTerminationFuture = new CompletableFuture<>();

Expand Down Expand Up @@ -416,6 +418,10 @@ public AdaptiveScheduler(
this.checkpointIdCounter =
SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
jobGraph, checkpointRecoveryFactory);
this.checkpointStatsTracker =
new CheckpointStatsTracker(
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
jobManagerJobMetricGroup);

this.slotAllocator = slotAllocator;

Expand Down Expand Up @@ -1289,6 +1295,7 @@ private ExecutionGraph createExecutionGraphAndRestoreState(
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
checkpointStatsTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
initializationTimestamp,
vertexAttemptNumberStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ private DefaultExecutionGraph build(
completedCheckpointStore,
new CheckpointsCleaner(),
checkpointIdCounter,
new CheckpointStatsTracker(
0, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup()),
rpcTimeout,
blobWriter,
LOG,
Expand All @@ -192,11 +194,6 @@ private DefaultExecutionGraph build(
new DefaultVertexAttemptNumberStore(),
Optional.ofNullable(vertexParallelismStore)
.orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)),
() ->
new CheckpointStatsTracker(
0,
UnregisteredMetricGroups
.createUnregisteredJobManagerJobMetricGroup()),
isDynamicGraph,
executionJobVertexFactory,
markPartitionFinishedStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
Expand All @@ -48,6 +50,7 @@
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.clock.SystemClock;

import org.junit.jupiter.api.BeforeEach;
Expand All @@ -64,7 +67,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -91,7 +96,10 @@ private void setup() {
void testRestoringModifiedJobFromSavepointFails() throws Exception {
final JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(false, 42L, 1);

final ExecutionGraphFactory executionGraphFactory = createExecutionGraphFactory();
final JobManagerJobMetricGroup metricGroup =
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
final ExecutionGraphFactory executionGraphFactory =
createExecutionGraphFactory(metricGroup);

assertThatThrownBy(
() ->
Expand All @@ -100,6 +108,7 @@ void testRestoringModifiedJobFromSavepointFails() throws Exception {
new StandaloneCompletedCheckpointStore(1),
new CheckpointsCleaner(),
new StandaloneCheckpointIDCounter(),
new CheckpointStatsTracker(10, metricGroup),
TaskDeploymentDescriptorFactory.PartitionLocationConstraint
.CAN_BE_UNKNOWN,
0L,
Expand All @@ -121,7 +130,10 @@ void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() th
final long savepointId = 42L;
final JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(true, savepointId, 1);

final ExecutionGraphFactory executionGraphFactory = createExecutionGraphFactory();
final JobManagerJobMetricGroup metricGroup =
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
final ExecutionGraphFactory executionGraphFactory =
createExecutionGraphFactory(metricGroup);

final StandaloneCompletedCheckpointStore completedCheckpointStore =
new StandaloneCompletedCheckpointStore(1);
Expand All @@ -130,6 +142,7 @@ void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() th
completedCheckpointStore,
new CheckpointsCleaner(),
new StandaloneCheckpointIDCounter(),
new CheckpointStatsTracker(10, metricGroup),
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN,
0L,
new DefaultVertexAttemptNumberStore(),
Expand All @@ -150,23 +163,27 @@ void testCheckpointStatsTrackerUpdatedWithNewParallelism() throws Exception {
final JobGraph jobGraphWithParallelism2 = createJobGraphWithSavepoint(true, savepointId, 2);

List<Span> spans = new ArrayList<>();
final JobManagerJobMetricGroup jobManagerJobMetricGroup =
new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
@Override
public void addSpan(SpanBuilder spanBuilder) {
spans.add(spanBuilder.build());
}
};
final ExecutionGraphFactory executionGraphFactory =
createExecutionGraphFactory(
new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
@Override
public void addSpan(SpanBuilder spanBuilder) {
spans.add(spanBuilder.build());
}
});
createExecutionGraphFactory(jobManagerJobMetricGroup);

final StandaloneCompletedCheckpointStore completedCheckpointStore =
new StandaloneCompletedCheckpointStore(1);
final CheckpointStatsTracker checkpointStatsTracker =
new CheckpointStatsTracker(10, jobManagerJobMetricGroup);
ExecutionGraph executionGraph =
executionGraphFactory.createAndRestoreExecutionGraph(
jobGraphWithParallelism2,
completedCheckpointStore,
new CheckpointsCleaner(),
new StandaloneCheckpointIDCounter(),
checkpointStatsTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN,
0L,
new DefaultVertexAttemptNumberStore(),
Expand All @@ -177,34 +194,27 @@ public void addSpan(SpanBuilder spanBuilder) {
rp -> false,
log);

CheckpointStatsTracker checkpointStatsTracker = executionGraph.getCheckpointStatsTracker();
assertThat(checkpointStatsTracker).isNotNull();

final ExecutionAttemptID randomAttemptId = ExecutionAttemptID.randomId();
checkpointStatsTracker.reportInitializationStarted(
Sets.newHashSet(randomAttemptId), SystemClock.getInstance().absoluteTimeMillis());

checkpointStatsTracker.reportRestoredCheckpoint(
savepointId,
CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE),
"foo",
1337);

final Set<ExecutionAttemptID> executionAttemptIDs =
IterableUtils.toStream(executionGraph.getAllExecutionVertices())
.map(ExecutionVertex::getCurrentExecutionAttempt)
.map(Execution::getAttemptId)
.collect(Collectors.toSet());
assertThat(executionAttemptIDs).hasSize(1);
checkpointStatsTracker.reportInitializationMetrics(
randomAttemptId,
executionAttemptIDs.iterator().next(),
new SubTaskInitializationMetricsBuilder(
SystemClock.getInstance().absoluteTimeMillis())
.build());

assertThat(spans).hasSize(1);
}

@Nonnull
private ExecutionGraphFactory createExecutionGraphFactory() {
return createExecutionGraphFactory(
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
}

@Nonnull
private ExecutionGraphFactory createExecutionGraphFactory(
JobManagerJobMetricGroup metricGroup) {
return new DefaultExecutionGraphFactory(
Expand Down

0 comments on commit fba2ac2

Please sign in to comment.