Skip to content

Commit

Permalink
[Core] Add close() for FileStoreCommit and close all the commit object
Browse files Browse the repository at this point in the history
  • Loading branch information
schnappi17 committed Oct 10, 2023
1 parent 896530f commit 890d434
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public AbstractMetricGroup getMetricGroup() {

@VisibleForTesting
static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED = "lastChangelogFileCommitCompacted";

@VisibleForTesting static final String LAST_GENERATED_SNAPSHOTS = "lastGeneratedSnapshots";
@VisibleForTesting static final String LAST_DELTA_RECORDS_APPENDED = "lastDeltaRecordsAppended";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,7 @@ protected static long numChangedPartitions(List<ManifestEntry>... changes) {

@VisibleForTesting
protected static long numChangedBuckets(List<ManifestEntry>... changes) {
return changedPartBuckets(changes).values().stream()
.map(Set::size)
.reduce((a, b) -> a + b)
.orElseGet(() -> 0);
return changedPartBuckets(changes).values().stream().mapToLong(Set::size).sum();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,7 @@ void overwrite(

/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);

/** Close the commit. */
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,99 +216,80 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
compactTableFiles,
compactChangelog,
appendIndexFiles);

if (!ignoreEmptyCommit
|| !appendTableFiles.isEmpty()
|| !appendChangelog.isEmpty()
|| !appendIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 1:
// Read manifest entries from changed partitions here and check for conflicts.
// If there are no other jobs committing at the same time,
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from files.
latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
baseEntries.addAll(
readAllEntriesFromChangedPartitions(
latestSnapshot, appendTableFiles, compactTableFiles));
try {
try {
if (!ignoreEmptyCommit
|| !appendTableFiles.isEmpty()
|| !appendChangelog.isEmpty()
|| !appendIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 1:
// Read manifest entries from changed partitions here and check for conflicts.
// If there are no other jobs committing at the same time,
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
baseEntries.addAll(
readAllEntriesFromChangedPartitions(
latestSnapshot, appendTableFiles, compactTableFiles));
noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, appendTableFiles);
} finally {
reportCommit(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
0,
0,
1);
safeLatestSnapshotId = latestSnapshot.id();
}
safeLatestSnapshotId = latestSnapshot.id();
}

attempts +=
tryCommit(
appendTableFiles,
appendChangelog,
appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
safeLatestSnapshotId);
generatedSnapshot += 1;
}
attempts +=
tryCommit(
appendTableFiles,
appendChangelog,
appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
safeLatestSnapshotId);
generatedSnapshot += 1;
}

if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) {
// Optimization for common path.
// Step 2:
// Add appendChanges to the manifest entries read above and check for conflicts.
// If there are no other jobs committing at the same time,
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from files.
if (safeLatestSnapshotId != null) {
baseEntries.addAll(appendTableFiles);
try {
if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) {
// Optimization for common path.
// Step 2:
// Add appendChanges to the manifest entries read above and check for conflicts.
// If there are no other jobs committing at the same time,
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
if (safeLatestSnapshotId != null) {
baseEntries.addAll(appendTableFiles);
noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, compactTableFiles);
} finally {
long commitDuration = (System.nanoTime() - started) / 1_000_000;
reportCommit(
appendTableFiles,
appendChangelog,
Collections.emptyList(),
Collections.emptyList(),
commitDuration,
generatedSnapshot,
attempts + 1);
// assume this compact commit follows just after the append commit created above
safeLatestSnapshotId += 1;
}
// assume this compact commit follows just after the append commit created above
safeLatestSnapshotId += 1;
}

attempts +=
tryCommit(
compactTableFiles,
compactChangelog,
Collections.emptyList(),
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
safeLatestSnapshotId);
generatedSnapshot += 1;
attempts +=
tryCommit(
compactTableFiles,
compactChangelog,
Collections.emptyList(),
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
safeLatestSnapshotId);
generatedSnapshot += 1;
}
} finally {
long commitDuration = (System.nanoTime() - started) / 1_000_000;
reportCommit(
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
commitDuration,
generatedSnapshot,
attempts);
}
long commitDuration = (System.nanoTime() - started) / 1_000_000;
reportCommit(
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
commitDuration,
generatedSnapshot,
attempts);
}

private void reportCommit(
Expand Down Expand Up @@ -494,6 +475,11 @@ public void abort(List<CommitMessage> commitMessages) {
}
}

@Override
public void close() {
commitMetrics.getMetricGroup().close();
}

private void collectChanges(
List<CommitMessage> commitMessages,
List<ManifestEntry> appendTableFiles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private void doExpire(LocalDateTime expireDateTime, long commitIdentifier) {
if (expired.size() > 0) {
commit.dropPartitions(expired, commitIdentifier);
}
commit.close();
}

private Map<String, String> toPartitionString(Object[] array) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.PartitionExpire;
Expand Down Expand Up @@ -262,9 +261,7 @@ public void close() throws Exception {
}
IOUtils.closeQuietly(lock);
expireMainExecutor.shutdownNow();
if (commit instanceof FileStoreCommitImpl) {
((FileStoreCommitImpl) commit).getCommitMetrics().getMetricGroup().close();
}
commit.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public Snapshot dropPartitions(List<Map<String, String>> partitions) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
commit.dropPartitions(partitions, Long.MAX_VALUE);
commit.close();

Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
assertThat(snapshotIdAfterCommit).isNotNull();
Expand Down Expand Up @@ -299,6 +300,8 @@ public List<Snapshot> commitDataImpl(
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
commitFunction.accept(commit, committable);
commit.close();

Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
if (snapshotIdAfterCommit == null) {
snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public void testGenericMetricsRegistration() {
/** Tests that the metrics are updated properly. */
@Test
public void testMetricsAreUpdated() {
Map<String, Metric> registeredGenericMetrics =
commitMetrics.getMetricGroup().getMetrics();
Map<String, Metric> registeredGenericMetrics = commitMetrics.getMetricGroup().getMetrics();

// Check initial values
Gauge<Long> lastCommitDuration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public static void commitData(
}

commit.commit(committable, Collections.emptyMap());
commit.close();

writers.values().stream()
.flatMap(m -> m.values().stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public void run() {
throw new RuntimeException(e);
}
}
commit.close();
}

private void doCommit() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ public DropPartitionAction(
@Override
public void run() throws Exception {
commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
commit.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public String[] call(
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
commit.dropPartitions(getPartitions(partitionStrings), BatchWriteBuilder.COMMIT_IDENTIFIER);

commit.close();
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,20 @@ public boolean applyDeleteFilters(List<ResolvedExpression> list) {
public Optional<Long> executeDeletion() {
FileStoreCommit commit =
((AbstractFileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
commit.purgeTable(identifier);
return Optional.empty();
} else if (deleteIsDropPartition()) {
commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
return Optional.empty();
} else {
return Optional.of(
TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate)));
try {
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
commit.purgeTable(identifier);
return Optional.empty();
} else if (deleteIsDropPartition()) {
commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
return Optional.empty();
} else {
return Optional.of(
TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate)));
}
} finally {
commit.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement {
commit.dropPartitions(
Collections.singletonList(partitionMap),
BatchWriteBuilder.COMMIT_IDENTIFIER)
commit.close();
case _ =>
throw new UnsupportedOperationException(
"Only AbstractFileStoreTable supports drop partitions.")
Expand Down

0 comments on commit 890d434

Please sign in to comment.