[bug] fix connection leak during lookup join on a Paimon table apache#924
leaves12138 committed Jun 13, 2023
1 parent 3e8318a commit 52e2f86
Showing 3 changed files with 152 additions and 15 deletions.
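
In short, the lookup join used to drain rows through a plain Iterator returned by TableStreamingReader.nextBatch(), so the RecordReader behind it (and the file input streams it holds open) was never closed. The fix hands back the RecordReader itself and drains it through a RecordReaderIterator inside try-with-resources, closing the readers after every refresh. The snippet below is a minimal, self-contained sketch of that pattern only; Reader, the String rows, and process(...) are hypothetical stand-ins, not Paimon APIs.

// A minimal sketch of the resource-leak fix, not Paimon code: the Reader
// interface and the process(...) helper are hypothetical stand-ins.
interface Reader extends AutoCloseable {
    String next() throws Exception; // returns null once the batch is exhausted
}

class RefreshSketch {

    // Before the fix: only an Iterator-style view escaped to the caller, so
    // nothing ever called close() on the reader backing it.
    //
    // After the fix: the reader itself is obtained and drained inside
    // try-with-resources, so close() runs even on early return or exception.
    static void refresh(Reader reader) throws Exception {
        try (Reader batch = reader) {
            String row;
            while ((row = batch.next()) != null) {
                process(row);
            }
        } // close() releases the underlying file input streams here
    }

    static void process(String row) {
        // consume the row, e.g. feed it into the lookup table
    }
}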
FileStoreLookupFunction.java
@@ -26,6 +26,7 @@
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
+import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
@@ -51,7 +52,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -117,6 +117,11 @@ public FileStoreLookupFunction(
 
     public void open(FunctionContext context) throws Exception {
         String tmpDirectory = getTmpDirectory(context);
+        open(tmpDirectory);
+    }
+
+    // visible for testing
+    void open(String tmpDirectory) throws Exception {
         this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
 
         Options options = Options.fromMap(table.options());
@@ -189,12 +194,14 @@ private void checkRefresh() throws Exception {
     }
 
     private void refresh() throws Exception {
-        while (true) {
-            Iterator<InternalRow> batch = streamingReader.nextBatch();
-            if (!batch.hasNext()) {
-                return;
+        try (RecordReaderIterator<InternalRow> batch =
+                new RecordReaderIterator<>(streamingReader.getRecordReader())) {
+            while (true) {
+                if (!batch.hasNext()) {
+                    return;
+                }
+                this.lookupTable.refresh(batch);
             }
-            this.lookupTable.refresh(batch);
         }
     }
 
TableStreamingReader.java
@@ -23,7 +23,7 @@
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
-import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -33,15 +33,13 @@
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.TypeUtils;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.function.IntUnaryOperator;
 import java.util.stream.IntStream;
@@ -94,7 +92,7 @@ public TableStreamingReader(Table table, int[] projection, @Nullable Predicate p
         }
     }
 
-    public Iterator<InternalRow> nextBatch() throws Exception {
+    public RecordReader<InternalRow> getRecordReader() throws Exception {
         try {
             return read(scan.plan());
         } catch (EndOfScanException e) {
@@ -103,18 +101,17 @@ public Iterator<InternalRow> nextBatch() throws Exception {
         }
     }
 
-    private Iterator<InternalRow> read(TableScan.Plan plan) throws IOException {
+    private RecordReader<InternalRow> read(TableScan.Plan plan) throws IOException {
         TableRead read = readBuilder.newRead();
 
         List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new ArrayList<>();
         for (Split split : plan.splits()) {
             readers.add(() -> read.createReader(split));
         }
-        Iterator<InternalRow> iterator =
-                new RecordReaderIterator<>(ConcatRecordReader.create(readers));
+        RecordReader<InternalRow> reader = ConcatRecordReader.create(readers);
         if (recordFilter != null) {
-            return Iterators.filter(iterator, recordFilter::test);
+            reader = reader.filter(recordFilter::test);
         }
-        return iterator;
+        return reader;
     }
 }
FileStoreLookupFunctionTest.java (new file)
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lookup;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;

/** Tests for {@link FileStoreLookupFunction}. */
public class FileStoreLookupFunctionTest {

private static final Random RANDOM = new Random();

private final String commitUser = UUID.randomUUID().toString();
private final TraceableFileIO fileIO = new TraceableFileIO();
private FileStoreLookupFunction fileStoreLookupFunction;
private FileStoreTable fileStoreTable;
@TempDir private Path tempDir;

@Test
public void testLookupScanLeak() throws Exception {
commit(writeCommit(1));
fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
Assertions.assertEquals(
TraceableFileIO.openInputStreams(s -> s.toString().contains(tempDir.toString()))
.size(),
0);

commit(writeCommit(10));
fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
Assertions.assertEquals(
TraceableFileIO.openInputStreams(s -> s.toString().contains(tempDir.toString()))
.size(),
0);
}

@BeforeEach
public void before() throws Exception {
createFileStoreTable();
fileStoreLookupFunction =
new FileStoreLookupFunction(fileStoreTable, new int[] {0, 1}, new int[] {1}, null);
fileStoreLookupFunction.open(tempDir.toString());
}

private static final RowType ROW_TYPE =
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
new String[] {"pt", "k", "v"});

private static Schema schema() {
Options conf = new Options();
conf.set(CoreOptions.BUCKET, 2);
conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));

return new Schema(
ROW_TYPE.getFields(),
Collections.singletonList("pt"),
Arrays.asList("pt", "k"),
conf.toMap(),
"");
}

private void commit(List<CommitMessage> messages) {
fileStoreTable.newCommit(commitUser).commit(messages);
}

private List<CommitMessage> writeCommit(int number) throws Exception {
List<CommitMessage> messages = new ArrayList<>();
StreamTableWrite writer = fileStoreTable.newStreamWriteBuilder().newWrite();
for (int i = 0; i < number; i++) {
writer.write(randomRow());
messages.addAll(writer.prepareCommit(true, i));
}
return messages;
}

private InternalRow randomRow() {
return GenericRow.of(RANDOM.nextInt(100), RANDOM.nextInt(100), RANDOM.nextLong());
}

public void createFileStoreTable() throws Exception {
org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(tempDir.toString());
SchemaManager schemaManager = new SchemaManager(fileIO, path);
TableSchema tableSchema = schemaManager.createTable(schema());
fileStoreTable =
FileStoreTableFactory.create(
fileIO, new org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
}
}
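
For reference, the leak assertion in the new test works because TraceableFileIO keeps track of every input stream it opens and exposes the still-open ones via openInputStreams(predicate). Below is a rough, hypothetical sketch of that kind of tracking wrapper; TrackingIO and its methods are illustrative stand-ins, not Paimon's actual TraceableFileIO API.

// Hypothetical stream tracker: streams register themselves on open and
// deregister on close, so a test can assert that none are left open.
import java.io.Closeable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class TrackingIO {

    private static final Set<TrackedStream> OPEN = ConcurrentHashMap.newKeySet();

    static TrackedStream open(String path) {
        TrackedStream stream = new TrackedStream(path);
        OPEN.add(stream);
        return stream;
    }

    // Analogous in spirit to TraceableFileIO.openInputStreams(...): returns
    // the streams whose path matches the filter and are not yet closed.
    static List<TrackedStream> openStreams(Predicate<String> pathFilter) {
        return OPEN.stream().filter(s -> pathFilter.test(s.path)).collect(Collectors.toList());
    }

    static final class TrackedStream implements Closeable {
        final String path;

        TrackedStream(String path) {
            this.path = path;
        }

        @Override
        public void close() {
            OPEN.remove(this);
        }
    }
}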
