Skip to content

Commit

Permalink
[core] Support commit metrics (apache#1638)
Browse files Browse the repository at this point in the history
This closes apache#1638.
  • Loading branch information
schnappi17 authored Oct 20, 2023
1 parent b3fab72 commit dbec0c3
Show file tree
Hide file tree
Showing 12 changed files with 1,024 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,14 @@ public void close() {
public final boolean isClosed() {
return closed;
}

@Override
public String toString() {
return "MetricGroup{"
+ "groupName="
+ getGroupName()
+ ", metrics="
+ String.join(",", metrics.keySet())
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.metrics;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

/** Core of Paimon metrics system. */
public class Metrics {
Expand Down Expand Up @@ -53,4 +54,10 @@ public void removeGroup(AbstractMetricGroup group) {
public ConcurrentLinkedQueue<MetricGroup> getMetricGroups() {
return metricGroups;
}

public static String groupsInfo() {
return getInstance().getMetricGroups().stream()
.map(Object::toString)
.collect(Collectors.joining(", ", "[", "]"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metrics.commit;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.metrics.AbstractMetricGroup;
import org.apache.paimon.metrics.DescriptiveStatisticsHistogram;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.groups.GenericMetricGroup;

/** Metrics to measure a commit. */
public class CommitMetrics {
private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
protected static final String GROUP_NAME = "commit";

private final AbstractMetricGroup genericMetricGroup;

public CommitMetrics(String tableName) {
this.genericMetricGroup =
GenericMetricGroup.createGenericMetricGroup(tableName, GROUP_NAME);
registerGenericCommitMetrics();
}

@VisibleForTesting
public AbstractMetricGroup getMetricGroup() {
return genericMetricGroup;
}

private final Histogram durationHistogram =
new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE);

private CommitStats latestCommit;

@VisibleForTesting static final String LAST_COMMIT_DURATION = "lastCommitDuration";
@VisibleForTesting static final String COMMIT_DURATION = "commitDuration";
@VisibleForTesting static final String LAST_COMMIT_ATTEMPTS = "lastCommitAttempts";
@VisibleForTesting static final String LAST_TABLE_FILES_ADDED = "lastTableFilesAdded";
@VisibleForTesting static final String LAST_TABLE_FILES_DELETED = "lastTableFilesDeleted";
@VisibleForTesting static final String LAST_TABLE_FILES_APPENDED = "lastTableFilesAppended";

@VisibleForTesting
static final String LAST_TABLE_FILES_COMMIT_COMPACTED = "lastTableFilesCommitCompacted";

@VisibleForTesting
static final String LAST_CHANGELOG_FILES_APPENDED = "lastChangelogFilesAppended";

@VisibleForTesting
static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED = "lastChangelogFileCommitCompacted";

@VisibleForTesting static final String LAST_GENERATED_SNAPSHOTS = "lastGeneratedSnapshots";
@VisibleForTesting static final String LAST_DELTA_RECORDS_APPENDED = "lastDeltaRecordsAppended";

@VisibleForTesting
static final String LAST_CHANGELOG_RECORDS_APPENDED = "lastChangelogRecordsAppended";

@VisibleForTesting
static final String LAST_DELTA_RECORDS_COMMIT_COMPACTED = "lastDeltaRecordsCommitCompacted";

@VisibleForTesting
static final String LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED =
"lastChangelogRecordsCommitCompacted";

@VisibleForTesting static final String LAST_PARTITIONS_WRITTEN = "lastPartitionsWritten";
@VisibleForTesting static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten";

private void registerGenericCommitMetrics() {
genericMetricGroup.gauge(
LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L : latestCommit.getDuration());
genericMetricGroup.gauge(
LAST_COMMIT_ATTEMPTS, () -> latestCommit == null ? 0L : latestCommit.getAttempts());
genericMetricGroup.gauge(
LAST_GENERATED_SNAPSHOTS,
() -> latestCommit == null ? 0L : latestCommit.getGeneratedSnapshots());
genericMetricGroup.gauge(
LAST_PARTITIONS_WRITTEN,
() -> latestCommit == null ? 0L : latestCommit.getNumPartitionsWritten());
genericMetricGroup.gauge(
LAST_BUCKETS_WRITTEN,
() -> latestCommit == null ? 0L : latestCommit.getNumBucketsWritten());
genericMetricGroup.histogram(COMMIT_DURATION, durationHistogram);
genericMetricGroup.gauge(
LAST_TABLE_FILES_ADDED,
() -> latestCommit == null ? 0L : latestCommit.getTableFilesAdded());
genericMetricGroup.gauge(
LAST_TABLE_FILES_DELETED,
() -> latestCommit == null ? 0L : latestCommit.getTableFilesDeleted());
genericMetricGroup.gauge(
LAST_TABLE_FILES_APPENDED,
() -> latestCommit == null ? 0L : latestCommit.getTableFilesAppended());
genericMetricGroup.gauge(
LAST_TABLE_FILES_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L : latestCommit.getTableFilesCompacted());
genericMetricGroup.gauge(
LAST_CHANGELOG_FILES_APPENDED,
() -> latestCommit == null ? 0L : latestCommit.getChangelogFilesAppended());
genericMetricGroup.gauge(
LAST_CHANGELOG_FILES_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L : latestCommit.getChangelogFilesCompacted());
genericMetricGroup.gauge(
LAST_DELTA_RECORDS_APPENDED,
() -> latestCommit == null ? 0L : latestCommit.getDeltaRecordsAppended());
genericMetricGroup.gauge(
LAST_CHANGELOG_RECORDS_APPENDED,
() -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsAppended());
genericMetricGroup.gauge(
LAST_DELTA_RECORDS_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L : latestCommit.getDeltaRecordsCompacted());
genericMetricGroup.gauge(
LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsCompacted());
}

public void reportCommit(CommitStats commitStats) {
latestCommit = commitStats;
durationHistogram.update(commitStats.getDuration());
}

public void close() {
this.genericMetricGroup.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metrics.commit;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Statistics for a commit. */
public class CommitStats {
private final long duration;
private final int attempts;
private final long tableFilesAdded;
private final long tableFilesAppended;
private final long tableFilesDeleted;
private final long changelogFilesAppended;
private final long changelogFilesCompacted;
private final long changelogRecordsCompacted;

private final long deltaRecordsCompacted;
private final long changelogRecordsAppended;
private final long deltaRecordsAppended;
private final long tableFilesCompacted;
private final long generatedSnapshots;
private final long numPartitionsWritten;
private final long numBucketsWritten;

public CommitStats(
List<ManifestEntry> appendTableFiles,
List<ManifestEntry> appendChangelogFiles,
List<ManifestEntry> compactTableFiles,
List<ManifestEntry> compactChangelogFiles,
long commitDuration,
int generatedSnapshots,
int attempts) {
List<ManifestEntry> addedTableFiles = new ArrayList<>(appendTableFiles);
addedTableFiles.addAll(
compactTableFiles.stream()
.filter(f -> FileKind.ADD.equals(f.kind()))
.collect(Collectors.toList()));
List<ManifestEntry> deletedTableFiles =
compactTableFiles.stream()
.filter(f -> FileKind.DELETE.equals(f.kind()))
.collect(Collectors.toList());

this.tableFilesAdded = addedTableFiles.size();
this.tableFilesAppended = appendTableFiles.size();
this.tableFilesDeleted = deletedTableFiles.size();
this.tableFilesCompacted = compactTableFiles.size();
this.changelogFilesAppended = appendChangelogFiles.size();
this.changelogFilesCompacted = compactChangelogFiles.size();
this.numPartitionsWritten = numChangedPartitions(appendTableFiles, compactTableFiles);
this.numBucketsWritten = numChangedBuckets(appendTableFiles, compactTableFiles);
this.changelogRecordsCompacted = getRowCounts(compactChangelogFiles);
this.deltaRecordsCompacted = getRowCounts(compactTableFiles);
this.changelogRecordsAppended = getRowCounts(appendChangelogFiles);
this.deltaRecordsAppended = getRowCounts(appendTableFiles);
this.duration = commitDuration;
this.generatedSnapshots = generatedSnapshots;
this.attempts = attempts;
}

@VisibleForTesting
protected static long numChangedPartitions(List<ManifestEntry>... changes) {
return Arrays.stream(changes)
.flatMap(Collection::stream)
.map(ManifestEntry::partition)
.distinct()
.count();
}

@VisibleForTesting
protected static long numChangedBuckets(List<ManifestEntry>... changes) {
return changedPartBuckets(changes).values().stream().mapToLong(Set::size).sum();
}

@VisibleForTesting
protected static List<BinaryRow> changedPartitions(List<ManifestEntry>... changes) {
return Arrays.stream(changes)
.flatMap(Collection::stream)
.map(ManifestEntry::partition)
.distinct()
.collect(Collectors.toList());
}

@VisibleForTesting
protected static Map<BinaryRow, Set<Integer>> changedPartBuckets(
List<ManifestEntry>... changes) {
Map<BinaryRow, Set<Integer>> changedPartBuckets = new LinkedHashMap<>();
Arrays.stream(changes)
.flatMap(Collection::stream)
.forEach(
entry ->
changedPartBuckets
.computeIfAbsent(
entry.partition(), k -> new LinkedHashSet<>())
.add(entry.bucket()));
return changedPartBuckets;
}

private long getRowCounts(List<ManifestEntry> files) {
return files.stream().mapToLong(file -> file.file().rowCount()).sum();
}

@VisibleForTesting
protected long getTableFilesAdded() {
return tableFilesAdded;
}

@VisibleForTesting
protected long getTableFilesDeleted() {
return tableFilesDeleted;
}

@VisibleForTesting
protected long getTableFilesAppended() {
return tableFilesAppended;
}

@VisibleForTesting
protected long getTableFilesCompacted() {
return tableFilesCompacted;
}

@VisibleForTesting
protected long getChangelogFilesAppended() {
return changelogFilesAppended;
}

@VisibleForTesting
protected long getChangelogFilesCompacted() {
return changelogFilesCompacted;
}

@VisibleForTesting
protected long getGeneratedSnapshots() {
return generatedSnapshots;
}

@VisibleForTesting
protected long getDeltaRecordsAppended() {
return deltaRecordsAppended;
}

@VisibleForTesting
protected long getChangelogRecordsAppended() {
return changelogRecordsAppended;
}

@VisibleForTesting
protected long getDeltaRecordsCompacted() {
return deltaRecordsCompacted;
}

@VisibleForTesting
protected long getChangelogRecordsCompacted() {
return changelogRecordsCompacted;
}

@VisibleForTesting
protected long getNumPartitionsWritten() {
return numPartitionsWritten;
}

@VisibleForTesting
protected long getNumBucketsWritten() {
return numBucketsWritten;
}

@VisibleForTesting
protected long getDuration() {
return duration;
}

@VisibleForTesting
protected int getAttempts() {
return attempts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public class GenericMetricGroup extends AbstractMetricGroup {
}

public static GenericMetricGroup createGenericMetricGroup(
final String table, final String groupName) {
final String tableName, final String groupName) {
Map<String, String> tags = new HashMap<>();
tags.put("table", table);
tags.put("table", tableName);
return new GenericMetricGroup(tags, groupName);
}

Expand Down
Loading

0 comments on commit dbec0c3

Please sign in to comment.