Skip to content

Commit

Permalink
[Core] Only create commit metrics from TableCommitImpl and won't clos…
Browse files Browse the repository at this point in the history
…e metrics group from other commit objs
  • Loading branch information
schnappi17 committed Oct 10, 2023
1 parent 7330c2a commit 45ea034
Show file tree
Hide file tree
Showing 15 changed files with 41 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.metrics.DescriptiveStatisticsHistogram;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.groups.GenericMetricGroup;
import org.apache.paimon.utils.FileStorePathFactory;

/** Metrics to measure a commit. */
public class CommitMetrics {
Expand All @@ -32,10 +31,8 @@ public class CommitMetrics {

private final AbstractMetricGroup genericMetricGroup;

public CommitMetrics(FileStorePathFactory pathFactory) {
this.genericMetricGroup =
GenericMetricGroup.createGenericMetricGroup(
pathFactory.root().getName(), GROUP_NAME);
public CommitMetrics(String table) {
this.genericMetricGroup = GenericMetricGroup.createGenericMetricGroup(table, GROUP_NAME);
registerGenericCommitMetrics();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.table.sink.CommitMessage;

import java.util.List;
Expand Down Expand Up @@ -79,6 +80,6 @@ void overwrite(
/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);

/** Close the commit. */
void close();
/** With metrics to measure commits. */
FileStoreCommit withMetrics(CommitMetrics metrics);
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable private Lock lock;
private boolean ignoreEmptyCommit;

private final CommitMetrics commitMetrics;
private CommitMetrics commitMetrics;

public FileStoreCommitImpl(
FileIO fileIO,
Expand Down Expand Up @@ -153,7 +153,7 @@ public FileStoreCommitImpl(

this.lock = null;
this.ignoreEmptyCommit = true;
this.commitMetrics = new CommitMetrics(pathFactory);
this.commitMetrics = null;
}

@Override
Expand Down Expand Up @@ -281,14 +281,16 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
}
} finally {
long commitDuration = (System.nanoTime() - started) / 1_000_000;
reportCommit(
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
commitDuration,
generatedSnapshot,
attempts);
if (this.commitMetrics != null) {
reportCommit(
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
commitDuration,
generatedSnapshot,
attempts);
}
}
}

Expand Down Expand Up @@ -476,8 +478,9 @@ public void abort(List<CommitMessage> commitMessages) {
}

@Override
public void close() {
commitMetrics.getMetricGroup().close();
public FileStoreCommit withMetrics(CommitMetrics metrics) {
this.commitMetrics = metrics;
return this;
}

private void collectChanges(
Expand Down Expand Up @@ -1002,10 +1005,6 @@ private void cleanUpTmpManifests(
}
}

public CommitMetrics getCommitMetrics() {
return commitMetrics;
}

private static class LevelIdentifier {

private final BinaryRow partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ private void doExpire(LocalDateTime expireDateTime, long commitIdentifier) {
if (expired.size() > 0) {
commit.dropPartitions(expired, commitIdentifier);
}
commit.close();
}

private Map<String, String> toPartitionString(Object[] array) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ public TableCommitImpl newCommit(String commitUser) {
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path),
coreOptions().snapshotExpireExecutionMode());
coreOptions().snapshotExpireExecutionMode(),
name());
}

private List<CommitCallback> createCommitCallbacks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class TableCommitImpl implements InnerTableCommit {

private ExecutorService expireMainExecutor;
private AtomicReference<Throwable> expireError;
private final CommitMetrics commitMetrics;

public TableCommitImpl(
FileStoreCommit commit,
Expand All @@ -85,8 +87,10 @@ public TableCommitImpl(
Lock lock,
@Nullable Duration consumerExpireTime,
ConsumerManager consumerManager,
ExpireExecutionMode expireExecutionMode) {
commit.withLock(lock);
ExpireExecutionMode expireExecutionMode,
String table) {
this.commitMetrics = new CommitMetrics(table);
commit.withLock(lock).withMetrics(commitMetrics);
if (expire != null) {
expire.withLock(lock);
}
Expand Down Expand Up @@ -261,7 +265,7 @@ public void close() throws Exception {
}
IOUtils.closeQuietly(lock);
expireMainExecutor.shutdownNow();
commit.close();
commitMetrics.getMetricGroup().close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ public Snapshot dropPartitions(List<Map<String, String>> partitions) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
commit.dropPartitions(partitions, Long.MAX_VALUE);
commit.close();

Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
assertThat(snapshotIdAfterCommit).isNotNull();
Expand Down Expand Up @@ -300,7 +299,6 @@ public List<Snapshot> commitDataImpl(
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
commitFunction.accept(commit, committable);
commit.close();

Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
if (snapshotIdAfterCommit == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@

package org.apache.paimon.metrics.commit;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.Metrics;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -278,13 +273,6 @@ private void reportAgain(CommitMetrics commitMetrics) {
}

private CommitMetrics getCommitMetrics() {
Path path = new Path(tempDir.toString(), TABLE_NAME);
FileStorePathFactory pathFactory =
new FileStorePathFactory(
path,
RowType.of(new IntType()),
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
return new CommitMetrics(pathFactory);
return new CommitMetrics(TABLE_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ public void testMultiPartitions() throws Exception {
partitionSpec.put("hr", "8");
commit.overwrite(
partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap());
commit.close();
// step 4: generate snapshot 4 by cleaning dt=0402/hr=12/bucket-0
BinaryRow partition = partitions.get(7);
cleanBucket(store, partition, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public static void commitData(
}

commit.commit(committable, Collections.emptyMap());
commit.close();

writers.values().stream()
.flatMap(m -> m.values().stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public void run() {
throw new RuntimeException(e);
}
}
commit.close();
}

private void doCommit() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,5 @@ public DropPartitionAction(
@Override
public void run() throws Exception {
commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
commit.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public String[] call(
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
commit.dropPartitions(getPartitions(partitionStrings), BatchWriteBuilder.COMMIT_IDENTIFIER);
commit.close();

return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,16 @@ public boolean applyDeleteFilters(List<ResolvedExpression> list) {
public Optional<Long> executeDeletion() {
FileStoreCommit commit =
((AbstractFileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
try {
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
commit.purgeTable(identifier);
return Optional.empty();
} else if (deleteIsDropPartition()) {
commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
return Optional.empty();
} else {
return Optional.of(
TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate)));
}
} finally {
commit.close();
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
commit.purgeTable(identifier);
return Optional.empty();
} else if (deleteIsDropPartition()) {
commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
return Optional.empty();
} else {
return Optional.of(
TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement {
commit.dropPartitions(
Collections.singletonList(partitionMap),
BatchWriteBuilder.COMMIT_IDENTIFIER)
commit.close();
case _ =>
throw new UnsupportedOperationException(
"Only AbstractFileStoreTable supports drop partitions.")
Expand Down

0 comments on commit 45ea034

Please sign in to comment.