diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index 57be7f49abffc7..a4608f24b2c628 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -61,7 +61,6 @@ import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Supplier; import static org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -81,6 +80,7 @@ public static DefaultExecutionGraph buildGraph( CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIdCounter, + CheckpointStatsTracker checkpointStatsTracker, Time rpcTimeout, BlobWriter blobWriter, Logger log, @@ -92,7 +92,6 @@ public static DefaultExecutionGraph buildGraph( long initializationTimestamp, VertexAttemptNumberStore vertexAttemptNumberStore, VertexParallelismStore vertexParallelismStore, - Supplier checkpointStatsTrackerFactory, boolean isDynamicGraph, ExecutionJobVertex.Factory executionJobVertexFactory, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, @@ -342,7 +341,7 @@ public static DefaultExecutionGraph buildGraph( completedCheckpointStore, rootBackend, rootStorage, - checkpointStatsTrackerFactory.get(), + checkpointStatsTracker, checkpointsCleaner, jobManagerConfig.get(STATE_CHANGE_LOG_STORAGE)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index 59a88870384002..bbf116cb0ae620 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -42,14 +41,12 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleMaster; -import org.apache.flink.util.function.CachingSupplier; import org.slf4j.Logger; import java.util.HashSet; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Supplier; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -66,7 +63,6 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { private final BlobWriter blobWriter; private final ShuffleMaster shuffleMaster; private final JobMasterPartitionTracker jobMasterPartitionTracker; - private final Supplier checkpointStatsTrackerFactory; private final boolean isDynamicGraph; private final ExecutionJobVertex.Factory executionJobVertexFactory; @@ -123,12 +119,6 @@ public DefaultExecutionGraphFactory( this.blobWriter = blobWriter; this.shuffleMaster = shuffleMaster; this.jobMasterPartitionTracker = jobMasterPartitionTracker; - this.checkpointStatsTrackerFactory = - new CachingSupplier<>( - () -> - new CheckpointStatsTracker( - configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), - jobManagerJobMetricGroup)); this.isDynamicGraph = isDynamicGraph; this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory); this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown; @@ -140,6 +130,7 @@ public ExecutionGraph createAndRestoreExecutionGraph( CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIdCounter, + CheckpointStatsTracker checkpointStatsTracker, TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint, long initializationTimestamp, VertexAttemptNumberStore vertexAttemptNumberStore, @@ -168,6 +159,7 @@ public ExecutionGraph createAndRestoreExecutionGraph( completedCheckpointStore, checkpointsCleaner, checkpointIdCounter, + checkpointStatsTracker, rpcTimeout, blobWriter, log, @@ -179,7 +171,6 @@ public ExecutionGraph createAndRestoreExecutionGraph( initializationTimestamp, vertexAttemptNumberStore, vertexParallelismStore, - checkpointStatsTrackerFactory, isDynamicGraph, executionJobVertexFactory, markPartitionFinishedStrategy, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java index 59fbdfe82b202b..e926470bc65330 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; @@ -40,6 +41,7 @@ public interface ExecutionGraphFactory { * @param completedCheckpointStore completedCheckpointStore to pass to the CheckpointCoordinator * @param checkpointsCleaner checkpointsCleaner to pass to the CheckpointCoordinator * @param checkpointIdCounter checkpointIdCounter to pass to the CheckpointCoordinator + * @param checkpointStatsTracker checkpointStatsTracker used for tracking checkpoints * @param partitionLocationConstraint partitionLocationConstraint for this job * @param initializationTimestamp initializationTimestamp when the ExecutionGraph was created * @param vertexAttemptNumberStore vertexAttemptNumberStore keeping information about the vertex @@ -57,6 +59,7 @@ ExecutionGraph createAndRestoreExecutionGraph( CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIdCounter, + CheckpointStatsTracker checkpointStatsTracker, TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint, long initializationTimestamp, VertexAttemptNumberStore vertexAttemptNumberStore, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index cdd8e73798697d..6534e858ce78f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointScheduling; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; @@ -226,6 +227,9 @@ public SchedulerBase( completedCheckpointStore, checkpointsCleaner, checkpointIdCounter, + new CheckpointStatsTracker( + jobMasterConfiguration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), + jobManagerJobMetricGroup), initializationTimestamp, mainThreadExecutor, jobStatusListener, @@ -372,6 +376,7 @@ private ExecutionGraph createAndRestoreExecutionGraph( CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIdCounter, + CheckpointStatsTracker checkpointStatsTracker, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, @@ -384,6 +389,7 @@ private ExecutionGraph createAndRestoreExecutionGraph( completedCheckpointStore, checkpointsCleaner, checkpointIdCounter, + checkpointStatsTracker, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType( jobGraph.getJobType()), initializationTimestamp, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 9e4a26a9cbbcec..c2ea053d888417 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointScheduling; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; @@ -327,6 +328,7 @@ public Duration getMaximumDelayForTriggeringRescale() { private final CheckpointsCleaner checkpointsCleaner; private final CompletedCheckpointStore completedCheckpointStore; private final CheckpointIDCounter checkpointIdCounter; + private final CheckpointStatsTracker checkpointStatsTracker; private final CompletableFuture jobTerminationFuture = new CompletableFuture<>(); @@ -416,6 +418,10 @@ public AdaptiveScheduler( this.checkpointIdCounter = SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled( jobGraph, checkpointRecoveryFactory); + this.checkpointStatsTracker = + new CheckpointStatsTracker( + configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), + jobManagerJobMetricGroup); this.slotAllocator = slotAllocator; @@ -1289,6 +1295,7 @@ private ExecutionGraph createExecutionGraphAndRestoreState( completedCheckpointStore, checkpointsCleaner, checkpointIdCounter, + checkpointStatsTracker, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN, initializationTimestamp, vertexAttemptNumberStore, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index da33c96b1bf05d..f0c9ed75ee0612 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -179,6 +179,8 @@ private DefaultExecutionGraph build( completedCheckpointStore, new CheckpointsCleaner(), checkpointIdCounter, + new CheckpointStatsTracker( + 0, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup()), rpcTimeout, blobWriter, LOG, @@ -192,11 +194,6 @@ private DefaultExecutionGraph build( new DefaultVertexAttemptNumberStore(), Optional.ofNullable(vertexParallelismStore) .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)), - () -> - new CheckpointStatsTracker( - 0, - UnregisteredMetricGroups - .createUnregisteredJobManagerJobMetricGroup()), isDynamicGraph, executionJobVertexFactory, markPartitionFinishedStrategy, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java index 4e66f7f669ea25..62900fe2f73e9e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java @@ -31,8 +31,10 @@ import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore; +import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -48,6 +50,7 @@ import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.traces.Span; import org.apache.flink.traces.SpanBuilder; +import org.apache.flink.util.IterableUtils; import org.apache.flink.util.clock.SystemClock; import org.junit.jupiter.api.BeforeEach; @@ -64,7 +67,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -91,7 +96,10 @@ private void setup() { void testRestoringModifiedJobFromSavepointFails() throws Exception { final JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(false, 42L, 1); - final ExecutionGraphFactory executionGraphFactory = createExecutionGraphFactory(); + final JobManagerJobMetricGroup metricGroup = + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(); + final ExecutionGraphFactory executionGraphFactory = + createExecutionGraphFactory(metricGroup); assertThatThrownBy( () -> @@ -100,6 +108,7 @@ void testRestoringModifiedJobFromSavepointFails() throws Exception { new StandaloneCompletedCheckpointStore(1), new CheckpointsCleaner(), new StandaloneCheckpointIDCounter(), + new CheckpointStatsTracker(10, metricGroup), TaskDeploymentDescriptorFactory.PartitionLocationConstraint .CAN_BE_UNKNOWN, 0L, @@ -121,7 +130,10 @@ void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() th final long savepointId = 42L; final JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(true, savepointId, 1); - final ExecutionGraphFactory executionGraphFactory = createExecutionGraphFactory(); + final JobManagerJobMetricGroup metricGroup = + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(); + final ExecutionGraphFactory executionGraphFactory = + createExecutionGraphFactory(metricGroup); final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); @@ -130,6 +142,7 @@ void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() th completedCheckpointStore, new CheckpointsCleaner(), new StandaloneCheckpointIDCounter(), + new CheckpointStatsTracker(10, metricGroup), TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, new DefaultVertexAttemptNumberStore(), @@ -150,23 +163,27 @@ void testCheckpointStatsTrackerUpdatedWithNewParallelism() throws Exception { final JobGraph jobGraphWithParallelism2 = createJobGraphWithSavepoint(true, savepointId, 2); List spans = new ArrayList<>(); + final JobManagerJobMetricGroup jobManagerJobMetricGroup = + new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() { + @Override + public void addSpan(SpanBuilder spanBuilder) { + spans.add(spanBuilder.build()); + } + }; final ExecutionGraphFactory executionGraphFactory = - createExecutionGraphFactory( - new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() { - @Override - public void addSpan(SpanBuilder spanBuilder) { - spans.add(spanBuilder.build()); - } - }); + createExecutionGraphFactory(jobManagerJobMetricGroup); final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); + final CheckpointStatsTracker checkpointStatsTracker = + new CheckpointStatsTracker(10, jobManagerJobMetricGroup); ExecutionGraph executionGraph = executionGraphFactory.createAndRestoreExecutionGraph( jobGraphWithParallelism2, completedCheckpointStore, new CheckpointsCleaner(), new StandaloneCheckpointIDCounter(), + checkpointStatsTracker, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, new DefaultVertexAttemptNumberStore(), @@ -177,20 +194,20 @@ public void addSpan(SpanBuilder spanBuilder) { rp -> false, log); - CheckpointStatsTracker checkpointStatsTracker = executionGraph.getCheckpointStatsTracker(); - assertThat(checkpointStatsTracker).isNotNull(); - - final ExecutionAttemptID randomAttemptId = ExecutionAttemptID.randomId(); - checkpointStatsTracker.reportInitializationStarted( - Sets.newHashSet(randomAttemptId), SystemClock.getInstance().absoluteTimeMillis()); - checkpointStatsTracker.reportRestoredCheckpoint( savepointId, CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE), "foo", 1337); + + final Set executionAttemptIDs = + IterableUtils.toStream(executionGraph.getAllExecutionVertices()) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getAttemptId) + .collect(Collectors.toSet()); + assertThat(executionAttemptIDs).hasSize(1); checkpointStatsTracker.reportInitializationMetrics( - randomAttemptId, + executionAttemptIDs.iterator().next(), new SubTaskInitializationMetricsBuilder( SystemClock.getInstance().absoluteTimeMillis()) .build()); @@ -198,13 +215,6 @@ public void addSpan(SpanBuilder spanBuilder) { assertThat(spans).hasSize(1); } - @Nonnull - private ExecutionGraphFactory createExecutionGraphFactory() { - return createExecutionGraphFactory( - UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup()); - } - - @Nonnull private ExecutionGraphFactory createExecutionGraphFactory( JobManagerJobMetricGroup metricGroup) { return new DefaultExecutionGraphFactory(