From ae1dcc4416099a4e129da8bd6bd7b26d286dbd6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=90=9B?= Date: Thu, 28 Sep 2023 16:34:03 +0800 Subject: [PATCH] register commit metrics to flink metric group --- .../paimon/metrics/commit/CommitMetrics.java | 2 +- .../paimon/table/sink/TableCommitImpl.java | 4 ++ .../apache/paimon/flink/sink/Committer.java | 4 +- .../paimon/flink/sink/CommitterMetrics.java | 48 +++++++++++++++++-- .../paimon/flink/sink/CommitterOperator.java | 2 +- .../paimon/flink/sink/FlinkWriteSink.java | 22 ++++++--- .../paimon/flink/sink/StoreCommitter.java | 6 +++ .../flink/sink/StoreMultiCommitter.java | 13 ++++- .../flink/sink/CommitterOperatorTest.java | 34 +++++++------ 9 files changed, 106 insertions(+), 29 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java index ed9d85796cab..de086a04bac0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java @@ -28,7 +28,7 @@ /** Metrics to measure a commit. */ public class CommitMetrics { private static final int HISTOGRAM_WINDOW_SIZE = 10_000; - protected static final String GROUP_NAME = "commit"; + public static final String GROUP_NAME = "commit"; private final AbstractMetricGroup genericMetricGroup; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 40cbf5f5fbbf..6d08c1264b77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -255,6 +255,10 @@ private void expire(long partitionExpireIdentifier) { } } + public FileStoreCommit getStoreCommit() { + return commit; + } + @Override public void close() throws Exception { for (CommitCallback commitCallback : commitCallbacks) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java index cb3ed0b20784..c80fc39e0999 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.sink; -import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import java.io.IOException; import java.io.Serializable; @@ -54,6 +54,6 @@ GlobalCommitT combine(long checkpointId, long watermark, List committab interface Factory extends Serializable { Committer create( - String commitUser, OperatorIOMetricGroup metricGroup); + String commitUser, OperatorMetricGroup metricGroup); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java index 51830c8a8bc2..74f4ccff25bf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java @@ -19,13 +19,22 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.metrics.Metric; +import org.apache.paimon.metrics.commit.CommitMetrics; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.MetricNames; +import javax.annotation.Nullable; + +import java.util.Map; + /** Flink metrics for {@link Committer}. */ public class CommitterMetrics { @@ -33,18 +42,49 @@ public class CommitterMetrics { private final Counter numBytesOutCounter; private final Counter numRecordsOutCounter; + private final OperatorMetricGroup metricGroup; + + public CommitterMetrics(OperatorMetricGroup metricGroup) { + this(metricGroup, null); + } - public CommitterMetrics(OperatorIOMetricGroup metricGroup) { - MetricGroup sinkMetricGroup = metricGroup.addGroup(SINK_METRIC_GROUP); + public CommitterMetrics(OperatorMetricGroup metricGroup, @Nullable CommitMetrics commitMetrics) { + this.metricGroup = metricGroup; + OperatorIOMetricGroup operatorIOMetricGroup = metricGroup.getIOMetricGroup(); + MetricGroup sinkMetricGroup = operatorIOMetricGroup.addGroup(SINK_METRIC_GROUP); - numBytesOutCounter = metricGroup.getNumBytesOutCounter(); + numBytesOutCounter = operatorIOMetricGroup.getNumBytesOutCounter(); sinkMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT, numBytesOutCounter); sinkMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOutCounter)); - numRecordsOutCounter = metricGroup.getNumRecordsOutCounter(); + numRecordsOutCounter = operatorIOMetricGroup.getNumRecordsOutCounter(); sinkMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT, numRecordsOutCounter); sinkMetricGroup.meter( MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOutCounter)); + + // Paimon Commit Metrics + registerCommitMetrics(commitMetrics); + } + + public void registerCommitMetrics(CommitMetrics commitMetrics) { + if (commitMetrics != null) { + MetricGroup commitMetricGroup = metricGroup.addGroup("commit"); + for (Map.Entry metric : commitMetrics.getMetricGroup().getMetrics().entrySet()) { + switch (metric.getValue().getMetricType()) { + case COUNTER: + commitMetricGroup.counter(metric.getKey(), (Counter) metric.getValue()); + break; + case GAUGE: + commitMetricGroup.gauge(metric.getKey(), (Gauge) metric.getValue()); + break; + case HISTOGRAM: + commitMetricGroup.histogram(metric.getKey(), (Histogram) metric.getValue()); + break; + default: + throw new UnsupportedOperationException("Custom metric types are not supported."); + } + } + } } public void increaseNumBytesOut(long numBytesOut) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java index a698d5e9ed6e..b27bb3bb9100 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java @@ -105,7 +105,7 @@ public void initializeState(StateInitializationContext context) throws Exception StateUtils.getSingleValueFromState( context, "commit_user_state", String.class, initialCommitUser); // parallelism of commit operator is always 1, so commitUser will never be null - committer = committerFactory.create(commitUser, getMetricGroup().getIOMetricGroup()); + committer = committerFactory.create(commitUser, getMetricGroup()); committableStateManager.initializeState(context, committer); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java index 20d491fa0bb7..7c57bf3c8a49 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java @@ -21,7 +21,11 @@ import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestCommittableSerializer; +import org.apache.paimon.metrics.commit.CommitMetrics; +import org.apache.paimon.operation.FileStoreCommit; +import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.TableCommitImpl; import javax.annotation.Nullable; @@ -46,12 +50,18 @@ protected Committer.Factory createCommitterFac // commit new files list even if they're empty. // Otherwise we can't tell if the commit is successful after // a restart. - return (user, metricGroup) -> - new StoreCommitter( - table.newCommit(user) - .withOverwrite(overwritePartition) - .ignoreEmptyCommit(!streamingCheckpointEnabled), - new CommitterMetrics(metricGroup)); + + return (user, metricGroup) -> { + TableCommitImpl tableCommit = table.newCommit(user) + .withOverwrite(overwritePartition) + .ignoreEmptyCommit(!streamingCheckpointEnabled); + FileStoreCommit storeCommit = tableCommit.getStoreCommit(); + CommitMetrics commitMetrics = null; + if (storeCommit instanceof FileStoreCommitImpl) { + commitMetrics = ((FileStoreCommitImpl) storeCommit).getCommitMetrics(); + } + return new StoreCommitter(tableCommit, new CommitterMetrics(metricGroup, commitMetrics)); + }; } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index fe0b35a0a1e3..785cd66a8ed4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -38,6 +38,7 @@ public class StoreCommitter implements Committer { private final TableCommitImpl commit; + @Nullable private final CommitterMetrics metrics; public StoreCommitter(TableCommit commit, @Nullable CommitterMetrics metrics) { @@ -122,4 +123,9 @@ private static long calcTotalFileSize(List files) { private static long calcTotalFileRowCount(List files) { return files.stream().mapToLong(DataFileMeta::rowCount).reduce(Long::sum).orElse(0); } + + @Nullable + public CommitterMetrics getMetrics() { + return metrics; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index 6559e9969dd1..e08a393f949a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -23,8 +23,12 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.WrappedManifestCommittable; +import org.apache.paimon.metrics.commit.CommitMetrics; +import org.apache.paimon.operation.FileStoreCommit; +import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.flink.api.java.tuple.Tuple2; @@ -173,9 +177,16 @@ private StoreCommitter getStoreCommitter(Identifier tableId) { "Failed to get committer for table %s", tableId.getFullName()), e); } + TableCommitImpl tableCommit = table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob); + FileStoreCommit storeCommit = tableCommit.getStoreCommit(); + CommitMetrics commitMetrics = null; + if (storeCommit instanceof FileStoreCommitImpl) { + commitMetrics = ((FileStoreCommitImpl) storeCommit).getCommitMetrics(); + } + metrics.registerCommitMetrics(commitMetrics); committer = new StoreCommitter( - table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob), metrics); + tableCommit, metrics); tableCommitters.put(tableId, committer); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java index 447d7972d8da..41c1006c7e6f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java @@ -23,11 +23,14 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestCommittableSerializer; +import org.apache.paimon.metrics.commit.CommitMetrics; +import org.apache.paimon.operation.FileStoreCommit; +import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.ThrowingConsumer; @@ -37,6 +40,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.StateInitializationContext; @@ -333,11 +337,8 @@ public void testCalcDataBytesSend() throws Exception { for (CommitMessage commitMessage : committable) { manifestCommittable.addFileCommittable(commitMessage); } - - StreamTableCommit commit = table.newCommit(initialCommitUser); - CommitterMetrics metrics = - new CommitterMetrics(UnregisteredMetricsGroup.createOperatorIOMetricGroup()); - StoreCommitter committer = new StoreCommitter(commit, metrics); + StoreCommitter committer = createStoreCommitter(table, initialCommitUser, UnregisteredMetricsGroup.createOperatorMetricGroup()); + CommitterMetrics metrics = committer.getMetrics(); committer.commit(Collections.singletonList(manifestCommittable)); assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(275); assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2); @@ -390,10 +391,7 @@ protected OneInputStreamOperator createCommitterOperat return new CommitterOperator<>( true, commitUser == null ? initialCommitUser : commitUser, - (user, metricGroup) -> - new StoreCommitter( - table.newStreamWriteBuilder().withCommitUser(user).newCommit(), - new CommitterMetrics(metricGroup)), + (user, metricGroup) -> createStoreCommitter(table, user, metricGroup), committableStateManager); } @@ -405,10 +403,7 @@ protected OneInputStreamOperator createCommitterOperat return new CommitterOperator( true, commitUser == null ? initialCommitUser : commitUser, - (user, metricGroup) -> - new StoreCommitter( - table.newStreamWriteBuilder().withCommitUser(user).newCommit(), - new CommitterMetrics(metricGroup)), + (user, metricGroup) -> createStoreCommitter(table, user, metricGroup), committableStateManager) { @Override public void initializeState(StateInitializationContext context) throws Exception { @@ -416,4 +411,15 @@ public void initializeState(StateInitializationContext context) throws Exception } }; } + + private StoreCommitter createStoreCommitter(FileStoreTable table, String user, OperatorMetricGroup metricGroup) { + TableCommitImpl tableCommit = table.newCommit(user); + FileStoreCommit storeCommit = tableCommit.getStoreCommit(); + CommitMetrics commitMetrics = null; + if (storeCommit instanceof FileStoreCommitImpl) { + commitMetrics = ((FileStoreCommitImpl) storeCommit).getCommitMetrics(); + } + CommitterMetrics metrics = new CommitterMetrics(metricGroup, commitMetrics); + return new StoreCommitter(tableCommit, metrics); + } }