diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index c53e227ec1ee0d..1d971fb93225f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -559,12 +559,6 @@ public CheckpointCoordinator getCheckpointCoordinator() { return checkpointCoordinator; } - @Nullable - @Override - public CheckpointStatsTracker getCheckpointStatsTracker() { - return checkpointStatsTracker; - } - @Override public KvStateLocationRegistry getKvStateLocationRegistry() { return kvStateLocationRegistry; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 2194b6d59ebd0e..2f39f939fffbaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -96,9 +96,6 @@ void enableCheckpointing( @Nullable CheckpointCoordinator getCheckpointCoordinator(); - @Nullable - CheckpointStatsTracker getCheckpointStatsTracker(); - KvStateLocationRegistry getKvStateLocationRegistry(); void setJsonPlan(String jsonPlan); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java index 15e90578fab1d0..4d03468272be5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; @@ -81,23 +82,20 @@ public void reportCheckpointMetrics( public void reportInitializationMetrics( ExecutionAttemptID executionAttemptId, SubTaskInitializationMetrics initializationMetrics) { - if (executionGraph.getCheckpointStatsTracker() == null) { + final CheckpointCoordinatorConfiguration checkpointConfig = + executionGraph.getCheckpointCoordinatorConfiguration(); + if (checkpointConfig == null || !checkpointConfig.isCheckpointingEnabled()) { // TODO: Consider to support reporting initialization stats without checkpointing log.debug( "Ignoring reportInitializationMetrics if checkpoint coordinator is not present"); return; } - ioExecutor.execute( - () -> { - try { - executionGraph - .getCheckpointStatsTracker() - .reportInitializationMetrics( - executionAttemptId, initializationMetrics); - } catch (Exception t) { - log.warn("Error while reportInitializationMetrics", t); - } - }); + + processCheckpointCoordinatorMessage( + "ReportInitializationMetrics", + coordinator -> + coordinator.reportInitializationMetrics( + executionAttemptId, initializationMetrics)); } public void acknowledgeCheckpoint( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java index f035fb9d004a44..9a690e9c64a350 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java @@ -294,12 +294,6 @@ public CheckpointCoordinator getCheckpointCoordinator() { throw new UnsupportedOperationException(); } - @Nullable - @Override - public CheckpointStatsTracker getCheckpointStatsTracker() { - throw new UnsupportedOperationException(); - } - @Override public KvStateLocationRegistry getKvStateLocationRegistry() { throw new UnsupportedOperationException();