Skip to content

Commit

Permalink
resolve flink test commit close
Browse files Browse the repository at this point in the history
  • Loading branch information
schnappi17 committed Oct 18, 2023
1 parent 1a6a9bb commit d8885b0
Show file tree
Hide file tree
Showing 41 changed files with 213 additions and 65 deletions.
30 changes: 0 additions & 30 deletions paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.paimon.metrics;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Core of Paimon metrics system. */
Expand All @@ -43,39 +41,11 @@ public static Metrics getInstance() {
* instances.
*/
public void addGroup(AbstractMetricGroup group) {
try {
throw new Exception();
} catch (Exception e) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
e.printStackTrace(printWriter);
String stackTraceAsString = stringWriter.toString();
System.out.println(
"[gjli] "
+ group.toString()
+ " addGroup with stack trace: "
+ stackTraceAsString);
}
metricGroups.add(group);
System.out.println("[gjli] Added current size is " + metricGroups.size());
}

/** Remove a metric group. Called when closing the corresponding instances, like committer. */
public void removeGroup(AbstractMetricGroup group) {
try {
throw new Exception();
} catch (Exception e) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
e.printStackTrace(printWriter);
String stackTraceAsString = stringWriter.toString();
System.out.println(
"[gjli] "
+ group.toString()
+ " removeGroup with stack trace: "
+ stackTraceAsString);
}
System.out.println("[gjli] Removed current size is " + metricGroups.size());
metricGroups.remove(group);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.SnapshotManager;

Expand Down Expand Up @@ -145,8 +146,10 @@ private static Schema schema() {
return schemaBuilder.build();
}

