From 890d43424b0fc3924bd7636dfb178ea852316944 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=90=9B?= Date: Tue, 10 Oct 2023 15:46:12 +0800 Subject: [PATCH] [Core] Add close() for FileStoreCommit and close all the commit object --- .../paimon/metrics/commit/CommitMetrics.java | 1 + .../paimon/metrics/commit/CommitStats.java | 5 +- .../paimon/operation/FileStoreCommit.java | 3 + .../paimon/operation/FileStoreCommitImpl.java | 156 ++++++++---------- .../paimon/operation/PartitionExpire.java | 1 + .../paimon/table/sink/TableCommitImpl.java | 5 +- .../java/org/apache/paimon/TestFileStore.java | 3 + .../metrics/commit/CommitMetricsTest.java | 3 +- .../paimon/operation/FileStoreTestUtils.java | 1 + .../paimon/operation/TestCommitThread.java | 1 + .../flink/action/DropPartitionAction.java | 1 + .../procedure/DropPartitionProcedure.java | 2 +- .../paimon/flink/sink/FlinkTableSink.java | 24 +-- .../spark/PaimonPartitionManagement.scala | 1 + 14 files changed, 101 insertions(+), 106 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java index ed9d85796cab..56d10fbfc126 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java @@ -63,6 +63,7 @@ public AbstractMetricGroup getMetricGroup() { @VisibleForTesting static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED = "lastChangelogFileCommitCompacted"; + @VisibleForTesting static final String LAST_GENERATED_SNAPSHOTS = "lastGeneratedSnapshots"; @VisibleForTesting static final String LAST_DELTA_RECORDS_APPENDED = "lastDeltaRecordsAppended"; diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java index a232f50f73bf..0a2772f633d2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java @@ -98,10 +98,7 @@ protected static long numChangedPartitions(List... changes) { @VisibleForTesting protected static long numChangedBuckets(List... changes) { - return changedPartBuckets(changes).values().stream() - .map(Set::size) - .reduce((a, b) -> a + b) - .orElseGet(() -> 0); + return changedPartBuckets(changes).values().stream().mapToLong(Set::size).sum(); } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index a585d681255b..754425b35e0a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -78,4 +78,7 @@ void overwrite( /** Abort an unsuccessful commit. The data files will be deleted. */ void abort(List commitMessages); + + /** Close the commit. */ + void close(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 3683b5e358ce..ae75d70bb832 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -216,99 +216,80 @@ public void commit(ManifestCommittable committable, Map properti compactTableFiles, compactChangelog, appendIndexFiles); - - if (!ignoreEmptyCommit - || !appendTableFiles.isEmpty() - || !appendChangelog.isEmpty() - || !appendIndexFiles.isEmpty()) { - // Optimization for common path. - // Step 1: - // Read manifest entries from changed partitions here and check for conflicts. - // If there are no other jobs committing at the same time, - // we can skip conflict checking in tryCommit method. - // This optimization is mainly used to decrease the number of times we read from files. - latestSnapshot = snapshotManager.latestSnapshot(); - if (latestSnapshot != null) { - // it is possible that some partitions only have compact changes, - // so we need to contain all changes - baseEntries.addAll( - readAllEntriesFromChangedPartitions( - latestSnapshot, appendTableFiles, compactTableFiles)); - try { + try { + if (!ignoreEmptyCommit + || !appendTableFiles.isEmpty() + || !appendChangelog.isEmpty() + || !appendIndexFiles.isEmpty()) { + // Optimization for common path. + // Step 1: + // Read manifest entries from changed partitions here and check for conflicts. + // If there are no other jobs committing at the same time, + // we can skip conflict checking in tryCommit method. + // This optimization is mainly used to decrease the number of times we read from + // files. + latestSnapshot = snapshotManager.latestSnapshot(); + if (latestSnapshot != null) { + // it is possible that some partitions only have compact changes, + // so we need to contain all changes + baseEntries.addAll( + readAllEntriesFromChangedPartitions( + latestSnapshot, appendTableFiles, compactTableFiles)); noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, appendTableFiles); - } finally { - reportCommit( - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - 0, - 0, - 1); + safeLatestSnapshotId = latestSnapshot.id(); } - safeLatestSnapshotId = latestSnapshot.id(); - } - attempts += - tryCommit( - appendTableFiles, - appendChangelog, - appendIndexFiles, - committable.identifier(), - committable.watermark(), - committable.logOffsets(), - Snapshot.CommitKind.APPEND, - safeLatestSnapshotId); - generatedSnapshot += 1; - } + attempts += + tryCommit( + appendTableFiles, + appendChangelog, + appendIndexFiles, + committable.identifier(), + committable.watermark(), + committable.logOffsets(), + Snapshot.CommitKind.APPEND, + safeLatestSnapshotId); + generatedSnapshot += 1; + } - if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) { - // Optimization for common path. - // Step 2: - // Add appendChanges to the manifest entries read above and check for conflicts. - // If there are no other jobs committing at the same time, - // we can skip conflict checking in tryCommit method. - // This optimization is mainly used to decrease the number of times we read from files. - if (safeLatestSnapshotId != null) { - baseEntries.addAll(appendTableFiles); - try { + if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) { + // Optimization for common path. + // Step 2: + // Add appendChanges to the manifest entries read above and check for conflicts. + // If there are no other jobs committing at the same time, + // we can skip conflict checking in tryCommit method. + // This optimization is mainly used to decrease the number of times we read from + // files. + if (safeLatestSnapshotId != null) { + baseEntries.addAll(appendTableFiles); noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, compactTableFiles); - } finally { - long commitDuration = (System.nanoTime() - started) / 1_000_000; - reportCommit( - appendTableFiles, - appendChangelog, - Collections.emptyList(), - Collections.emptyList(), - commitDuration, - generatedSnapshot, - attempts + 1); + // assume this compact commit follows just after the append commit created above + safeLatestSnapshotId += 1; } - // assume this compact commit follows just after the append commit created above - safeLatestSnapshotId += 1; - } - attempts += - tryCommit( - compactTableFiles, - compactChangelog, - Collections.emptyList(), - committable.identifier(), - committable.watermark(), - committable.logOffsets(), - Snapshot.CommitKind.COMPACT, - safeLatestSnapshotId); - generatedSnapshot += 1; + attempts += + tryCommit( + compactTableFiles, + compactChangelog, + Collections.emptyList(), + committable.identifier(), + committable.watermark(), + committable.logOffsets(), + Snapshot.CommitKind.COMPACT, + safeLatestSnapshotId); + generatedSnapshot += 1; + } + } finally { + long commitDuration = (System.nanoTime() - started) / 1_000_000; + reportCommit( + appendTableFiles, + appendChangelog, + compactTableFiles, + compactChangelog, + commitDuration, + generatedSnapshot, + attempts); } - long commitDuration = (System.nanoTime() - started) / 1_000_000; - reportCommit( - appendTableFiles, - appendChangelog, - compactTableFiles, - compactChangelog, - commitDuration, - generatedSnapshot, - attempts); } private void reportCommit( @@ -494,6 +475,11 @@ public void abort(List commitMessages) { } } + @Override + public void close() { + commitMetrics.getMetricGroup().close(); + } + private void collectChanges( List commitMessages, List appendTableFiles, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java index e20c47eea235..46ffaa56f2fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java @@ -101,6 +101,7 @@ private void doExpire(LocalDateTime expireDateTime, long commitIdentifier) { if (expired.size() > 0) { commit.dropPartitions(expired, commitIdentifier); } + commit.close(); } private Map toPartitionString(Object[] array) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 40cbf5f5fbbf..9bca81b2e6e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -22,7 +22,6 @@ import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.operation.FileStoreCommit; -import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.operation.FileStoreExpire; import org.apache.paimon.operation.Lock; import org.apache.paimon.operation.PartitionExpire; @@ -262,9 +261,7 @@ public void close() throws Exception { } IOUtils.closeQuietly(lock); expireMainExecutor.shutdownNow(); - if (commit instanceof FileStoreCommitImpl) { - ((FileStoreCommitImpl) commit).getCommitMetrics().getMetricGroup().close(); - } + commit.close(); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 6b3b3f8a0439..6b014da18adb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -211,6 +211,7 @@ public Snapshot dropPartitions(List> partitions) { snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1; } commit.dropPartitions(partitions, Long.MAX_VALUE); + commit.close(); Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId(); assertThat(snapshotIdAfterCommit).isNotNull(); @@ -299,6 +300,8 @@ public List commitDataImpl( snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1; } commitFunction.accept(commit, committable); + commit.close(); + Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId(); if (snapshotIdAfterCommit == null) { snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1; diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java index 191a23eb8cb2..440c2f6499cb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java @@ -89,8 +89,7 @@ public void testGenericMetricsRegistration() { /** Tests that the metrics are updated properly. */ @Test public void testMetricsAreUpdated() { - Map registeredGenericMetrics = - commitMetrics.getMetricGroup().getMetrics(); + Map registeredGenericMetrics = commitMetrics.getMetricGroup().getMetrics(); // Check initial values Gauge lastCommitDuration = diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java index e4743f77025e..a31eb712a499 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java @@ -107,6 +107,7 @@ public static void commitData( } commit.commit(committable, Collections.emptyMap()); + commit.close(); writers.values().stream() .flatMap(m -> m.values().stream()) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java index 872cb554f566..eb4b6589b6b1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java @@ -128,6 +128,7 @@ public void run() { throw new RuntimeException(e); } } + commit.close(); } private void doCommit() throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java index a1bd77544e2f..8e446cc8a478 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java @@ -56,5 +56,6 @@ public DropPartitionAction( @Override public void run() throws Exception { commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER); + commit.close(); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java index 26d8f291927f..36f85965ebb8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java @@ -55,7 +55,7 @@ public String[] call( (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString()); commit.dropPartitions(getPartitions(partitionStrings), BatchWriteBuilder.COMMIT_IDENTIFIER); - + commit.close(); return new String[] {"Success"}; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java index 96044bbf7c30..581f8bcecb3e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java @@ -172,16 +172,20 @@ public boolean applyDeleteFilters(List list) { public Optional executeDeletion() { FileStoreCommit commit = ((AbstractFileStoreTable) table).store().newCommit(UUID.randomUUID().toString()); - long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER; - if (deletePredicate == null) { - commit.purgeTable(identifier); - return Optional.empty(); - } else if (deleteIsDropPartition()) { - commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier); - return Optional.empty(); - } else { - return Optional.of( - TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate))); + try { + long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER; + if (deletePredicate == null) { + commit.purgeTable(identifier); + return Optional.empty(); + } else if (deleteIsDropPartition()) { + commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier); + return Optional.empty(); + } else { + return Optional.of( + TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate))); + } + } finally { + commit.close(); } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala index d4461ff1c394..f12a6dfba419 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala @@ -64,6 +64,7 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement { commit.dropPartitions( Collections.singletonList(partitionMap), BatchWriteBuilder.COMMIT_IDENTIFIER) + commit.close(); case _ => throw new UnsupportedOperationException( "Only AbstractFileStoreTable supports drop partitions.")