Skip to content

Commit

Permalink
register commit metrics to flink metric group
Browse files Browse the repository at this point in the history
  • Loading branch information
schnappi17 committed Oct 7, 2023
1 parent f65af6f commit ae1dcc4
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/** Metrics to measure a commit. */
public class CommitMetrics {
private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
protected static final String GROUP_NAME = "commit";
public static final String GROUP_NAME = "commit";

private final AbstractMetricGroup genericMetricGroup;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ private void expire(long partitionExpireIdentifier) {
}
}

public FileStoreCommit getStoreCommit() {
return commit;
}

@Override
public void close() throws Exception {
for (CommitCallback commitCallback : commitCallbacks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.paimon.flink.sink;

import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -54,6 +54,6 @@ GlobalCommitT combine(long checkpointId, long watermark, List<CommitT> committab
interface Factory<CommitT, GlobalCommitT> extends Serializable {

Committer<CommitT, GlobalCommitT> create(
String commitUser, OperatorIOMetricGroup metricGroup);
String commitUser, OperatorMetricGroup metricGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,72 @@
package org.apache.paimon.flink.sink;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.commit.CommitMetrics;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;

import javax.annotation.Nullable;

import java.util.Map;

/** Flink metrics for {@link Committer}. */
public class CommitterMetrics {

private static final String SINK_METRIC_GROUP = "sink";

private final Counter numBytesOutCounter;
private final Counter numRecordsOutCounter;
private final OperatorMetricGroup metricGroup;

public CommitterMetrics(OperatorMetricGroup metricGroup) {
this(metricGroup, null);
}

public CommitterMetrics(OperatorIOMetricGroup metricGroup) {
MetricGroup sinkMetricGroup = metricGroup.addGroup(SINK_METRIC_GROUP);
public CommitterMetrics(OperatorMetricGroup metricGroup, @Nullable CommitMetrics commitMetrics) {
this.metricGroup = metricGroup;
OperatorIOMetricGroup operatorIOMetricGroup = metricGroup.getIOMetricGroup();
MetricGroup sinkMetricGroup = operatorIOMetricGroup.addGroup(SINK_METRIC_GROUP);

numBytesOutCounter = metricGroup.getNumBytesOutCounter();
numBytesOutCounter = operatorIOMetricGroup.getNumBytesOutCounter();
sinkMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT, numBytesOutCounter);
sinkMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOutCounter));

numRecordsOutCounter = metricGroup.getNumRecordsOutCounter();
numRecordsOutCounter = operatorIOMetricGroup.getNumRecordsOutCounter();
sinkMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT, numRecordsOutCounter);
sinkMetricGroup.meter(
MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOutCounter));

// Paimon Commit Metrics
registerCommitMetrics(commitMetrics);
}

public void registerCommitMetrics(CommitMetrics commitMetrics) {
if (commitMetrics != null) {
MetricGroup commitMetricGroup = metricGroup.addGroup("commit");
for (Map.Entry<String, Metric> metric : commitMetrics.getMetricGroup().getMetrics().entrySet()) {
switch (metric.getValue().getMetricType()) {
case COUNTER:
commitMetricGroup.counter(metric.getKey(), (Counter) metric.getValue());
break;
case GAUGE:
commitMetricGroup.gauge(metric.getKey(), (Gauge<?>) metric.getValue());
break;
case HISTOGRAM:
commitMetricGroup.histogram(metric.getKey(), (Histogram) metric.getValue());
break;
default:
throw new UnsupportedOperationException("Custom metric types are not supported.");
}
}
}
}

public void increaseNumBytesOut(long numBytesOut) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void initializeState(StateInitializationContext context) throws Exception
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class, initialCommitUser);
// parallelism of commit operator is always 1, so commitUser will never be null
committer = committerFactory.create(commitUser, getMetricGroup().getIOMetricGroup());
committer = committerFactory.create(commitUser, getMetricGroup());

