diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
index bbf093c45785..6ef28749bb61 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.metrics;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /** Core of Paimon metrics system. */
@@ -43,39 +41,11 @@ public static Metrics getInstance() {
      * instances.
      */
     public void addGroup(AbstractMetricGroup group) {
-        try {
-            throw new Exception();
-        } catch (Exception e) {
-            StringWriter stringWriter = new StringWriter();
-            PrintWriter printWriter = new PrintWriter(stringWriter);
-            e.printStackTrace(printWriter);
-            String stackTraceAsString = stringWriter.toString();
-            System.out.println(
-                    "[gjli] "
-                            + group.toString()
-                            + " addGroup with stack trace: "
-                            + stackTraceAsString);
-        }
         metricGroups.add(group);
-        System.out.println("[gjli] Added current size is " + metricGroups.size());
     }
 
     /** Remove a metric group. Called when closing the corresponding instances, like committer. */
     public void removeGroup(AbstractMetricGroup group) {
-        try {
-            throw new Exception();
-        } catch (Exception e) {
-            StringWriter stringWriter = new StringWriter();
-            PrintWriter printWriter = new PrintWriter(stringWriter);
-            e.printStackTrace(printWriter);
-            String stackTraceAsString = stringWriter.toString();
-            System.out.println(
-                    "[gjli] "
-                            + group.toString()
-                            + " removeGroup with stack trace: "
-                            + stackTraceAsString);
-        }
-        System.out.println("[gjli] Removed current size is " + metricGroups.size());
         metricGroups.remove(group);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
index fc6cc92d42a7..f0a13d9fa803 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
@@ -33,6 +33,7 @@
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -145,8 +146,10 @@ private static Schema schema() {
         return schemaBuilder.build();
     }
 
-    private void commit(List<CommitMessage> messages) {
-        appendOnlyFileStoreTable.newCommit(commitUser).commit(messages);
+    private void commit(List<CommitMessage> messages) throws Exception {
+        TableCommitImpl commit = appendOnlyFileStoreTable.newCommit(commitUser);
+        commit.commit(messages);
+        commit.close();
     }
 
     private List<CommitMessage> writeCommit(int number) throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 34e5052a12b9..38f2f70087e4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -27,6 +27,7 @@
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableCommit;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -49,6 +50,11 @@ public void beforeEach() throws Exception {
         commit = table.newStreamWriteBuilder().withCommitUser(commitUser).newCommit();
     }
 
+    @AfterEach
+    public void afterEach() throws Exception {
+        commit.close();
+    }
+
     private HashBucketAssigner createAssigner(int numAssigners, int assignId) {
         return new HashBucketAssigner(
                 table.snapshotManager(), commitUser, fileHandler, numAssigners, assignId, 5);
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
index 92915b458c53..af8bc72051bf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
@@ -122,6 +122,7 @@ public void testAssignBucket() throws Exception {
                 .containsExactlyInAnyOrder(-771300025, 1340390384, 1465514398);
 
         write.close();
+        commit.close();
     }
 
     @Test
@@ -137,5 +138,6 @@ public void testNotCreateNewFile() throws Exception {
         assertThat(readIndex(commitMessages)).isEmpty();
 
         write.close();
+        commit.close();
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
index 4d63058ab39f..07c46afa078a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
@@ -61,12 +61,12 @@ public void testTolerateMetricNameCollisions() {
         group.close();
     }
 
-    // @Test
-    // public void testAddAndRemoveMetricGroups() {
-    //     AbstractMetricGroup metricGroup =
-    //             GenericMetricGroup.createGenericMetricGroup("myTable", "commit");
-    //     assertThat(Metrics.getInstance().getMetricGroups()).containsExactly(metricGroup);
-    //     metricGroup.close();
-    //     assertThat(Metrics.getInstance().getMetricGroups()).isEmpty();
-    // }
+    @Test
+    public void testAddAndRemoveMetricGroups() {
+        AbstractMetricGroup metricGroup =
+                GenericMetricGroup.createGenericMetricGroup("myTable", "commit");
+        assertThat(Metrics.getInstance().getMetricGroups()).contains(metricGroup);
+        metricGroup.close();
+        assertThat(Metrics.getInstance().getMetricGroups()).doesNotContain(metricGroup);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
index 99ee12d03d9f..b2599238aae3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.metrics.Histogram;
 import org.apache.paimon.metrics.Metric;
 import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.Metrics;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -57,7 +58,7 @@ public void afterEach() {
     @Test
     public void testGenericMetricsRegistration() {
         MetricGroup genericMetricGroup = commitMetrics.getMetricGroup();
-        // assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
+        assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
         assertThat(genericMetricGroup.getGroupName()).isEqualTo(CommitMetrics.GROUP_NAME);
         Map<String, Metric> registeredMetrics = genericMetricGroup.getMetrics();
         assertThat(registeredMetrics.keySet())
@@ -80,7 +81,7 @@ public void testGenericMetricsRegistration() {
                         CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED);
 
         reportOnce(commitMetrics);
-        // assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
+        assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
     }
 
     /** Tests that the metrics are updated properly. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java
index 3977ade03746..ba32302d987a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java
@@ -39,5 +39,6 @@ public void testGenerateScopeDefault() {
         assertThat(group.getAllTags().get("partition")).isEqualTo("dt=1");
         assertThat(group.getMetricIdentifier("myMetric", ".")).isEqualTo("commit.myMetric");
         assertThat(group.getGroupName()).isEqualTo("commit");
+        group.close();
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 1b143150ec43..19018d93b466 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -32,6 +32,7 @@
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 
@@ -183,12 +184,14 @@ public void testFilterCommittedAfterExpiring() throws Exception {
         options.put(PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "5 s");
         options.put(PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
         table = table.copy(options);
+        commit.close();
         commit = table.newCommit(commitUser);
         commit.commit(successCommits - 2, commitMessages.get(successCommits - 2));
         notCommitted.remove((long) (successCommits - 2));
         Thread.sleep(5000);
         commit.commit(successCommits - 1, commitMessages.get(successCommits - 1));
         notCommitted.remove((long) (successCommits - 1));
+        commit.close();
 
         // check whether partition expire is triggered
         Snapshot snapshot = table.snapshotManager().latestSnapshot();
@@ -217,8 +220,10 @@ private void write(String f0, String f1) throws Exception {
         StreamTableWrite write =
                 table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true")).newWrite("");
         write.write(GenericRow.of(BinaryString.fromString(f0), BinaryString.fromString(f1)));
-        table.newCommit("").commit(0, write.prepareCommit(true, 0));
+        TableCommitImpl commit = table.newCommit("");
+        commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
     }
 
     private PartitionExpire newExpire() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 5c99df4aab9b..8c509d9838cd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -134,6 +134,7 @@ public void testSplitOrder() throws Exception {
         write.write(rowData(3, 33, 303L));
         commit.commit(2, write.prepareCommit(true, 2));
         write.close();
+        commit.close();
 
         List splits = toSplits(table.newSnapshotReader().read().dataSplits());
         int[] partitions =
@@ -163,6 +164,7 @@ public void testBatchSplitOrderByPartition() throws Exception {
         commit.commit(2, write.prepareCommit(true, 2));
         write.close();
+        commit.close();
 
         List splits = toSplits(table.newSnapshotReader().read().dataSplits());
         int[] partitions =
@@ -205,6 +207,7 @@ public void testStreamingSplitInUnawareBucketMode() throws Exception {
         assertThat(scan.plan().splits().size()).isEqualTo(3);
 
         write.close();
+        commit.close();
     }
 
     @Test
@@ -280,6 +283,8 @@ public void testSequentialRead() throws Exception {
             dataPerBucket.clear();
         }
         commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
 
         int partition = random.nextInt(numOfPartition);
         List availableBucket = new ArrayList<>(dataset.get(partition).keySet());
@@ -320,6 +325,7 @@ public void testBatchOrderWithCompaction() throws Exception {
             expected.add(i);
         }
         write.close();
+        commit.close();
 
         ReadBuilder readBuilder = table.newReadBuilder();
         List splits = readBuilder.newScan().plan().splits();
@@ -342,6 +348,7 @@ public void testBatchOrderWithCompaction() throws Exception {
             expected.add(i);
         }
         write.close();
+        commit.close();
 
         ReadBuilder readBuilder = table.newReadBuilder();
         List splits = readBuilder.newScan().plan().splits();
@@ -375,6 +382,7 @@ private void writeData() throws Exception {
         commit.commit(2, write.prepareCommit(true, 2));
 
         write.close();
+        commit.close();
     }
 
     @Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
index 6fd275083a7a..6f396695475f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
@@ -187,6 +187,7 @@ private void writeData() throws Exception {
         commit.commit(2, write.prepareCommit(true, 2));
 
         write.close();
+        commit.close();
     }
 
     @Test
@@ -200,6 +201,7 @@ public void testChangelogWithoutDataFile() throws Exception {
         write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         // check that no data file is produced
         List splits =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index ae5e6c7ef376..89df3a98ea2c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -151,6 +151,7 @@ public void testSequenceNumber() throws Exception {
         write.write(rowData(1, 11, 55L));
         commit.commit(1, write.prepareCommit(true, 1));
         write.close();
+        commit.close();
 
         List splits = toSplits(table.newSnapshotReader().read().dataSplits());
         TableRead read = table.newRead();
@@ -327,6 +328,7 @@ public void testStreamingInputChangelog() throws Exception {
         write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         List splits =
                 toSplits(
@@ -426,6 +428,9 @@ private void innerTestStreamingFullChangelog(Consumer configure) throws
         write.compact(binaryRow(2), 0, true);
         commit.commit(4, write.prepareCommit(true, 4));
 
+        write.close();
+        commit.close();
+
         splits =
                 toSplits(
                         table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
@@ -530,6 +535,7 @@ public void testStreamingChangelogCompatibility02() throws Exception {
         write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
         commit.commit(2, write.prepareCommit(true, 2));
         write.close();
+        commit.close();
 
         assertNextSnapshot.apply(2);
 
@@ -560,6 +566,7 @@ private void writeData() throws Exception {
         commit.commit(2, write.prepareCommit(true, 2));
 
         write.close();
+        commit.close();
     }
 
     @Override
@@ -624,6 +631,7 @@ public void testReadFilter() throws Exception {
         commit.commit(3, write.prepareCommit(true, 3));
 
         write.close();
+        commit.close();
 
         // cannot push down value filter b = 600L
         splits = toSplits(table.newSnapshotReader().read().dataSplits());
@@ -657,6 +665,7 @@ public void testPartialUpdateIgnoreDelete() throws Exception {
         write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 400L));
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         List splits = toSplits(table.newSnapshotReader().read().dataSplits());
         TableRead read = table.newRead();
@@ -687,6 +696,7 @@ public void testSlowCommit() throws Exception {
         commit.commit(2, committables2);
 
         write.close();
+        commit.close();
 
         List splits = toSplits(table.newSnapshotReader().read().dataSplits());
         TableRead read = table.newRead();
@@ -728,6 +738,8 @@ public void testIncrementalSplitOverwrite() throws Exception {
         assertThat(splits1.get(0).dataFiles()).hasSize(1);
         assertThat(splits1.get(0).dataFiles().get(0).fileName())
                 .isNotEqualTo(splits0.get(0).dataFiles().get(0).fileName());
+        write.close();
+        commit.close();
     }
 
     @Test
@@ -900,6 +912,8 @@ public void testAggMergeFunc() throws Exception {
         result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
         assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");
+        write.close();
+        commit.close();
     }
 
     @Test
@@ -962,6 +976,7 @@ public void testFullCompactedRead() throws Exception {
         commit.commit(3, write.prepareCommit(true, 3));
 
         write.close();
+        commit.close();
 
         assertThat(
                         getResult(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 37547c0cbde9..a9d3732de8e3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -326,6 +326,7 @@ public void testOverwrite() throws Exception {
         write.write(rowData(2, 20, 200L));
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         write = table.newWrite(commitUser).withIgnorePreviousFiles(true);
         commit = table.newCommit(commitUser);
@@ -334,6 +335,7 @@ public void testOverwrite() throws Exception {
         overwritePartition.put("pt", "2");
         commit.withOverwrite(overwritePartition).commit(1, write.prepareCommit(true, 1));
         write.close();
+        commit.close();
 
         List splits = toSplits(table.newSnapshotReader().read().dataSplits());
         TableRead read = table.newRead();
@@ -362,8 +364,10 @@ public void testBucketFilter() throws Exception {
         write.write(rowData(1, 5, 6L));
         write.write(rowData(1, 7, 8L));
         write.write(rowData(1, 9, 10L));
-        table.newCommit(commitUser).commit(0, write.prepareCommit(true, 0));
+        TableCommitImpl commit = table.newCommit(commitUser);
+        commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         List splits =
                 toSplits(
@@ -381,12 +385,14 @@ public void testAbort() throws Exception {
         StreamTableWrite write = table.newWrite(commitUser);
         write.write(rowData(1, 2, 3L));
         List<CommitMessage> messages = write.prepareCommit(true, 0);
-        table.newCommit(commitUser).abort(messages);
+        TableCommitImpl commit = table.newCommit(commitUser);
+        commit.abort(messages);
 
         FileStatus[] files =
                 LocalFileIO.create().listStatus(new Path(tablePath + "/pt=1/bucket-0"));
         assertThat(files).isEmpty();
 
         write.close();
+        commit.close();
     }
 
     @Test
@@ -413,6 +419,7 @@ public void testReadFilter() throws Exception {
         commit.commit(2, write.prepareCommit(true, 2));
 
         write.close();
+        commit.close();
 
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
         List splits = toSplits(table.newSnapshotReader().read().dataSplits());
@@ -471,6 +478,7 @@ public void testPartitionEmptyWriter() throws Exception {
         }
 
         write.close();
+        commit.close();
     }
 
     @Test
@@ -504,6 +512,7 @@ public void testManifestCache() throws Exception {
                             "%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset", i, i + 1, i + 1));
         }
         commit.commit(cnt, write.prepareCommit(false, cnt));
+        commit.close();
 
         // check result
         List result =
@@ -532,6 +541,7 @@ public void testWriteWithoutCompactionAndExpiration() throws Exception {
             commit.commit(i, write.prepareCommit(true, i));
         }
         write.close();
+        commit.close();
 
         List files =
                 table.newSnapshotReader().read().dataSplits().stream()
@@ -571,6 +581,8 @@ public void testCopyWithLatestSchema() throws Exception {
         write.write(new JoinedRow(rowData(1, 30, 300L), GenericRow.of(3000)));
         write.write(new JoinedRow(rowData(1, 40, 400L), GenericRow.of(4000)));
         commit.commit(1, write.prepareCommit(true, 1));
+        write.close();
+        commit.close();
 
         List splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
@@ -644,6 +656,8 @@ public void testConsumeId() throws Exception {
         result = getResult(read, scan.plan().splits(), STREAMING_ROW_TO_STRING);
         assertThat(result)
                 .containsExactlyInAnyOrder("+1|40|400|binary|varbinary|mapKey:mapVal|multiset");
+        write.close();
+        commit.close();
 
         // expire consumer and then test snapshot expiration
         Thread.sleep(1000);
@@ -666,6 +680,7 @@ public void testConsumeId() throws Exception {
                                 OutOfRangeException.class, "The snapshot with id 5 has expired."));
 
         write.close();
+        commit.close();
     }
 
     // All tags are after the rollback snapshot
@@ -848,6 +863,7 @@ private FileStoreTable prepareRollbackTable(int commitTimes) throws Exception {
             commit.commit(i, write.prepareCommit(false, i));
         }
         write.close();
+        commit.close();
 
         return table;
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index 3d7b061185d8..7fc7c76cada1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -221,6 +221,9 @@ private void prepareExpireTable() throws Exception {
         // commit bucket 2 only
         write.write(createRow(2, 2, 5, 5));
         commit.commit(5, write.prepareCommit(true, 5));
+
+        write.close();
+        commit.close();
     }
 
     private void checkIndexFiles(long snapshotId) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index d3f4ddb86af6..f5874bed7eac 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -181,6 +181,7 @@ public static void writeAndCheckFileResult(
             write.write(rowData("S006", 2, 16, "S16", 116L, "S116"));
             commit.commit(0, write.prepareCommit(true, 0));
             write.close();
+            commit.close();
 
             R result = firstChecker.apply(tableSchemas);
             /**
@@ -219,6 +220,7 @@ public static void writeAndCheckFileResult(
             write.write(rowData(1, 22, 122L, 1122, "S012", "S22"));
             commit.commit(0, write.prepareCommit(true, 0));
             write.close();
+            commit.close();
 
             secondChecker.accept(result, tableSchemas);
         }
@@ -306,6 +308,8 @@ public static void writeAndCheckFileResultForColumnType(
                             toBytes("310")));
             commit.commit(0, write.prepareCommit(true, 0));
             write.close();
+            commit.close();
+
             R result = firstChecker.apply(tableSchemas);
             /**
@@ -400,6 +404,7 @@ public static void writeAndCheckFileResultForColumnType(
                             toBytes("610")));
             commit.commit(1, write.prepareCommit(true, 1));
             write.close();
+            commit.close();
 
             secondChecker.accept(result, tableSchemas);
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index a5f4cb64504c..bdf1c1f8042d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -33,6 +33,7 @@
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -216,8 +217,10 @@ public void testAddField() throws Exception {
         StreamTableWrite write = table.newWrite(commitUser);
         write.write(GenericRow.of(1, 1L));
         write.write(GenericRow.of(2, 2L));
-        table.newCommit(commitUser).commit(0, write.prepareCommit(true, 0));
+        TableCommitImpl commit = table.newCommit(commitUser);
+        commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         schemaManager.commitChanges(
                 Collections.singletonList(SchemaChange.addColumn("f3", DataTypes.BIGINT())));
@@ -226,8 +229,10 @@ public void testAddField() throws Exception {
         write = table.newWrite(commitUser);
         write.write(GenericRow.of(3, 3L, 3L));
         write.write(GenericRow.of(4, 4L, 4L));
-        table.newCommit(commitUser).commit(1, write.prepareCommit(true, 1));
+        commit = table.newCommit(commitUser);
+        commit.commit(1, write.prepareCommit(true, 1));
         write.close();
+        commit.close();
 
         // read all
         List rows = readRecords(table, null);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 8bd03ca7f76d..0e597b81549c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -153,7 +153,9 @@ public void createTableDefault() throws Exception {
     }
 
     protected void commitDefault(List<CommitMessage> messages) throws Exception {
-        getTableDefault().newBatchWriteBuilder().newCommit().commit(messages);
+        BatchTableCommit commit = getTableDefault().newBatchWriteBuilder().newCommit();
+        commit.commit(messages);
+        commit.close();
     }
 
     protected List<CommitMessage> writeDataDefault(int size, int times) throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
index 638ed504b5aa..60cae358f863 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
@@ -76,6 +76,7 @@ private void testWritePreemptMemory(boolean singlePartition) throws Exception {
         }
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         // read
         List splits = toSplits(table.newSnapshotReader().read().dataSplits());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
index d7eedc7a7773..8456ba2ca65d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
@@ -254,6 +254,9 @@ public void testStartingFromNonExistingSnapshot() throws Exception {
         StreamTableScan scan = table.newReadBuilder().newStreamScan();
         TableScan.Plan plan = scan.plan();
         assertThat(plan.splits()).isEmpty();
+
+        write.close();
+        commit.close();
     }
 
     @Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanListPartitionsTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanListPartitionsTest.java
index 0356fd3e1f11..19c2c0b3f269 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanListPartitionsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanListPartitionsTest.java
@@ -23,6 +23,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.snapshot.ScannerTestBase;
 
 import org.junit.jupiter.api.Test;
@@ -46,7 +47,8 @@ public void testListPartitions() throws Exception {
             write.write(row);
         }
         List<CommitMessage> result = write.prepareCommit();
-        table.newCommit(commitUser).commit(result);
+        TableCommitImpl commit = table.newCommit(commitUser);
+        commit.commit(result);
 
         AtomicInteger ai = new AtomicInteger(0);
 
@@ -58,5 +60,6 @@ public void testListPartitions() throws Exception {
         for (BinaryRow row : rows) {
             assertThat(row.getInt(0)).isEqualTo(ai.getAndIncrement());
         }
+        commit.close();
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java
index 3fc7ea97180a..eb0ce01ae3cb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java
@@ -31,7 +31,7 @@ public class BoundedWatermarkCheckerTest extends ScannerTestBase {
 
     @Test
-    public void testBounded() {
+    public void testBounded() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
         BoundedChecker checker = BoundedChecker.watermark(2000L);
@@ -47,5 +47,6 @@ public void testBounded() {
         commit.commit(new ManifestCommittable(0, 2001L));
         snapshot = snapshotManager.latestSnapshot();
         assertThat(checker.shouldEndInput(snapshot)).isTrue();
+        commit.close();
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
index 843f74a1a39b..ac2e21118f52 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
@@ -75,6 +75,8 @@ public void testDefaultValue() throws Exception {
             List result = getResult(read, plan.splits());
             assertThat(result).hasSameElementsAs(Arrays.asList("+I 2|11|200", "+I 2|12|100"));
         }
+        write.close();
+        commit.close();
     }
 
     protected FileStoreTable createFileStoreTable() throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
index fe734c8d9d04..2d7b771e0c29 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -47,7 +47,7 @@ public class TagAutoCreationTest extends PrimaryKeyTableTestBase {
 
     @Test
-    public void testTag() {
+    public void testTag() throws Exception {
         Options options = new Options();
         options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
         options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
@@ -81,6 +81,8 @@ public void testTag() {
         Options expireSetting = new Options();
         expireSetting.set(SNAPSHOT_NUM_RETAINED_MIN, 1);
         expireSetting.set(SNAPSHOT_NUM_RETAINED_MAX, 1);
+        commit.close();
+
         commit = table.copy(expireSetting.toMap()).newCommit(commitUser).ignoreEmptyCommit(false);
 
         // trigger snapshot expiration
@@ -91,10 +93,11 @@ public void testTag() {
         commit.commit(new ManifestCommittable(9, utcMills("2023-07-18T16:00:00")));
         assertThat(tagManager.tags().values())
                 .containsOnly("2023-07-18 13", "2023-07-18 14", "2023-07-18 15");
+        commit.close();
     }
 
     @Test
-    public void testTagDelay() {
+    public void testTagDelay() throws Exception {
         Options options = new Options();
         options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
         options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
@@ -114,10 +117,11 @@ public void testTagDelay() {
         // test create
         commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T13:00:10")));
         assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11", "2023-07-18 12");
+        commit.close();
     }
 
     @Test
-    public void testTagSinkWatermark() {
+    public void testTagSinkWatermark() throws Exception {
         Options options = new Options();
         options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
         options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
@@ -133,10 +137,11 @@ public void testTagSinkWatermark() {
         // test second create
         commit.commit(new ManifestCommittable(0, localZoneMills("2023-07-18T13:00:10")));
         assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11", "2023-07-18 12");
+        commit.close();
     }
 
     @Test
-    public void testTagTwoHour() {
+    public void testTagTwoHour() throws Exception {
         Options options = new Options();
         options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
         options.set(TAG_CREATION_PERIOD, TagCreationPeriod.TWO_HOURS);
@@ -155,6 +160,7 @@ public void testTagTwoHour() {
         // test second create
         commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T14:00:09")));
         assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10", "2023-07-18 12");
+        commit.close();
     }
 
     @Test
@@ -180,6 +186,7 @@ public void testTagDaily() throws Exception {
         commit.commit(new ManifestCommittable(0, utcMills("2023-07-20T12:00:01")));
         assertThat(tagManager.tags().values())
                 .containsOnly("2023-07-17", "2023-07-18", "2023-07-19");
+        commit.close();
     }
 
     private long utcMills(String timestamp) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 492d98434061..9cda919d6d0b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -125,6 +125,8 @@ public void testBatchCompact(String mode) throws Exception {
             Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
             assertThat(snapshot.id()).isEqualTo(2);
             assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            write.close();
+            commit.close();
         }
     }
 
@@ -198,6 +200,8 @@ public void testStreamingCompact(String mode) throws Exception {
             StreamTableScan scan = table.newReadBuilder().newStreamScan();
             TableScan.Plan plan = scan.plan();
             assertThat(plan.splits()).isEmpty();
+            write.close();
+            commit.close();
         }
     }
 
@@ -275,6 +279,8 @@ public void testStreamingCompact(String mode) throws Exception {
                     Duration.ofSeconds(100),
                     String.format(
                             "Cannot validate snapshot expiration in %s milliseconds.", 60_000));
+            write.close();
+            commit.close();
         }
 
         // In combined mode, check whether newly created table can be detected
@@ -310,6 +316,8 @@ public void testStreamingCompact(String mode) throws Exception {
                     snapshotManager.snapshot(snapshotManager.latestSnapshotId());
             assertThat(snapshot.id()).isEqualTo(1);
             assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            write.close();
+            commit.close();
         }
     }
 
@@ -364,6 +372,8 @@ public void testStreamingCompact(String mode) throws Exception {
                     Duration.ofSeconds(100),
                     String.format(
                             "Cannot validate snapshot expiration in %s milliseconds.", 60_000));
+            write.close();
+            commit.close();
         }
     }
 }
@@ -457,6 +467,8 @@ private void includingAndExcludingTablesImpl(
             Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
             assertThat(snapshot.id()).isEqualTo(2);
             assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            write.close();
+            commit.close();
         }
     }
 
@@ -566,6 +578,8 @@ public void testUnawareBucketStreamingCompact() throws Exception {
             Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
             assertThat(snapshot.id()).isEqualTo(2);
             assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            write.close();
+            commit.close();
         }
 
         if (ThreadLocalRandom.current().nextBoolean()) {
@@ -592,6 +606,8 @@ public void testUnawareBucketStreamingCompact() throws Exception {
 
             // second compaction, snapshot will be 5
             checkFileAndRowSize(table, 5L, 30_000L, 1, 9);
+            write.close();
+            commit.close();
         }
     }
 
@@ -633,6 +649,8 @@ public void testUnawareBucketBatchCompact() throws Exception {
             Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
             assertThat(snapshot.id()).isEqualTo(2);
             assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            write.close();
+            commit.close();
         }
 
         if (ThreadLocalRandom.current().nextBoolean()) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 104962f3a56e..4cda130ad36e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -30,6 +30,7 @@
 import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -227,7 +228,9 @@ private List writeData(int size) throws Exception {
     }
 
     private void commit(List<CommitMessage> messages) throws Exception {
-        getTable().newBatchWriteBuilder().newCommit().commit(messages);
+        BatchTableCommit commit = getTable().newBatchWriteBuilder().newCommit();
+        commit.commit(messages);
+        commit.close();
     }
 
     private void createTable() throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index 00bdbecae78d..75c0a199962f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -31,6 +31,7 @@
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -280,7 +281,9 @@ private Identifier identifier() {
     }
 
     private void commit(List<CommitMessage> messages) throws Exception {
-        getTable().newBatchWriteBuilder().newCommit().commit(messages);
+        BatchTableCommit commit = getTable().newBatchWriteBuilder().newCommit();
+        commit.commit(messages);
+        commit.close();
     }
 
     // schema with all the basic types.
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index 5c666c7fe4b9..8ff6c00895a0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -32,6 +32,7 @@
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -136,8 +137,10 @@ public void testLookupExpiredSnapshot() throws Exception {
         fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
     }
 
-    private void commit(List<CommitMessage> messages) {
-        fileStoreTable.newCommit(commitUser).commit(messages);
+    private void commit(List<CommitMessage> messages) throws Exception {
+        TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
+        commit.commit(messages);
+        commit.close();
     }
 
     private List<CommitMessage> writeCommit(int number) throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
index f68c00b11d98..60839805f8a4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -81,6 +81,7 @@ public void testAutoTagForSavepoint() throws Exception {
 
         // notify checkpoint success and tag for savepoint-2
         testHarness.notifyOfCompletedCheckpoint(checkpointId);
+        testHarness.close();
 
         assertThat(table.snapshotManager().snapshotCount()).isEqualTo(3);
         assertThat(table.tagManager().tagCount()).isEqualTo(1);
@@ -116,6 +117,7 @@ public void testRestore() throws Exception {
                         .getJobManagerOwnedState();
         assertThat(table.snapshotManager().latestSnapshot()).isNull();
         assertThat(table.tagManager().tagCount()).isEqualTo(0);
+        testHarness.close();
 
         testHarness = createRecoverableTestHarness(table);
         try {
@@ -131,6 +133,8 @@ public void testRestore() throws Exception {
                             + "By restarting the job we hope that "
                             + "writers can start writing based on these new commits.");
         }
+        testHarness.close();
+
         Snapshot snapshot = table.snapshotManager().latestSnapshot();
         assertThat(snapshot).isNotNull();
         assertThat(snapshot.id()).isEqualTo(checkpointId);
@@ -170,6 +174,8 @@ public void testAbortSavepointAndCleanTag() throws Exception {
 
         // abort savepoint 1
         testHarness.getOneInputOperator().notifyCheckpointAborted(1);
+        testHarness.close();
+
         assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2);
         assertThat(table.tagManager().tagCount()).isEqualTo(0);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 447d7972d8da..c5703991f79a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -91,6 +91,7 @@ public void testFailIntentionallyAfterRestore() throws Exception {
         // checkpoint is completed but not notified, so no snapshot is committed
         OperatorSubtaskState snapshot = testHarness.snapshot(0, timestamp++);
         assertThat(table.snapshotManager().latestSnapshotId()).isNull();
+        testHarness.close();
 
         testHarness = createRecoverableTestHarness(table);
         try {
@@ -107,12 +108,14 @@ public void testFailIntentionallyAfterRestore() throws Exception {
                             + "writers can start writing based on these new commits.");
         }
         assertResults(table, "1, 10", "2, 20");
+        testHarness.close();
 
         // snapshot is successfully committed, no failure is needed
         testHarness = createRecoverableTestHarness(table);
         testHarness.initializeState(snapshot);
         testHarness.open();
         assertResults(table, "1, 10", "2, 20");
+        testHarness.close();
     }
 
     @Test
@@ -145,6 +148,7 @@ public void testCheckpointAbort() throws Exception {
 
         // should create 10 snapshots
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(cpId);
+        testHarness.close();
     }
 
     // ------------------------------------------------------------------------
@@ -227,6 +231,7 @@ public void testRestoreCommitUser() throws Exception {
             OperatorSubtaskState snapshot =
                     writeAndSnapshot(table, commitUser, timestamp, ++checkpoint, testHarness);
             operatorSubtaskStates.add(snapshot);
+            testHarness.close();
         }
 
         // 2. Clearing redundant union list state
@@ -239,6 +244,7 @@ public void testRestoreCommitUser() throws Exception {
         testHarness.initializeState(operatorSubtaskState);
         OperatorSubtaskState snapshot =
                 writeAndSnapshot(table, initialCommitUser, timestamp, ++checkpoint, testHarness);
+        testHarness.close();
 
         // 3. Check whether success
         List actual = new ArrayList<>();
@@ -260,6 +266,7 @@ public void testRestoreCommitUser() throws Exception {
         OneInputStreamOperatorTestHarness testHarness1 = createTestHarness(operator);
         testHarness1.initializeState(snapshot);
+        testHarness1.close();
 
         Assertions.assertThat(actual.size()).isEqualTo(1);
 
@@ -316,6 +323,7 @@ public void testWatermarkCommit() throws Exception {
         testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
         testHarness.snapshot(cpId, timestamp++);
         testHarness.notifyOfCompletedCheckpoint(cpId);
+        testHarness.close();
 
         assertThat(table.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 806cec8f301d..6cb37cd996cf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -159,6 +159,7 @@ firstTable, new Committable(8, Committable.Kind.FILE, committable)),
         // checkpoint is completed but not notified, so no snapshot is committed
         OperatorSubtaskState snapshot = testHarness.snapshot(0, timestamp++);
         assertThat(table.snapshotManager().latestSnapshotId()).isNull();
+        testHarness.close();
 
         testHarness = createRecoverableTestHarness();
         try {
@@ -175,6 +176,7 @@ firstTable, new Committable(8, Committable.Kind.FILE, committable)),
                             + "writers can start writing based on these new commits.");
         }
         assertResultsForFirstTable(table, "1, 10", "2, 20");
+        testHarness.close();
 
         // snapshot is successfully committed, no failure is needed
         testHarness = createRecoverableTestHarness();
@@ -200,6 +202,7 @@ secondTable, new Committable(9, Committable.Kind.FILE, committable)),
         // checkpoint is completed but not notified, so no snapshot is committed
         snapshot = testHarness.snapshot(1, timestamp++);
         assertThat(table.snapshotManager().latestSnapshotId()).isNull();
+        testHarness.close();
 
         testHarness = createRecoverableTestHarness();
         try {
@@ -216,12 +219,14 @@ secondTable, new Committable(9, Committable.Kind.FILE, committable)),
                             + "writers can start writing based on these new commits.");
         }
         assertResultsForSecondTable(table, "3, 30.0, s3", "4, 40.0, s4");
+        testHarness.close();
 
         // snapshot is successfully committed, no failure is needed
         testHarness = createRecoverableTestHarness();
         testHarness.initializeState(snapshot);
         testHarness.open();
         assertResultsForSecondTable(table, "3, 30.0, s3", "4, 40.0, s4");
+        testHarness.close();
     }
 
     @Test
@@ -300,6 +305,7 @@ public void testCheckpointAbort() throws Exception {
         assertThat(snapshotManager1.latestSnapshotId()).isEqualTo(20);
         // should create 10 snapshots for second table
         assertThat(snapshotManager2.latestSnapshotId()).isEqualTo(10);
+        testHarness.close();
     }
 
     // ------------------------------------------------------------------------
@@ -445,6 +451,7 @@ public void testWatermarkCommit() throws Exception {
         testHarness.processWatermark(new Watermark(2048));
         testHarness.snapshot(cpId, timestamp++);
         testHarness.notifyOfCompletedCheckpoint(cpId);
+        testHarness.close();
 
         assertThat(table1.snapshotManager().latestSnapshot().watermark()).isEqualTo(2048L);
         assertThat(table1.snapshotManager().latestSnapshot().watermark()).isEqualTo(2048L);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
index 1a7127305a19..6b998c43ee84 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
@@ -301,6 +301,8 @@ public void testStreamingRead(boolean defaultOptions) throws Exception {
                     rowData(2, 1520, 15, BinaryString.fromString("20221209")),
                     rowData(1, 1510, 16, BinaryString.fromString("20221208")),
                     rowData(1, 1511, 15, BinaryString.fromString("20221209")));
+            write.close();
+            commit.close();
         }
         actual.clear();
         for (int i = 0; i < 8; i++) {
@@ -340,6 +342,8 @@ public void testStreamingRead(boolean defaultOptions) throws Exception {
                         0,
                         rowData(2, 1520, 15, BinaryString.fromString("20221209")),
                         rowData(1, 1510, 16, BinaryString.fromString("20221208")));
+                write.close();
+                commit.close();
             }
         }
         actual.clear();
@@ -466,6 +470,8 @@ public void testIncludeAndExcludeTableRead(boolean defaultOptions) throws Except
                         0,
                         rowData(2, 1520, 15, BinaryString.fromString("20221209")),
                         rowData(1, 1510, 16, BinaryString.fromString("20221208")));
+                write.close();
+                commit.close();
             }
         }
         actual.clear();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index 10a7f7364d37..d91fe9b499b8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -25,6 +25,7 @@
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.source.Split;
@@ -92,8 +93,10 @@ private void writeToTable(int a, int b, int c) throws Exception {
         BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         BatchTableWrite write = writeBuilder.newWrite();
         write.write(GenericRow.of(a, b, c));
-        writeBuilder.newCommit().commit(write.prepareCommit());
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(write.prepareCommit());
         write.close();
+        commit.close();
     }
 
     private List<List<Object>> readSplit(Split split) throws IOException {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
index 56bfb53bd632..4ba24ba1efca 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
@@ -151,6 +151,7 @@ protected FileStoreTable writeData() throws Exception {
         commit.commit(2, write.prepareCommit(true, 2));
 
         write.close();
+        commit.close();
         return table;
     }
diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
index 4f4e87f179bc..1ff31dc46347 100644
--- a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
+++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
@@ -172,6 +172,7 @@ private String writeData(Table table, String path, List data) throw
                         "CREATE EXTERNAL TABLE " + tableName + " ",
                         "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
                         "LOCATION '" + path + "'")));
+        commit.close();
         return tableName;
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
index 9156e6d0feae..6a29b0c762f0 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
@@ -182,6 +182,7 @@ private String writeData(Table table, String path, List data) throw
         commit.commit(commitIdentifier, write.prepareCommit(true, commitIdentifier));
         commitIdentifier++;
         write.close();
+        commit.close();
 
         String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4));
         hiveShell.execute(
@@ -646,6 +647,7 @@ public void testInsertAllSupportedTypes() throws Exception {
         }
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         hiveShell.execute(
                 String.join(
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index 17bb98bc87e6..c88c62afd864 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -723,6 +723,7 @@ private void writeData(Table table, List data) throws Exception {
         commit.commit(commitIdentifier, write.prepareCommit(true, commitIdentifier));
         commitIdentifier++;
         write.close();
+        commit.close();
     }
 
     @Test
@@ -757,6 +758,7 @@ public void testReadAllSupportedTypes() throws Exception {
         }
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         createExternalTable();
         List actual =
@@ -865,7 +867,7 @@ public void testPredicatePushDown() throws Exception {
         write.write(GenericRow.of(6));
         commit.commit(3, write.prepareCommit(true, 3));
         write.close();
-
+        commit.close();
         hiveShell.execute(
                 String.join(
                         "\n",
@@ -952,6 +954,7 @@ public void testDateAndTimestamp() throws Exception {
                                 LocalDateTime.of(2022, 6, 18, 8, 30, 0, 100_000_000))));
         commit.commit(2, write.prepareCommit(true, 2));
         write.close();
+        commit.close();
 
         createExternalTable();
         assertThat(
@@ -1006,6 +1009,7 @@ public void testTime() throws Exception {
         commit.commit(4, write.prepareCommit(true, 3));
 
         write.close();
+        commit.close();
 
         createExternalTable();
 
@@ -1096,6 +1100,7 @@ public void testMapKey() throws Exception {
                         new GenericMap(varcharMap)));
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
+        commit.close();
 
         createExternalTable();
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index e95ce3f9ebe9..ab37478768da 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -82,6 +82,9 @@ public void testPk() throws Exception {
         write.write(GenericRow.ofKind(RowKind.DELETE, 2L, BinaryString.fromString("Hello")));
         commit.commit(0, write.prepareCommit(true, 0));
 
+        write.close();
+        commit.close();
+
         PaimonRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
         Set actual = new HashSet<>();
@@ -123,6 +126,9 @@ public void testValueCount() throws Exception {
         write.write(GenericRow.of(1, BinaryString.fromString("Hi")));
         commit.commit(0, write.prepareCommit(true, 0));
 
+        write.close();
+        commit.close();
+
         PaimonRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
         Map actual = new HashMap<>();
@@ -162,6 +168,9 @@ public void testProjectionPushdown() throws Exception {
         write.write(GenericRow.of(1, 10L, BinaryString.fromString("Hi")));
         commit.commit(0, write.prepareCommit(true, 0));
 
+        write.close();
+        commit.close();
+
         PaimonRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0, Arrays.asList("c", "a"));
         RowDataContainer container = reader.createValue();
         Map actual = new HashMap<>();
diff --git a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
index 829f4fc00c4d..e38860aee0f1 100644
--- a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
+++ b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
@@ -77,4 +77,9 @@ public void commit() throws Exception {
         commit.commit(commitIdentifier, writer.prepareCommit(true, commitIdentifier));
         commitIdentifier++;
     }
+
+    public void close() throws Exception {
+        writer.close();
+        commit.close();
+    }
 }
diff --git a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index b374966a8abc..85de8baa0ccb 100644
--- a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -70,6 +70,7 @@ public static void startSpark() throws Exception {
         testHelper1.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
         testHelper1.write(GenericRow.ofKind(RowKind.DELETE, 3, 4L, BinaryString.fromString("2")));
         testHelper1.commit();
+        testHelper1.close();
 
         tablePath2 = new Path(warehousePath, "default.db/t2");
         SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
@@ -79,6 +80,7 @@ public static void startSpark() throws Exception {
         testHelper2.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
         testHelper2.write(GenericRow.of(7, 8L, BinaryString.fromString("4")));
         testHelper2.commit();
+        testHelper2.close();
     }
 
     private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception {
diff --git a/paimon-spark/paimon-spark-3.1/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java b/paimon-spark/paimon-spark-3.1/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
index 31fae2ebe1e6..f632998c59cd 100644
--- a/paimon-spark/paimon-spark-3.1/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
+++ b/paimon-spark/paimon-spark-3.1/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
@@ -114,5 +114,6 @@ private static void writeTable(String tableName, GenericRow... rows) throws Exce
         }
         commit.commit(writer.prepareCommit());
         writer.close();
+        commit.close();
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index d16ef746daec..1770dc9650fc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -140,15 +140,17 @@ case class WriteIntoPaimonTable(
       .collect()
       .map(deserializeCommitMessage(serializer, _))
 
+    val tableCommit = if (overwritePartition == null) {
+      writeBuilder.newCommit()
+    } else {
+      writeBuilder.withOverwrite(overwritePartition.asJava).newCommit()
+    }
     try {
-      val tableCommit = if (overwritePartition == null) {
-        writeBuilder.newCommit()
-      } else {
-        writeBuilder.withOverwrite(overwritePartition.asJava).newCommit()
-      }
       tableCommit.commit(commitMessages.toList.asJava)
     } catch {
       case e: Throwable => throw new RuntimeException(e);
+    } finally {
+      tableCommit.close();
     }
 
     Seq.empty
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index fb7f151eb86c..1ebbe4068b05 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -184,6 +184,8 @@ protected static void writeTable(String tableName, GenericRow... rows) throws Ex
         }
         long commitIdentifier = COMMIT_IDENTIFIER.getAndIncrement();
         commit.commit(commitIdentifier, writer.prepareCommit(true, commitIdentifier));
+        writer.close();
+        commit.close();
     }
 
     protected static void writeTable(String tableName, String... values) {