private void commit(List<CommitMessage> messages) {
appendOnlyFileStoreTable.newCommit(commitUser).commit(messages);
private void commit(List<CommitMessage> messages) throws Exception {
TableCommitImpl commit = appendOnlyFileStoreTable.newCommit(commitUser);
commit.commit(messages);
commit.close();
}

private List<CommitMessage> writeCommit(int number) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -49,6 +50,11 @@ public void beforeEach() throws Exception {
commit = table.newStreamWriteBuilder().withCommitUser(commitUser).newCommit();
}

@AfterEach
public void afterEach() throws Exception {
commit.close();
}

private HashBucketAssigner createAssigner(int numAssigners, int assignId) {
return new HashBucketAssigner(
table.snapshotManager(), commitUser, fileHandler, numAssigners, assignId, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void testAssignBucket() throws Exception {
.containsExactlyInAnyOrder(-771300025, 1340390384, 1465514398);

write.close();
commit.close();
}

@Test
Expand All @@ -137,5 +138,6 @@ public void testNotCreateNewFile() throws Exception {
assertThat(readIndex(commitMessages)).isEmpty();

write.close();
commit.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ public void testTolerateMetricNameCollisions() {
group.close();
}

// @Test
// public void testAddAndRemoveMetricGroups() {
// AbstractMetricGroup metricGroup =
// GenericMetricGroup.createGenericMetricGroup("myTable", "commit");
// assertThat(Metrics.getInstance().getMetricGroups()).containsExactly(metricGroup);
// metricGroup.close();
// assertThat(Metrics.getInstance().getMetricGroups()).isEmpty();
// }
@Test
public void testAddAndRemoveMetricGroups() {
AbstractMetricGroup metricGroup =
GenericMetricGroup.createGenericMetricGroup("myTable", "commit");
assertThat(Metrics.getInstance().getMetricGroups()).contains(metricGroup);
metricGroup.close();
assertThat(Metrics.getInstance().getMetricGroups()).doesNotContain(metricGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.Metrics;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void afterEach() {
@Test
public void testGenericMetricsRegistration() {
MetricGroup genericMetricGroup = commitMetrics.getMetricGroup();
// assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
assertThat(genericMetricGroup.getGroupName()).isEqualTo(CommitMetrics.GROUP_NAME);
Map<String, Metric> registeredMetrics = genericMetricGroup.getMetrics();
assertThat(registeredMetrics.keySet())
Expand All @@ -80,7 +81,7 @@ public void testGenericMetricsRegistration() {
CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED);

reportOnce(commitMetrics);
// assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
}

/** Tests that the metrics are updated properly. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ public void testGenerateScopeDefault() {
assertThat(group.getAllTags().get("partition")).isEqualTo("dt=1");
assertThat(group.getMetricIdentifier("myMetric", ".")).isEqualTo("commit.myMetric");
assertThat(group.getGroupName()).isEqualTo("commit");
group.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

Expand Down Expand Up @@ -183,12 +184,14 @@ public void testFilterCommittedAfterExpiring() throws Exception {
options.put(PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "5 s");
options.put(PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
table = table.copy(options);
commit.close();
commit = table.newCommit(commitUser);
commit.commit(successCommits - 2, commitMessages.get(successCommits - 2));
notCommitted.remove((long) (successCommits - 2));
Thread.sleep(5000);
commit.commit(successCommits - 1, commitMessages.get(successCommits - 1));
notCommitted.remove((long) (successCommits - 1));
commit.close();

// check whether partition expire is triggered
Snapshot snapshot = table.snapshotManager().latestSnapshot();
Expand Down Expand Up @@ -217,8 +220,10 @@ private void write(String f0, String f1) throws Exception {
StreamTableWrite write =
table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true")).newWrite("");
write.write(GenericRow.of(BinaryString.fromString(f0), BinaryString.fromString(f1)));
table.newCommit("").commit(0, write.prepareCommit(true, 0));
TableCommitImpl commit = table.newCommit("");
commit.commit(0, write.prepareCommit(true, 0));
write.close();
commit.close();
}

private PartitionExpire newExpire() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public void testSplitOrder() throws Exception {
write.write(rowData(3, 33, 303L));
commit.commit(2, write.prepareCommit(true, 2));
write.close();
commit.close();

List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
int[] partitions =
Expand Down Expand Up @@ -163,6 +164,7 @@ public void testBatchSplitOrderByPartition() throws Exception {
commit.commit(2, write.prepareCommit(true, 2));

write.close();
commit.close();

List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
int[] partitions =
Expand Down Expand Up @@ -205,6 +207,7 @@ public void testStreamingSplitInUnawareBucketMode() throws Exception {
assertThat(scan.plan().splits().size()).isEqualTo(3);

write.close();
commit.close();
}

@Test
Expand Down Expand Up @@ -280,6 +283,8 @@ public void testSequentialRead() throws Exception {
dataPerBucket.clear();
}
commit.commit(0, write.prepareCommit(true, 0));
write.close();
commit.close();

int partition = random.nextInt(numOfPartition);
List<Integer> availableBucket = new ArrayList<>(dataset.get(partition).keySet());
Expand Down Expand Up @@ -320,6 +325,7 @@ public void testBatchOrderWithCompaction() throws Exception {
expected.add(i);
}
write.close();
commit.close();

ReadBuilder readBuilder = table.newReadBuilder();
List<Split> splits = readBuilder.newScan().plan().splits();
Expand All @@ -342,6 +348,7 @@ public void testBatchOrderWithCompaction() throws Exception {
expected.add(i);
}
write.close();
commit.close();

ReadBuilder readBuilder = table.newReadBuilder();
List<Split> splits = readBuilder.newScan().plan().splits();
Expand Down Expand Up @@ -375,6 +382,7 @@ private void writeData() throws Exception {
commit.commit(2, write.prepareCommit(true, 2));

write.close();
commit.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ private void writeData() throws Exception {
commit.commit(2, write.prepareCommit(true, 2));

write.close();
commit.close();
}

@Test
Expand All @@ -200,6 +201,7 @@ public void testChangelogWithoutDataFile() throws Exception {
write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
commit.commit(0, write.prepareCommit(true, 0));
write.close();
commit.close();

// check that no data file is produced
List<Split> splits =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void testSequenceNumber() throws Exception {
write.write(rowData(1, 11, 55L));
commit.commit(1, write.prepareCommit(true, 1));
write.close();
commit.close();

List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
TableRead read = table.newRead();
Expand Down Expand Up @@ -327,6 +328,7 @@ public void testStreamingInputChangelog() throws Exception {
write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
commit.commit(0, write.prepareCommit(true, 0));
write.close();
commit.close();

List<Split> splits =
toSplits(
Expand Down Expand Up @@ -426,6 +428,9 @@ private void innerTestStreamingFullChangelog(Consumer<Options> configure) throws
write.compact(binaryRow(2), 0, true);
commit.commit(4, write.prepareCommit(true, 4));

write.close();
commit.close();

splits =
toSplits(
table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
Expand Down Expand Up @@ -530,6 +535,7 @@ public void testStreamingChangelogCompatibility02() throws Exception {
write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
commit.commit(2, write.prepareCommit(true, 2));
write.close();
commit.close();

assertNextSnapshot.apply(2);

Expand Down Expand Up @@ -560,6 +566,7 @@ private void writeData() throws Exception {
commit.commit(2, write.prepareCommit(true, 2));

write.close();
commit.close();
}

@Override
Expand Down Expand Up @@ -624,6 +631,7 @@ public void testReadFilter() throws Exception {
commit.commit(3, write.prepareCommit(true, 3));

write.close();
commit.close();

// cannot push down value filter b = 600L
splits = toSplits(table.newSnapshotReader().read().dataSplits());
Expand Down Expand Up @@ -657,6 +665,7 @@ public void testPartialUpdateIgnoreDelete() throws Exception {
write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 400L));
commit.commit(0, write.prepareCommit(true, 0));
write.close();
commit.close();

List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
TableRead read = table.newRead();
Expand Down Expand Up @@ -687,6 +696,7 @@ public void testSlowCommit() throws Exception {
commit.commit(2, committables2);

write.close();
commit.close();

List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
TableRead read = table.newRead();
Expand Down Expand Up @@ -728,6 +738,8 @@ public void testIncrementalSplitOverwrite() throws Exception {
assertThat(splits1.get(0).dataFiles()).hasSize(1);
assertThat(splits1.get(0).dataFiles().get(0).fileName())
.isNotEqualTo(splits0.get(0).dataFiles().get(0).fileName());
write.close();
commit.close();
}

@Test
Expand Down Expand Up @@ -900,6 +912,8 @@ public void testAggMergeFunc() throws Exception {

result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");
write.close();
commit.close();
}

@Test
Expand Down Expand Up @@ -962,6 +976,7 @@ public void testFullCompactedRead() throws Exception {
commit.commit(3, write.prepareCommit(true, 3));

write.close();
commit.close();

assertThat(
getResult(
Expand Down
Loading

0 comments on commit d8885b0

Please sign in to comment.