committableStateManager.initializeState(context, committer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.TableCommitImpl;

import javax.annotation.Nullable;

Expand All @@ -46,12 +50,18 @@ protected Committer.Factory<Committable, ManifestCommittable> createCommitterFac
// commit new files list even if they're empty.
// Otherwise we can't tell if the commit is successful after
// a restart.
return (user, metricGroup) ->
new StoreCommitter(
table.newCommit(user)
.withOverwrite(overwritePartition)
.ignoreEmptyCommit(!streamingCheckpointEnabled),
new CommitterMetrics(metricGroup));

return (user, metricGroup) -> {
TableCommitImpl tableCommit = table.newCommit(user)
.withOverwrite(overwritePartition)
.ignoreEmptyCommit(!streamingCheckpointEnabled);
FileStoreCommit storeCommit = tableCommit.getStoreCommit();
CommitMetrics commitMetrics = null;
if (storeCommit instanceof FileStoreCommitImpl) {
commitMetrics = ((FileStoreCommitImpl) storeCommit).getCommitMetrics();
}
return new StoreCommitter(tableCommit, new CommitterMetrics(metricGroup, commitMetrics));
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
public class StoreCommitter implements Committer<Committable, ManifestCommittable> {

private final TableCommitImpl commit;

@Nullable private final CommitterMetrics metrics;

public StoreCommitter(TableCommit commit, @Nullable CommitterMetrics metrics) {
Expand Down Expand Up @@ -122,4 +123,9 @@ private static long calcTotalFileSize(List<DataFileMeta> files) {
private static long calcTotalFileRowCount(List<DataFileMeta> files) {
return files.stream().mapToLong(DataFileMeta::rowCount).reduce(Long::sum).orElse(0);
}

@Nullable
public CommitterMetrics getMetrics() {
return metrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommitImpl;

import org.apache.flink.api.java.tuple.Tuple2;

Expand Down Expand Up @@ -173,9 +177,16 @@ private StoreCommitter getStoreCommitter(Identifier tableId) {
"Failed to get committer for table %s", tableId.getFullName()),
e);
}
TableCommitImpl tableCommit = table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob);
FileStoreCommit storeCommit = tableCommit.getStoreCommit();
CommitMetrics commitMetrics = null;
if (storeCommit instanceof FileStoreCommitImpl) {
commitMetrics = ((FileStoreCommitImpl) storeCommit).getCommitMetrics();
}
metrics.registerCommitMetrics(commitMetrics);
committer =
new StoreCommitter(
table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob), metrics);
tableCommit, metrics);
tableCommitters.put(tableId, committer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.ThrowingConsumer;

Expand All @@ -37,6 +40,7 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
Expand Down Expand Up @@ -333,11 +337,8 @@ public void testCalcDataBytesSend() throws Exception {
for (CommitMessage commitMessage : committable) {
manifestCommittable.addFileCommittable(commitMessage);
}

StreamTableCommit commit = table.newCommit(initialCommitUser);
CommitterMetrics metrics =
new CommitterMetrics(UnregisteredMetricsGroup.createOperatorIOMetricGroup());
StoreCommitter committer = new StoreCommitter(commit, metrics);
StoreCommitter committer = createStoreCommitter(table, initialCommitUser, UnregisteredMetricsGroup.createOperatorMetricGroup());
CommitterMetrics metrics = committer.getMetrics();
committer.commit(Collections.singletonList(manifestCommittable));
assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(275);
assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
Expand Down Expand Up @@ -390,10 +391,7 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
return new CommitterOperator<>(
true,
commitUser == null ? initialCommitUser : commitUser,
(user, metricGroup) ->
new StoreCommitter(
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
new CommitterMetrics(metricGroup)),
(user, metricGroup) -> createStoreCommitter(table, user, metricGroup),
committableStateManager);
}

Expand All @@ -405,15 +403,23 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
return new CommitterOperator<Committable, ManifestCommittable>(
true,
commitUser == null ? initialCommitUser : commitUser,
(user, metricGroup) ->
new StoreCommitter(
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
new CommitterMetrics(metricGroup)),
(user, metricGroup) -> createStoreCommitter(table, user, metricGroup),
committableStateManager) {
@Override
public void initializeState(StateInitializationContext context) throws Exception {
initializeFunction.accept(context);
}
};
}

private StoreCommitter createStoreCommitter(FileStoreTable table, String user, OperatorMetricGroup metricGroup) {
TableCommitImpl tableCommit = table.newCommit(user);
FileStoreCommit storeCommit = tableCommit.getStoreCommit();
CommitMetrics commitMetrics = null;
if (storeCommit instanceof FileStoreCommitImpl) {
commitMetrics = ((FileStoreCommitImpl) storeCommit).getCommitMetrics();
}
CommitterMetrics metrics = new CommitterMetrics(metricGroup, commitMetrics);
return new StoreCommitter(tableCommit, metrics);
}
}

0 comments on commit ae1dcc4

Please sign in to comment.