Skip to content

Commit

Permalink
Extracts CheckpointStatsTracker interface and introduces DefaultCheck…
Browse files Browse the repository at this point in the history
…pointStatsTracker
  • Loading branch information
XComp committed Jun 25, 2024
1 parent f52650d commit 3db714d
Show file tree
Hide file tree
Showing 19 changed files with 819 additions and 626 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
 * {@link CheckpointStatsTracker} implementation that does not collect any statistics.
 *
 * <p>All reporting callbacks have empty bodies and {@link #createSnapshot()} always returns
 * {@link CheckpointStatsSnapshot#empty()}. The two methods that must hand out a {@link
 * PendingCheckpointStats} create a new instance on every call; this tracker keeps no reference
 * to it.
 */
public enum NoOpCheckpointStatsTracker implements CheckpointStatsTracker {
    /** The only instance of this tracker; the enum declares no fields. */
    INSTANCE;

    // ------------------------------------------------------------------------
    //  Methods that have to return a value
    // ------------------------------------------------------------------------

    /**
     * Builds a new {@link PendingCheckpointStats} from the given arguments without recording
     * anything.
     *
     * @param checkpointId ID of the pending checkpoint
     * @param triggerTimestamp trigger timestamp passed on to the created stats object
     * @param props properties passed on to the created stats object
     * @param vertexToDop per-vertex mapping passed on to the created stats object
     * @return a newly created stats object
     */
    @Override
    public PendingCheckpointStats reportPendingCheckpoint(
            long checkpointId,
            long triggerTimestamp,
            CheckpointProperties props,
            Map<JobVertexID, Integer> vertexToDop) {
        return new PendingCheckpointStats(checkpointId, triggerTimestamp, props, vertexToDop);
    }

    /**
     * Creates a placeholder {@link PendingCheckpointStats} for the given checkpoint ID.
     *
     * <p>Nothing is looked up: the returned object uses the current wall-clock time as trigger
     * timestamp, checkpoint properties with {@link
     * CheckpointRetentionPolicy#NEVER_RETAIN_AFTER_TERMINATION} and an empty vertex map.
     *
     * @param checkpointId ID of the checkpoint
     * @return a newly created stats object
     */
    @Override
    public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) {
        // Evaluated in the same order as the arguments are consumed below.
        final long now = System.currentTimeMillis();
        final CheckpointProperties neverRetained =
                CheckpointProperties.forCheckpoint(
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        final Map<JobVertexID, Integer> noVertices = Collections.emptyMap();

        return new PendingCheckpointStats(checkpointId, now, neverRetained, noVertices);
    }

    /**
     * Returns an empty snapshot.
     *
     * @return the result of {@link CheckpointStatsSnapshot#empty()}
     */
    @Override
    public CheckpointStatsSnapshot createSnapshot() {
        return CheckpointStatsSnapshot.empty();
    }

    // ------------------------------------------------------------------------
    //  Reporting callbacks: intentionally empty
    // ------------------------------------------------------------------------

    /** Does nothing. */
    @Override
    public void reportRestoredCheckpoint(
            long checkpointID,
            CheckpointProperties properties,
            String externalPath,
            long stateSize) {
        // no-op
    }

    /** Does nothing. */
    @Override
    public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
        // no-op
    }

    /** Does nothing. */
    @Override
    public void reportFailedCheckpoint(FailedCheckpointStats failed) {
        // no-op
    }

    /** Does nothing. */
    @Override
    public void reportFailedCheckpointsWithoutInProgress() {
        // no-op
    }

    /** Does nothing. */
    @Override
    public void reportIncompleteStats(
            long checkpointId, ExecutionAttemptID attemptId, CheckpointMetrics metrics) {
        // no-op
    }

    /** Does nothing. */
    @Override
    public void reportInitializationStarted(
            Set<ExecutionAttemptID> toInitialize, long initializationStartTs) {
        // no-op
    }

    /** Does nothing. */
    @Override
    public void reportInitializationMetrics(
            ExecutionAttemptID executionAttemptId,
            SubTaskInitializationMetrics initializationMetrics) {
        // no-op
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.CachingSupplier;

import org.slf4j.Logger;

Expand Down Expand Up @@ -92,7 +91,7 @@ public static DefaultExecutionGraph buildGraph(
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore,
VertexParallelismStore vertexParallelismStore,
CachingSupplier<CheckpointStatsTracker> checkpointStatsTrackerCachingSupplier,
CheckpointStatsTracker checkpointStatsTracker,
boolean isDynamicGraph,
ExecutionJobVertex.Factory executionJobVertexFactory,
MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
Expand Down Expand Up @@ -342,7 +341,7 @@ public static DefaultExecutionGraph buildGraph(
completedCheckpointStore,
rootBackend,
rootStorage,
checkpointStatsTrackerCachingSupplier.get(),
checkpointStatsTracker,
checkpointsCleaner,
jobManagerConfig.get(STATE_CHANGE_LOG_STORAGE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.function.CachingSupplier;

import org.slf4j.Logger;

Expand Down Expand Up @@ -131,7 +130,7 @@ public ExecutionGraph createAndRestoreExecutionGraph(
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
CachingSupplier<CheckpointStatsTracker> checkpointStatsTrackerCachingSupplier,
CheckpointStatsTracker checkpointStatsTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore,
Expand Down Expand Up @@ -171,7 +170,7 @@ public ExecutionGraph createAndRestoreExecutionGraph(
initializationTimestamp,
vertexAttemptNumberStore,
vertexParallelismStore,
checkpointStatsTrackerCachingSupplier,
checkpointStatsTracker,
isDynamicGraph,
executionJobVertexFactory,
markPartitionFinishedStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.function.CachingSupplier;

import org.slf4j.Logger;

Expand All @@ -42,10 +41,8 @@ public interface ExecutionGraphFactory {
* @param completedCheckpointStore completedCheckpointStore to pass to the CheckpointCoordinator
* @param checkpointsCleaner checkpointsCleaner to pass to the CheckpointCoordinator
* @param checkpointIdCounter checkpointIdCounter to pass to the CheckpointCoordinator
* @param checkpointStatsTrackerCachingSupplier The {@link CachingSupplier} that is used to provide
* the {@link CheckpointStatsTracker}. {@code CachingSupplier} is used here to allow for
* lazy instantiation. This is required to avoid the side effects that appear during the
* creation of a {@code CheckpointStatsTracker} if checkpointing is disabled.
* @param checkpointStatsTracker The {@link CheckpointStatsTracker} that's used for collecting
* the checkpoint-related statistics.
* @param partitionLocationConstraint partitionLocationConstraint for this job
* @param initializationTimestamp initializationTimestamp when the ExecutionGraph was created
* @param vertexAttemptNumberStore vertexAttemptNumberStore keeping information about the vertex
Expand All @@ -63,7 +60,7 @@ ExecutionGraph createAndRestoreExecutionGraph(
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
CachingSupplier<CheckpointStatsTracker> checkpointStatsTrackerCachingSupplier,
CheckpointStatsTracker checkpointStatsTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand Down Expand Up @@ -105,7 +106,6 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CachingSupplier;

import org.slf4j.Logger;

Expand Down Expand Up @@ -223,12 +223,20 @@ public SchedulerBase(
this.deploymentStateTimeMetrics =
new DeploymentStateTimeMetrics(jobGraph.getJobType(), jobStatusMetricsSettings);

final CheckpointStatsTracker checkpointStatsTracker =
SchedulerUtils.createCheckpointStatsTrackerIfCheckpointingIsEnabled(
jobGraph.getCheckpointingSettings(),
() ->
new DefaultCheckpointStatsTracker(
jobMasterConfiguration.get(
WebOptions.CHECKPOINTS_HISTORY_SIZE),
jobManagerJobMetricGroup));
this.executionGraph =
createAndRestoreExecutionGraph(
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
jobMasterConfiguration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
checkpointStatsTracker,
initializationTimestamp,
mainThreadExecutor,
jobStatusListener,
Expand Down Expand Up @@ -375,7 +383,7 @@ private ExecutionGraph createAndRestoreExecutionGraph(
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
int checkpointsHistorySize,
CheckpointStatsTracker checkpointStatsTracker,
long initializationTimestamp,
ComponentMainThreadExecutor mainThreadExecutor,
JobStatusListener jobStatusListener,
Expand All @@ -388,10 +396,7 @@ private ExecutionGraph createAndRestoreExecutionGraph(
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
new CachingSupplier<>(
() ->
new CheckpointStatsTracker(
checkpointsHistorySize, jobManagerJobMetricGroup)),
checkpointStatsTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
jobGraph.getJobType()),
initializationTimestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.state.SharedStateRegistry;

import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Utils class for Flink's scheduler implementations. */
public final class SchedulerUtils {
Expand Down Expand Up @@ -107,6 +113,19 @@ public static CheckpointIDCounter createCheckpointIDCounterIfCheckpointingIsEnab
}
}

/**
 * Selects the {@link CheckpointStatsTracker} to use for a job.
 *
 * <p>The factory is only invoked when checkpointing is enabled, so a tracker is not
 * instantiated otherwise.
 *
 * @param checkpointingSettings the job's checkpointing settings; may be {@code null}, which is
 *     treated like disabled checkpointing
 * @param checkpointStatsTrackerFactory supplies the tracker when checkpointing is enabled
 * @return the tracker obtained from the factory if checkpointing is enabled, otherwise {@link
 *     NoOpCheckpointStatsTracker#INSTANCE}
 */
public static CheckpointStatsTracker createCheckpointStatsTrackerIfCheckpointingIsEnabled(
        @Nullable JobCheckpointingSettings checkpointingSettings,
        Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory) {
    // Short-circuit keeps the null check ahead of the configuration lookup.
    final boolean checkpointingEnabled =
            checkpointingSettings != null
                    && checkpointingSettings
                            .getCheckpointCoordinatorConfiguration()
                            .isCheckpointingEnabled();

    if (!checkpointingEnabled) {
        return NoOpCheckpointStatsTracker.INSTANCE;
    }
    return checkpointStatsTrackerFactory.get();
}

private static CheckpointIDCounter createCheckpointIdCounter(
CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception {
return recoveryFactory.createCheckpointIDCounter(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.client.JobExecutionException;
Expand Down Expand Up @@ -121,7 +122,6 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CachingSupplier;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

Expand Down Expand Up @@ -330,7 +330,7 @@ public Duration getMaximumDelayForTriggeringRescale() {
private final CheckpointsCleaner checkpointsCleaner;
private final CompletedCheckpointStore completedCheckpointStore;
private final CheckpointIDCounter checkpointIdCounter;
private final CachingSupplier<CheckpointStatsTracker> checkpointStatsTrackerCachingSupplier;
private final CheckpointStatsTracker checkpointStatsTracker;

private final CompletableFuture<JobStatus> jobTerminationFuture = new CompletableFuture<>();

Expand Down Expand Up @@ -422,10 +422,11 @@ public AdaptiveScheduler(
this.checkpointIdCounter =
SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
jobGraph, checkpointRecoveryFactory);
this.checkpointStatsTrackerCachingSupplier =
new CachingSupplier<>(
this.checkpointStatsTracker =
SchedulerUtils.createCheckpointStatsTrackerIfCheckpointingIsEnabled(
jobGraph.getCheckpointingSettings(),
() ->
new CheckpointStatsTracker(
new DefaultCheckpointStatsTracker(
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
jobManagerJobMetricGroup));

Expand Down Expand Up @@ -1302,7 +1303,7 @@ private ExecutionGraph createExecutionGraphAndRestoreState(
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
checkpointStatsTrackerCachingSupplier,
checkpointStatsTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
initializationTimestamp,
vertexAttemptNumberStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private void testStoringFailureHandling(Exception failure, int expectedCleanupCa
new FailingCompletedCheckpointStore(failure);

CheckpointStatsTracker statsTracker =
new CheckpointStatsTracker(
new DefaultCheckpointStatsTracker(
Integer.MAX_VALUE,
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
final AtomicInteger cleanupCallCount = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ private CheckpointCoordinator instantiateCheckpointCoordinator(
new ExecutionGraphCheckpointPlanCalculatorContext(graph),
graph.getVerticesTopologically(),
false),
new CheckpointStatsTracker(
new DefaultCheckpointStatsTracker(
1, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup()));
}

Expand Down
Loading

0 comments on commit 3db714d

Please sign in to comment.