From dbec0c3e606a1517dc868ee143c2b5e9fb796dd4 Mon Sep 17 00:00:00 2001 From: GuojunLi Date: Fri, 20 Oct 2023 11:15:21 +0800 Subject: [PATCH] [core] Support commit metrics (#1638) This closes #1638. --- .../paimon/metrics/AbstractMetricGroup.java | 10 + .../org/apache/paimon/metrics/Metrics.java | 7 + .../paimon/metrics/commit/CommitMetrics.java | 137 ++++++++ .../paimon/metrics/commit/CommitStats.java | 206 ++++++++++++ .../metrics/groups/GenericMetricGroup.java | 4 +- .../paimon/operation/FileStoreCommit.java | 4 + .../paimon/operation/FileStoreCommitImpl.java | 292 +++++++++++------- .../paimon/table/AbstractFileStoreTable.java | 3 +- .../paimon/table/sink/TableCommitImpl.java | 9 +- .../manifest/ManifestFileMetaTestBase.java | 23 ++ .../metrics/commit/CommitMetricsTest.java | 286 +++++++++++++++++ .../metrics/commit/CommitStatsTest.java | 154 +++++++++ 12 files changed, 1024 insertions(+), 111 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java index 78f293a50e05..41980ff3e2bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java @@ -190,4 +190,14 @@ public void close() { public final boolean isClosed() { return closed; } + + @Override + public String toString() { + return "MetricGroup{" + + "groupName=" + + getGroupName() + + ", metrics=" + + String.join(",", metrics.keySet()) + + '}'; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java index 6ef28749bb61..379f3d6c6db3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java @@ -19,6 +19,7 @@ package org.apache.paimon.metrics; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; /** Core of Paimon metrics system. */ public class Metrics { @@ -53,4 +54,10 @@ public void removeGroup(AbstractMetricGroup group) { public ConcurrentLinkedQueue getMetricGroups() { return metricGroups; } + + public static String groupsInfo() { + return getInstance().getMetricGroups().stream() + .map(Object::toString) + .collect(Collectors.joining(", ", "[", "]")); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java new file mode 100644 index 000000000000..a216a94e21a1 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics.commit; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.metrics.AbstractMetricGroup; +import org.apache.paimon.metrics.DescriptiveStatisticsHistogram; +import org.apache.paimon.metrics.Histogram; +import org.apache.paimon.metrics.groups.GenericMetricGroup; + +/** Metrics to measure a commit. */ +public class CommitMetrics { + private static final int HISTOGRAM_WINDOW_SIZE = 10_000; + protected static final String GROUP_NAME = "commit"; + + private final AbstractMetricGroup genericMetricGroup; + + public CommitMetrics(String tableName) { + this.genericMetricGroup = + GenericMetricGroup.createGenericMetricGroup(tableName, GROUP_NAME); + registerGenericCommitMetrics(); + } + + @VisibleForTesting + public AbstractMetricGroup getMetricGroup() { + return genericMetricGroup; + } + + private final Histogram durationHistogram = + new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE); + + private CommitStats latestCommit; + + @VisibleForTesting static final String LAST_COMMIT_DURATION = "lastCommitDuration"; + @VisibleForTesting static final String COMMIT_DURATION = "commitDuration"; + @VisibleForTesting static final String LAST_COMMIT_ATTEMPTS = "lastCommitAttempts"; + @VisibleForTesting static final String LAST_TABLE_FILES_ADDED = "lastTableFilesAdded"; + @VisibleForTesting static final String LAST_TABLE_FILES_DELETED = "lastTableFilesDeleted"; + @VisibleForTesting static final String LAST_TABLE_FILES_APPENDED = "lastTableFilesAppended"; + + @VisibleForTesting + static final String LAST_TABLE_FILES_COMMIT_COMPACTED = "lastTableFilesCommitCompacted"; + + @VisibleForTesting + static final String LAST_CHANGELOG_FILES_APPENDED = "lastChangelogFilesAppended"; + + @VisibleForTesting + static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED = "lastChangelogFileCommitCompacted"; + + @VisibleForTesting static final String LAST_GENERATED_SNAPSHOTS = "lastGeneratedSnapshots"; + @VisibleForTesting static final String LAST_DELTA_RECORDS_APPENDED = "lastDeltaRecordsAppended"; + + @VisibleForTesting + static final String LAST_CHANGELOG_RECORDS_APPENDED = "lastChangelogRecordsAppended"; + + @VisibleForTesting + static final String LAST_DELTA_RECORDS_COMMIT_COMPACTED = "lastDeltaRecordsCommitCompacted"; + + @VisibleForTesting + static final String LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED = + "lastChangelogRecordsCommitCompacted"; + + @VisibleForTesting static final String LAST_PARTITIONS_WRITTEN = "lastPartitionsWritten"; + @VisibleForTesting static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten"; + + private void registerGenericCommitMetrics() { + genericMetricGroup.gauge( + LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L : latestCommit.getDuration()); + genericMetricGroup.gauge( + LAST_COMMIT_ATTEMPTS, () -> latestCommit == null ? 0L : latestCommit.getAttempts()); + genericMetricGroup.gauge( + LAST_GENERATED_SNAPSHOTS, + () -> latestCommit == null ? 0L : latestCommit.getGeneratedSnapshots()); + genericMetricGroup.gauge( + LAST_PARTITIONS_WRITTEN, + () -> latestCommit == null ? 0L : latestCommit.getNumPartitionsWritten()); + genericMetricGroup.gauge( + LAST_BUCKETS_WRITTEN, + () -> latestCommit == null ? 0L : latestCommit.getNumBucketsWritten()); + genericMetricGroup.histogram(COMMIT_DURATION, durationHistogram); + genericMetricGroup.gauge( + LAST_TABLE_FILES_ADDED, + () -> latestCommit == null ? 0L : latestCommit.getTableFilesAdded()); + genericMetricGroup.gauge( + LAST_TABLE_FILES_DELETED, + () -> latestCommit == null ? 0L : latestCommit.getTableFilesDeleted()); + genericMetricGroup.gauge( + LAST_TABLE_FILES_APPENDED, + () -> latestCommit == null ? 0L : latestCommit.getTableFilesAppended()); + genericMetricGroup.gauge( + LAST_TABLE_FILES_COMMIT_COMPACTED, + () -> latestCommit == null ? 0L : latestCommit.getTableFilesCompacted()); + genericMetricGroup.gauge( + LAST_CHANGELOG_FILES_APPENDED, + () -> latestCommit == null ? 0L : latestCommit.getChangelogFilesAppended()); + genericMetricGroup.gauge( + LAST_CHANGELOG_FILES_COMMIT_COMPACTED, + () -> latestCommit == null ? 0L : latestCommit.getChangelogFilesCompacted()); + genericMetricGroup.gauge( + LAST_DELTA_RECORDS_APPENDED, + () -> latestCommit == null ? 0L : latestCommit.getDeltaRecordsAppended()); + genericMetricGroup.gauge( + LAST_CHANGELOG_RECORDS_APPENDED, + () -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsAppended()); + genericMetricGroup.gauge( + LAST_DELTA_RECORDS_COMMIT_COMPACTED, + () -> latestCommit == null ? 0L : latestCommit.getDeltaRecordsCompacted()); + genericMetricGroup.gauge( + LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED, + () -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsCompacted()); + } + + public void reportCommit(CommitStats commitStats) { + latestCommit = commitStats; + durationHistogram.update(commitStats.getDuration()); + } + + public void close() { + this.genericMetricGroup.close(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java new file mode 100644 index 000000000000..0a2772f633d2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics.commit; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** Statistics for a commit. */ +public class CommitStats { + private final long duration; + private final int attempts; + private final long tableFilesAdded; + private final long tableFilesAppended; + private final long tableFilesDeleted; + private final long changelogFilesAppended; + private final long changelogFilesCompacted; + private final long changelogRecordsCompacted; + + private final long deltaRecordsCompacted; + private final long changelogRecordsAppended; + private final long deltaRecordsAppended; + private final long tableFilesCompacted; + private final long generatedSnapshots; + private final long numPartitionsWritten; + private final long numBucketsWritten; + + public CommitStats( + List appendTableFiles, + List appendChangelogFiles, + List compactTableFiles, + List compactChangelogFiles, + long commitDuration, + int generatedSnapshots, + int attempts) { + List addedTableFiles = new ArrayList<>(appendTableFiles); + addedTableFiles.addAll( + compactTableFiles.stream() + .filter(f -> FileKind.ADD.equals(f.kind())) + .collect(Collectors.toList())); + List deletedTableFiles = + compactTableFiles.stream() + .filter(f -> FileKind.DELETE.equals(f.kind())) + .collect(Collectors.toList()); + + this.tableFilesAdded = addedTableFiles.size(); + this.tableFilesAppended = appendTableFiles.size(); + this.tableFilesDeleted = deletedTableFiles.size(); + this.tableFilesCompacted = compactTableFiles.size(); + this.changelogFilesAppended = appendChangelogFiles.size(); + this.changelogFilesCompacted = compactChangelogFiles.size(); + this.numPartitionsWritten = numChangedPartitions(appendTableFiles, compactTableFiles); + this.numBucketsWritten = numChangedBuckets(appendTableFiles, compactTableFiles); + this.changelogRecordsCompacted = getRowCounts(compactChangelogFiles); + this.deltaRecordsCompacted = getRowCounts(compactTableFiles); + this.changelogRecordsAppended = getRowCounts(appendChangelogFiles); + this.deltaRecordsAppended = getRowCounts(appendTableFiles); + this.duration = commitDuration; + this.generatedSnapshots = generatedSnapshots; + this.attempts = attempts; + } + + @VisibleForTesting + protected static long numChangedPartitions(List... changes) { + return Arrays.stream(changes) + .flatMap(Collection::stream) + .map(ManifestEntry::partition) + .distinct() + .count(); + } + + @VisibleForTesting + protected static long numChangedBuckets(List... changes) { + return changedPartBuckets(changes).values().stream().mapToLong(Set::size).sum(); + } + + @VisibleForTesting + protected static List changedPartitions(List... changes) { + return Arrays.stream(changes) + .flatMap(Collection::stream) + .map(ManifestEntry::partition) + .distinct() + .collect(Collectors.toList()); + } + + @VisibleForTesting + protected static Map> changedPartBuckets( + List... changes) { + Map> changedPartBuckets = new LinkedHashMap<>(); + Arrays.stream(changes) + .flatMap(Collection::stream) + .forEach( + entry -> + changedPartBuckets + .computeIfAbsent( + entry.partition(), k -> new LinkedHashSet<>()) + .add(entry.bucket())); + return changedPartBuckets; + } + + private long getRowCounts(List files) { + return files.stream().mapToLong(file -> file.file().rowCount()).sum(); + } + + @VisibleForTesting + protected long getTableFilesAdded() { + return tableFilesAdded; + } + + @VisibleForTesting + protected long getTableFilesDeleted() { + return tableFilesDeleted; + } + + @VisibleForTesting + protected long getTableFilesAppended() { + return tableFilesAppended; + } + + @VisibleForTesting + protected long getTableFilesCompacted() { + return tableFilesCompacted; + } + + @VisibleForTesting + protected long getChangelogFilesAppended() { + return changelogFilesAppended; + } + + @VisibleForTesting + protected long getChangelogFilesCompacted() { + return changelogFilesCompacted; + } + + @VisibleForTesting + protected long getGeneratedSnapshots() { + return generatedSnapshots; + } + + @VisibleForTesting + protected long getDeltaRecordsAppended() { + return deltaRecordsAppended; + } + + @VisibleForTesting + protected long getChangelogRecordsAppended() { + return changelogRecordsAppended; + } + + @VisibleForTesting + protected long getDeltaRecordsCompacted() { + return deltaRecordsCompacted; + } + + @VisibleForTesting + protected long getChangelogRecordsCompacted() { + return changelogRecordsCompacted; + } + + @VisibleForTesting + protected long getNumPartitionsWritten() { + return numPartitionsWritten; + } + + @VisibleForTesting + protected long getNumBucketsWritten() { + return numBucketsWritten; + } + + @VisibleForTesting + protected long getDuration() { + return duration; + } + + @VisibleForTesting + protected int getAttempts() { + return attempts; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java index df927aa356b2..f1d5c31570d2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java @@ -34,9 +34,9 @@ public class GenericMetricGroup extends AbstractMetricGroup { } public static GenericMetricGroup createGenericMetricGroup( - final String table, final String groupName) { + final String tableName, final String groupName) { Map tags = new HashMap<>(); - tags.put("table", table); + tags.put("table", tableName); return new GenericMetricGroup(tags, groupName); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index a585d681255b..31d22c37e01c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -20,6 +20,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.metrics.commit.CommitMetrics; import org.apache.paimon.table.sink.CommitMessage; import java.util.List; @@ -78,4 +79,7 @@ void overwrite( /** Abort an unsuccessful commit. The data files will be deleted. */ void abort(List commitMessages); + + /** With metrics to measure commits. */ + FileStoreCommit withMetrics(CommitMetrics metrics); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index a4096497547a..77c22687c2d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -34,6 +34,8 @@ import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.metrics.commit.CommitMetrics; +import org.apache.paimon.metrics.commit.CommitStats; import org.apache.paimon.options.MemorySize; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; @@ -112,6 +114,8 @@ public class FileStoreCommitImpl implements FileStoreCommit { @Nullable private Lock lock; private boolean ignoreEmptyCommit; + private CommitMetrics commitMetrics; + public FileStoreCommitImpl( FileIO fileIO, SchemaManager schemaManager, @@ -149,6 +153,7 @@ public FileStoreCommitImpl( this.lock = null; this.ignoreEmptyCommit = true; + this.commitMetrics = null; } @Override @@ -192,6 +197,9 @@ public void commit(ManifestCommittable committable, Map properti LOG.debug("Ready to commit\n" + committable.toString()); } + long started = System.nanoTime(); + int generatedSnapshot = 0; + int attempts = 0; Snapshot latestSnapshot = null; Long safeLatestSnapshotId = null; List baseEntries = new ArrayList<>(); @@ -208,65 +216,104 @@ public void commit(ManifestCommittable committable, Map properti compactTableFiles, compactChangelog, appendIndexFiles); + try { + if (!ignoreEmptyCommit + || !appendTableFiles.isEmpty() + || !appendChangelog.isEmpty() + || !appendIndexFiles.isEmpty()) { + // Optimization for common path. + // Step 1: + // Read manifest entries from changed partitions here and check for conflicts. + // If there are no other jobs committing at the same time, + // we can skip conflict checking in tryCommit method. + // This optimization is mainly used to decrease the number of times we read from + // files. + latestSnapshot = snapshotManager.latestSnapshot(); + if (latestSnapshot != null) { + // it is possible that some partitions only have compact changes, + // so we need to contain all changes + baseEntries.addAll( + readAllEntriesFromChangedPartitions( + latestSnapshot, appendTableFiles, compactTableFiles)); + noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, appendTableFiles); + safeLatestSnapshotId = latestSnapshot.id(); + } - if (!ignoreEmptyCommit - || !appendTableFiles.isEmpty() - || !appendChangelog.isEmpty() - || !appendIndexFiles.isEmpty()) { - // Optimization for common path. - // Step 1: - // Read manifest entries from changed partitions here and check for conflicts. - // If there are no other jobs committing at the same time, - // we can skip conflict checking in tryCommit method. - // This optimization is mainly used to decrease the number of times we read from files. - latestSnapshot = snapshotManager.latestSnapshot(); - if (latestSnapshot != null) { - // it is possible that some partitions only have compact changes, - // so we need to contain all changes - baseEntries.addAll( - readAllEntriesFromChangedPartitions( - latestSnapshot, appendTableFiles, compactTableFiles)); - noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, appendTableFiles); - safeLatestSnapshotId = latestSnapshot.id(); + attempts += + tryCommit( + appendTableFiles, + appendChangelog, + appendIndexFiles, + committable.identifier(), + committable.watermark(), + committable.logOffsets(), + Snapshot.CommitKind.APPEND, + safeLatestSnapshotId); + generatedSnapshot += 1; } - tryCommit( - appendTableFiles, - appendChangelog, - appendIndexFiles, - committable.identifier(), - committable.watermark(), - committable.logOffsets(), - Snapshot.CommitKind.APPEND, - safeLatestSnapshotId); - } + if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) { + // Optimization for common path. + // Step 2: + // Add appendChanges to the manifest entries read above and check for conflicts. + // If there are no other jobs committing at the same time, + // we can skip conflict checking in tryCommit method. + // This optimization is mainly used to decrease the number of times we read from + // files. + if (safeLatestSnapshotId != null) { + baseEntries.addAll(appendTableFiles); + noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, compactTableFiles); + // assume this compact commit follows just after the append commit created above + safeLatestSnapshotId += 1; + } - if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) { - // Optimization for common path. - // Step 2: - // Add appendChanges to the manifest entries read above and check for conflicts. - // If there are no other jobs committing at the same time, - // we can skip conflict checking in tryCommit method. - // This optimization is mainly used to decrease the number of times we read from files. - if (safeLatestSnapshotId != null) { - baseEntries.addAll(appendTableFiles); - noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, compactTableFiles); - // assume this compact commit follows just after the append commit created above - safeLatestSnapshotId += 1; + attempts += + tryCommit( + compactTableFiles, + compactChangelog, + Collections.emptyList(), + committable.identifier(), + committable.watermark(), + committable.logOffsets(), + Snapshot.CommitKind.COMPACT, + safeLatestSnapshotId); + generatedSnapshot += 1; + } + } finally { + long commitDuration = (System.nanoTime() - started) / 1_000_000; + if (this.commitMetrics != null) { + reportCommit( + appendTableFiles, + appendChangelog, + compactTableFiles, + compactChangelog, + commitDuration, + generatedSnapshot, + attempts); } - - tryCommit( - compactTableFiles, - compactChangelog, - Collections.emptyList(), - committable.identifier(), - committable.watermark(), - committable.logOffsets(), - Snapshot.CommitKind.COMPACT, - safeLatestSnapshotId); } } + private void reportCommit( + List appendTableFiles, + List appendChangelogFiles, + List compactTableFiles, + List compactChangelogFiles, + long commitDuration, + int generatedSnapshots, + int attempts) { + CommitStats commitStats = + new CommitStats( + appendTableFiles, + appendChangelogFiles, + compactTableFiles, + compactChangelogFiles, + commitDuration, + generatedSnapshots, + attempts); + commitMetrics.reportCommit(commitStats); + } + @Override public void overwrite( Map partition, @@ -280,6 +327,9 @@ public void overwrite( properties); } + long started = System.nanoTime(); + int generatedSnapshot = 0; + int attempts = 0; List appendTableFiles = new ArrayList<>(); List appendChangelog = new ArrayList<>(); List compactTableFiles = new ArrayList<>(); @@ -309,65 +359,83 @@ public void overwrite( LOG.warn(warnMessage.toString()); } - boolean skipOverwrite = false; - // partition filter is built from static or dynamic partition according to properties - Predicate partitionFilter = null; - if (dynamicPartitionOverwrite) { - if (appendTableFiles.isEmpty()) { - // in dynamic mode, if there is no changes to commit, no data will be deleted - skipOverwrite = true; + try { + boolean skipOverwrite = false; + // partition filter is built from static or dynamic partition according to properties + Predicate partitionFilter = null; + if (dynamicPartitionOverwrite) { + if (appendTableFiles.isEmpty()) { + // in dynamic mode, if there is no changes to commit, no data will be deleted + skipOverwrite = true; + } else { + partitionFilter = + appendTableFiles.stream() + .map(ManifestEntry::partition) + .distinct() + // partition filter is built from new data's partitions + .map(p -> PredicateBuilder.equalPartition(p, partitionType)) + .reduce(PredicateBuilder::or) + .orElseThrow( + () -> + new RuntimeException( + "Failed to get dynamic partition filter. This is unexpected.")); + } } else { - partitionFilter = - appendTableFiles.stream() - .map(ManifestEntry::partition) - .distinct() - // partition filter is built from new data's partitions - .map(p -> PredicateBuilder.equalPartition(p, partitionType)) - .reduce(PredicateBuilder::or) - .orElseThrow( - () -> - new RuntimeException( - "Failed to get dynamic partition filter. This is unexpected.")); - } - } else { - partitionFilter = PredicateBuilder.partition(partition, partitionType); - // sanity check, all changes must be done within the given partition - if (partitionFilter != null) { - for (ManifestEntry entry : appendTableFiles) { - if (!partitionFilter.test( - partitionObjectConverter.convert(entry.partition()))) { - throw new IllegalArgumentException( - "Trying to overwrite partition " - + partition - + ", but the changes in " - + pathFactory.getPartitionString(entry.partition()) - + " does not belong to this partition"); + partitionFilter = PredicateBuilder.partition(partition, partitionType); + // sanity check, all changes must be done within the given partition + if (partitionFilter != null) { + for (ManifestEntry entry : appendTableFiles) { + if (!partitionFilter.test( + partitionObjectConverter.convert(entry.partition()))) { + throw new IllegalArgumentException( + "Trying to overwrite partition " + + partition + + ", but the changes in " + + pathFactory.getPartitionString(entry.partition()) + + " does not belong to this partition"); + } } } } - } - // overwrite new files - if (!skipOverwrite) { - tryOverwrite( - partitionFilter, - appendTableFiles, - appendIndexFiles, - committable.identifier(), - committable.watermark(), - committable.logOffsets()); - } + // overwrite new files + if (!skipOverwrite) { + attempts += + tryOverwrite( + partitionFilter, + appendTableFiles, + appendIndexFiles, + committable.identifier(), + committable.watermark(), + committable.logOffsets()); + generatedSnapshot += 1; + } - if (!compactTableFiles.isEmpty()) { - tryCommit( - compactTableFiles, - Collections.emptyList(), - Collections.emptyList(), - committable.identifier(), - committable.watermark(), - committable.logOffsets(), - Snapshot.CommitKind.COMPACT, - null); + if (!compactTableFiles.isEmpty()) { + attempts += + tryCommit( + compactTableFiles, + Collections.emptyList(), + Collections.emptyList(), + committable.identifier(), + committable.watermark(), + committable.logOffsets(), + Snapshot.CommitKind.COMPACT, + null); + generatedSnapshot += 1; + } + } finally { + long commitDuration = (System.nanoTime() - started) / 1_000_000; + if (this.commitMetrics != null) { + reportCommit( + appendTableFiles, + Collections.emptyList(), + compactTableFiles, + Collections.emptyList(), + commitDuration, + generatedSnapshot, + attempts); + } } } @@ -430,6 +498,12 @@ public void abort(List commitMessages) { } } + @Override + public FileStoreCommit withMetrics(CommitMetrics metrics) { + this.commitMetrics = metrics; + return this; + } + private void collectChanges( List commitMessages, List appendTableFiles, @@ -481,7 +555,7 @@ private ManifestEntry makeEntry(FileKind kind, CommitMessage commitMessage, Data kind, commitMessage.partition(), commitMessage.bucket(), numBucket, file); } - private void tryCommit( + private int tryCommit( List tableFiles, List changelogFiles, List indexFiles, @@ -490,8 +564,10 @@ private void tryCommit( Map logOffsets, Snapshot.CommitKind commitKind, Long safeLatestSnapshotId) { + int cnt = 0; while (true) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + cnt++; if (tryCommitOnce( tableFiles, changelogFiles, @@ -505,18 +581,21 @@ private void tryCommit( break; } } + return cnt; } - private void tryOverwrite( + private int tryOverwrite( Predicate partitionFilter, List changes, List indexFiles, long identifier, @Nullable Long watermark, Map logOffsets) { + int cnt = 0; while (true) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + cnt++; List changesWithOverwrite = new ArrayList<>(); List indexChangesWithOverwrite = new ArrayList<>(); if (latestSnapshot != null) { @@ -565,6 +644,7 @@ private void tryOverwrite( break; } } + return cnt; } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 916fe17f455c..875e2fed7a8c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -261,7 +261,8 @@ public TableCommitImpl newCommit(String commitUser) { catalogEnvironment.lockFactory().create(), CoreOptions.fromMap(options()).consumerExpireTime(), new ConsumerManager(fileIO, path), - coreOptions().snapshotExpireExecutionMode()); + coreOptions().snapshotExpireExecutionMode(), + name()); } private List createCommitCallbacks() { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 7293dce3a4f5..19e95bb0cc6d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -21,6 +21,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.metrics.commit.CommitMetrics; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreExpire; import org.apache.paimon.operation.Lock; @@ -75,6 +76,7 @@ public class TableCommitImpl implements InnerTableCommit { private ExecutorService expireMainExecutor; private AtomicReference expireError; + private final CommitMetrics commitMetrics; public TableCommitImpl( FileStoreCommit commit, @@ -85,8 +87,10 @@ public TableCommitImpl( Lock lock, @Nullable Duration consumerExpireTime, ConsumerManager consumerManager, - ExpireExecutionMode expireExecutionMode) { - commit.withLock(lock); + ExpireExecutionMode expireExecutionMode, + String tableName) { + this.commitMetrics = new CommitMetrics(tableName); + commit.withLock(lock).withMetrics(commitMetrics); if (expire != null) { expire.withLock(lock); } @@ -261,6 +265,7 @@ public void close() throws Exception { } IOUtils.closeQuietly(lock); expireMainExecutor.shutdownNow(); + commitMetrics.close(); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index e66a6eccbbf1..ac0c0a64e2a1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.stream.Collectors; +import static org.apache.paimon.io.DataFileTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; /** base class for Test {@link ManifestFile}. */ @@ -221,4 +222,26 @@ protected void addDeltaManifests(List input, boolean hasPartit makeEntry(false, "B2", partition2), makeEntry(true, "D2", partition2))); } + + public static ManifestEntry makeEntry( + FileKind fileKind, int partition, int bucket, long rowCount) { + return new ManifestEntry( + fileKind, + row(partition), + bucket, + 0, // not used + new DataFileMeta( + "", // not used + 0, // not used + rowCount, + null, // not used + null, // not used + StatsTestUtils.newEmptyTableStats(), // not used + StatsTestUtils.newEmptyTableStats(), // not used + 0, // not used + 0, // not used + 0, // not used + 0 // not used + )); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java new file mode 100644 index 000000000000..dead22c2ed69 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics.commit; + +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.metrics.Gauge; +import org.apache.paimon.metrics.Histogram; +import org.apache.paimon.metrics.Metric; +import org.apache.paimon.metrics.MetricGroup; +import org.apache.paimon.metrics.Metrics; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.manifest.ManifestFileMetaTestBase.makeEntry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.offset; + +/** Tests for {@link CommitMetrics}. */ +public class CommitMetricsTest { + private static final String TABLE_NAME = "myTable"; + + private CommitMetrics commitMetrics; + + @BeforeEach + public void beforeEach() { + commitMetrics = getCommitMetrics(); + } + + @AfterEach + public void afterEach() { + commitMetrics.close(); + } + + /** Tests the registration of the commit metrics. */ + @Test + public void testGenericMetricsRegistration() { + MetricGroup genericMetricGroup = commitMetrics.getMetricGroup(); + assertThat(Metrics.getInstance().getMetricGroups().size()) + .withFailMessage( + String.format( + "Please close the created metric groups %s in case of metrics resource leak.", + Metrics.groupsInfo())) + .isEqualTo(1); + assertThat(genericMetricGroup.getGroupName()).isEqualTo(CommitMetrics.GROUP_NAME); + Map registeredMetrics = genericMetricGroup.getMetrics(); + assertThat(registeredMetrics.keySet()) + .containsExactlyInAnyOrder( + CommitMetrics.LAST_COMMIT_DURATION, + CommitMetrics.LAST_COMMIT_ATTEMPTS, + CommitMetrics.LAST_GENERATED_SNAPSHOTS, + CommitMetrics.LAST_PARTITIONS_WRITTEN, + CommitMetrics.LAST_BUCKETS_WRITTEN, + CommitMetrics.COMMIT_DURATION, + CommitMetrics.LAST_TABLE_FILES_ADDED, + CommitMetrics.LAST_TABLE_FILES_DELETED, + CommitMetrics.LAST_TABLE_FILES_APPENDED, + CommitMetrics.LAST_TABLE_FILES_COMMIT_COMPACTED, + CommitMetrics.LAST_CHANGELOG_FILES_APPENDED, + CommitMetrics.LAST_CHANGELOG_FILES_COMMIT_COMPACTED, + CommitMetrics.LAST_DELTA_RECORDS_APPENDED, + CommitMetrics.LAST_CHANGELOG_RECORDS_APPENDED, + CommitMetrics.LAST_DELTA_RECORDS_COMMIT_COMPACTED, + CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED); + + reportOnce(commitMetrics); + assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1); + } + + /** Tests that the metrics are updated properly. */ + @Test + public void testMetricsAreUpdated() { + Map registeredGenericMetrics = commitMetrics.getMetricGroup().getMetrics(); + + // Check initial values + Gauge lastCommitDuration = + (Gauge) registeredGenericMetrics.get(CommitMetrics.LAST_COMMIT_DURATION); + Histogram commitDuration = + (Histogram) registeredGenericMetrics.get(CommitMetrics.COMMIT_DURATION); + Gauge lastCommitAttempts = + (Gauge) registeredGenericMetrics.get(CommitMetrics.LAST_COMMIT_ATTEMPTS); + Gauge lastGeneratedSnapshots = + (Gauge) registeredGenericMetrics.get(CommitMetrics.LAST_GENERATED_SNAPSHOTS); + Gauge lastPartitionsWritten = + (Gauge) registeredGenericMetrics.get(CommitMetrics.LAST_PARTITIONS_WRITTEN); + Gauge lastBucketsWritten = + (Gauge) registeredGenericMetrics.get(CommitMetrics.LAST_BUCKETS_WRITTEN); + Gauge lastTableFilesAdded = + (Gauge) registeredGenericMetrics.get(CommitMetrics.LAST_TABLE_FILES_ADDED); + Gauge lastTableFilesDeleted = + (Gauge) registeredGenericMetrics.get(CommitMetrics.LAST_TABLE_FILES_DELETED); + + Gauge lastTableFilesAppended = + (Gauge) registeredGenericMetrics.get(CommitMetrics.LAST_TABLE_FILES_APPENDED); + + Gauge lastTableFilesCompacted = + (Gauge) + registeredGenericMetrics.get( + CommitMetrics.LAST_TABLE_FILES_COMMIT_COMPACTED); + + Gauge lastChangelogFilesAppended = + (Gauge) + registeredGenericMetrics.get(CommitMetrics.LAST_CHANGELOG_FILES_APPENDED); + + Gauge lastChangelogFilesCompacted = + (Gauge) + registeredGenericMetrics.get( + CommitMetrics.LAST_CHANGELOG_FILES_COMMIT_COMPACTED); + + Gauge lastDeltaRecordsAppended = + (Gauge) + registeredGenericMetrics.get(CommitMetrics.LAST_DELTA_RECORDS_APPENDED); + + Gauge lastChangelogRecordsAppended = + (Gauge) + registeredGenericMetrics.get(CommitMetrics.LAST_CHANGELOG_RECORDS_APPENDED); + + Gauge lastDeltaRecordsCompacted = + (Gauge) + registeredGenericMetrics.get( + CommitMetrics.LAST_DELTA_RECORDS_COMMIT_COMPACTED); + + Gauge lastChangelogRecordsCompacted = + (Gauge) + registeredGenericMetrics.get( + CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED); + + assertThat(lastCommitDuration.getValue()).isEqualTo(0); + assertThat(commitDuration.getCount()).isEqualTo(0); + assertThat(commitDuration.getStatistics().size()).isEqualTo(0); + assertThat(lastCommitAttempts.getValue()).isEqualTo(0); + assertThat(lastGeneratedSnapshots.getValue()).isEqualTo(0); + assertThat(lastPartitionsWritten.getValue()).isEqualTo(0); + assertThat(lastBucketsWritten.getValue()).isEqualTo(0); + assertThat(lastTableFilesAdded.getValue()).isEqualTo(0); + assertThat(lastTableFilesDeleted.getValue()).isEqualTo(0); + assertThat(lastTableFilesAppended.getValue()).isEqualTo(0); + assertThat(lastTableFilesCompacted.getValue()).isEqualTo(0); + assertThat(lastChangelogFilesAppended.getValue()).isEqualTo(0); + assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(0); + assertThat(lastDeltaRecordsAppended.getValue()).isEqualTo(0); + assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(0); + assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(0); + assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(0); + + // report once + reportOnce(commitMetrics); + + // generic metrics value updated + assertThat(lastCommitDuration.getValue()).isEqualTo(200); + assertThat(commitDuration.getCount()).isEqualTo(1); + assertThat(commitDuration.getStatistics().size()).isEqualTo(1); + assertThat(commitDuration.getStatistics().getValues()[0]).isEqualTo(200L); + assertThat(commitDuration.getStatistics().getMin()).isEqualTo(200); + assertThat(commitDuration.getStatistics().getQuantile(0.5)).isCloseTo(200.0, offset(0.001)); + assertThat(commitDuration.getStatistics().getMean()).isEqualTo(200); + assertThat(commitDuration.getStatistics().getMax()).isEqualTo(200); + assertThat(commitDuration.getStatistics().getStdDev()).isEqualTo(0); + assertThat(lastCommitAttempts.getValue()).isEqualTo(1); + assertThat(lastGeneratedSnapshots.getValue()).isEqualTo(2); + assertThat(lastPartitionsWritten.getValue()).isEqualTo(3); + assertThat(lastBucketsWritten.getValue()).isEqualTo(3); + assertThat(lastTableFilesAdded.getValue()).isEqualTo(4); + assertThat(lastTableFilesDeleted.getValue()).isEqualTo(1); + assertThat(lastTableFilesAppended.getValue()).isEqualTo(2); + assertThat(lastTableFilesCompacted.getValue()).isEqualTo(3); + assertThat(lastChangelogFilesAppended.getValue()).isEqualTo(2); + assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2); + assertThat(lastDeltaRecordsAppended.getValue()).isEqualTo(503); + assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(503); + assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(613); + assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(512); + + // report again + reportAgain(commitMetrics); + + // generic metrics value updated + assertThat(lastCommitDuration.getValue()).isEqualTo(500); + assertThat(commitDuration.getCount()).isEqualTo(2); + assertThat(commitDuration.getStatistics().size()).isEqualTo(2); + assertThat(commitDuration.getStatistics().getValues()[1]).isEqualTo(500L); + assertThat(commitDuration.getStatistics().getMin()).isEqualTo(200); + assertThat(commitDuration.getStatistics().getQuantile(0.5)).isCloseTo(350.0, offset(0.001)); + assertThat(commitDuration.getStatistics().getMean()).isEqualTo(350); + assertThat(commitDuration.getStatistics().getMax()).isEqualTo(500); + assertThat(commitDuration.getStatistics().getStdDev()).isCloseTo(212.132, offset(0.001)); + assertThat(lastCommitAttempts.getValue()).isEqualTo(2); + assertThat(lastGeneratedSnapshots.getValue()).isEqualTo(1); + assertThat(lastPartitionsWritten.getValue()).isEqualTo(2); + assertThat(lastBucketsWritten.getValue()).isEqualTo(3); + assertThat(lastTableFilesAdded.getValue()).isEqualTo(4); + assertThat(lastTableFilesDeleted.getValue()).isEqualTo(1); + assertThat(lastTableFilesAppended.getValue()).isEqualTo(2); + assertThat(lastTableFilesCompacted.getValue()).isEqualTo(3); + assertThat(lastChangelogFilesAppended.getValue()).isEqualTo(2); + assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2); + assertThat(lastDeltaRecordsAppended.getValue()).isEqualTo(805); + assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(213); + assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(506); + assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(601); + } + + private void reportOnce(CommitMetrics commitMetrics) { + List appendTableFiles = new ArrayList<>(); + List appendChangelogFiles = new ArrayList<>(); + List compactTableFiles = new ArrayList<>(); + List compactChangelogFiles = new ArrayList<>(); + + appendTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 201)); + appendTableFiles.add(makeEntry(FileKind.ADD, 2, 3, 302)); + appendChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 202)); + appendChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 301)); + compactTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 203)); + compactTableFiles.add(makeEntry(FileKind.ADD, 2, 3, 304)); + compactTableFiles.add(makeEntry(FileKind.DELETE, 3, 5, 106)); + compactChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 205)); + compactChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 307)); + + CommitStats commitStats = + new CommitStats( + appendTableFiles, + appendChangelogFiles, + compactTableFiles, + compactChangelogFiles, + 200, + 2, + 1); + + commitMetrics.reportCommit(commitStats); + } + + private void reportAgain(CommitMetrics commitMetrics) { + List appendTableFiles = new ArrayList<>(); + List appendChangelogFiles = new ArrayList<>(); + List compactTableFiles = new ArrayList<>(); + List compactChangelogFiles = new ArrayList<>(); + + appendTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 400)); + appendTableFiles.add(makeEntry(FileKind.ADD, 3, 4, 405)); + appendChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 102)); + appendChangelogFiles.add(makeEntry(FileKind.ADD, 3, 4, 111)); + compactTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 200)); + compactTableFiles.add(makeEntry(FileKind.ADD, 3, 4, 201)); + compactTableFiles.add(makeEntry(FileKind.DELETE, 3, 5, 105)); + compactChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 300)); + compactChangelogFiles.add(makeEntry(FileKind.ADD, 3, 4, 301)); + + CommitStats commitStats = + new CommitStats( + appendTableFiles, + appendChangelogFiles, + compactTableFiles, + compactChangelogFiles, + 500, + 1, + 2); + + commitMetrics.reportCommit(commitStats); + } + + private CommitMetrics getCommitMetrics() { + return new CommitMetrics(TABLE_NAME); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java new file mode 100644 index 000000000000..640dba08f2ad --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics.commit; + +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.paimon.io.DataFileTestUtils.row; +import static org.apache.paimon.manifest.ManifestFileMetaTestBase.makeEntry; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CommitStats}. */ +public class CommitStatsTest { + private static List files = new ArrayList<>(); + private static List appendDataFiles = new ArrayList<>(); + private static List appendChangelogFiles = new ArrayList<>(); + private static List compactDataFiles = new ArrayList<>(); + private static List compactChangelogFiles = new ArrayList<>(); + + @BeforeAll + public static void beforeAll() { + appendDataFiles.add(makeEntry(FileKind.ADD, 1, 1, 201)); + appendDataFiles.add(makeEntry(FileKind.ADD, 2, 3, 302)); + appendChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 202)); + appendChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 301)); + compactDataFiles.add(makeEntry(FileKind.ADD, 1, 1, 203)); + compactDataFiles.add(makeEntry(FileKind.ADD, 2, 3, 304)); + compactDataFiles.add(makeEntry(FileKind.DELETE, 3, 5, 106)); + compactChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 205)); + compactChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 307)); + files.addAll(appendDataFiles); + files.addAll(appendChangelogFiles); + files.addAll(compactDataFiles); + files.addAll(compactChangelogFiles); + } + + @Test + public void testCalcChangedPartitionsAndBuckets() { + assertThat(CommitStats.numChangedBuckets(files)).isEqualTo(3); + assertThat(CommitStats.numChangedPartitions(files)).isEqualTo(3); + assertThat(CommitStats.changedPartBuckets(files).get(row(1))).containsExactly(1); + assertThat(CommitStats.changedPartBuckets(files).get(row(2))).containsExactly(3); + assertThat(CommitStats.changedPartBuckets(files).get(row(3))).containsExactly(5); + assertThat(CommitStats.changedPartitions(files)) + .containsExactlyInAnyOrder(row(1), row(2), row(3)); + } + + @Test + public void testFailedAppendSnapshot() { + CommitStats commitStats = + new CommitStats( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0, + 0, + 1); + assertThat(commitStats.getTableFilesAdded()).isEqualTo(0); + assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0); + assertThat(commitStats.getTableFilesAppended()).isEqualTo(0); + assertThat(commitStats.getTableFilesCompacted()).isEqualTo(0); + assertThat(commitStats.getChangelogFilesAppended()).isEqualTo(0); + assertThat(commitStats.getChangelogFilesCompacted()).isEqualTo(0); + assertThat(commitStats.getGeneratedSnapshots()).isEqualTo(0); + assertThat(commitStats.getDeltaRecordsAppended()).isEqualTo(0); + assertThat(commitStats.getChangelogRecordsAppended()).isEqualTo(0); + assertThat(commitStats.getDeltaRecordsCompacted()).isEqualTo(0); + assertThat(commitStats.getChangelogRecordsCompacted()).isEqualTo(0); + assertThat(commitStats.getNumPartitionsWritten()).isEqualTo(0); + assertThat(commitStats.getNumBucketsWritten()).isEqualTo(0); + assertThat(commitStats.getDuration()).isEqualTo(0); + assertThat(commitStats.getAttempts()).isEqualTo(1); + } + + @Test + public void testFailedCompactSnapshot() { + CommitStats commitStats = + new CommitStats( + appendDataFiles, + appendChangelogFiles, + Collections.emptyList(), + Collections.emptyList(), + 3000, + 1, + 2); + assertThat(commitStats.getTableFilesAdded()).isEqualTo(2); + assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0); + assertThat(commitStats.getTableFilesAppended()).isEqualTo(2); + assertThat(commitStats.getTableFilesCompacted()).isEqualTo(0); + assertThat(commitStats.getChangelogFilesAppended()).isEqualTo(2); + assertThat(commitStats.getChangelogFilesCompacted()).isEqualTo(0); + assertThat(commitStats.getGeneratedSnapshots()).isEqualTo(1); + assertThat(commitStats.getDeltaRecordsAppended()).isEqualTo(503); + assertThat(commitStats.getChangelogRecordsAppended()).isEqualTo(503); + assertThat(commitStats.getDeltaRecordsCompacted()).isEqualTo(0); + assertThat(commitStats.getChangelogRecordsCompacted()).isEqualTo(0); + assertThat(commitStats.getNumPartitionsWritten()).isEqualTo(2); + assertThat(commitStats.getNumBucketsWritten()).isEqualTo(2); + assertThat(commitStats.getDuration()).isEqualTo(3000); + assertThat(commitStats.getAttempts()).isEqualTo(2); + } + + @Test + public void testSucceedAllSnapshot() { + CommitStats commitStats = + new CommitStats( + appendDataFiles, + appendChangelogFiles, + compactDataFiles, + compactChangelogFiles, + 3000, + 2, + 2); + assertThat(commitStats.getTableFilesAdded()).isEqualTo(4); + assertThat(commitStats.getTableFilesDeleted()).isEqualTo(1); + assertThat(commitStats.getTableFilesAppended()).isEqualTo(2); + assertThat(commitStats.getTableFilesCompacted()).isEqualTo(3); + assertThat(commitStats.getChangelogFilesAppended()).isEqualTo(2); + assertThat(commitStats.getChangelogFilesCompacted()).isEqualTo(2); + assertThat(commitStats.getGeneratedSnapshots()).isEqualTo(2); + assertThat(commitStats.getDeltaRecordsAppended()).isEqualTo(503); + assertThat(commitStats.getChangelogRecordsAppended()).isEqualTo(503); + assertThat(commitStats.getDeltaRecordsCompacted()).isEqualTo(613); + assertThat(commitStats.getChangelogRecordsCompacted()).isEqualTo(512); + assertThat(commitStats.getNumPartitionsWritten()).isEqualTo(3); + assertThat(commitStats.getNumBucketsWritten()).isEqualTo(3); + assertThat(commitStats.getDuration()).isEqualTo(3000); + assertThat(commitStats.getAttempts()).isEqualTo(2); + } +}