Skip to content

Commit

Permalink
PARQUET-2432: Use ByteBufferAllocator over hardcoded heap allocation (#…
Browse files Browse the repository at this point in the history
…1278)

* PARQUET-2432: Use ByteBufferAllocator over hardcoded heap allocation

* Updated BytesInput implementations to rely on a ByteBufferAllocator
  instance for allocating/releasing ByteBuffer objects.
* Extend the usage of a ByteBufferAllocator instead of the hardcoded
  usage of heap (e.g. byte[], ByteBuffer.allocate etc.)
* parquet-cli related code parts including ParquetRewriter and tests
  are not changed in this effort
* Reuse temporary ByteBuffer instead of keep allocating/releasing
  • Loading branch information
gszadovszky authored Feb 23, 2024
1 parent d839608 commit 274dc51
Show file tree
Hide file tree
Showing 34 changed files with 1,549 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* Container which can construct writers for multiple columns to be stored
* together.
*/
public interface ColumnWriteStore {
public interface ColumnWriteStore extends AutoCloseable {
/**
* @param path the column for which to create a writer
* @return the column writer for the given column
Expand Down Expand Up @@ -63,6 +63,7 @@ public interface ColumnWriteStore {
/**
* Close the related output stream and release any resources
*/
@Override
public abstract void close();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* writer for (repetition level, definition level, values) triplets
*/
public interface ColumnWriter {
public interface ColumnWriter extends AutoCloseable {

/**
* writes the current value
Expand Down Expand Up @@ -91,6 +91,7 @@ public interface ColumnWriter {
* Close the underlying store. This should be called when there are no
* more data to be written.
*/
@Override
void close();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* Interface to read dictionary pages for all the columns of a row group
*/
public interface DictionaryPageReadStore {
public interface DictionaryPageReadStore extends AutoCloseable {

/**
* Returns a {@link DictionaryPage} for the given column descriptor.
Expand All @@ -33,4 +33,9 @@ public interface DictionaryPageReadStore {
* @return the DictionaryPage for that column, or null if there isn't one
*/
DictionaryPage readDictionaryPage(ColumnDescriptor descriptor);

@Override
default void close() {
// No-op default implementation for compatibility
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@
/**
* contains all the writers for the columns in the corresponding row group
*/
public interface PageWriteStore {
public interface PageWriteStore extends AutoCloseable {

/**
* @param path the descriptor for the column
* @return the corresponding page writer
*/
PageWriter getPageWriter(ColumnDescriptor path);

@Override
default void close() {
// No-op default implementation for compatibility
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* a writer for all the pages of a given column chunk
*/
public interface PageWriter {
public interface PageWriter extends AutoCloseable {

/**
* writes a single page
Expand Down Expand Up @@ -120,4 +120,9 @@ void writePageV2(
* @return a string presenting a summary of how memory is used
*/
String memUsageString(String prefix);

@Override
default void close() {
// No-op default implementation for compatibility
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@
/**
* Contains all writers for all columns of a row group
*/
public interface BloomFilterWriteStore {
public interface BloomFilterWriteStore extends AutoCloseable {
/**
* Get bloom filter writer of a column
*
* @param path the descriptor for the column
* @return the corresponding Bloom filter writer
*/
BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path);

@Override
default void close() {
// No-op default implementation for compatibility
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@

package org.apache.parquet.column.values.bloomfilter;

public interface BloomFilterWriter {
public interface BloomFilterWriter extends AutoCloseable {
/**
* Write a Bloom filter
*
* @param bloomFilter the Bloom filter to write
*/
void writeBloomFilter(BloomFilter bloomFilter);

@Override
default void close() {
// No-op default implementation for compatibility
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public long getRowCount() {
return rowCount;
}

@Override
public void close() {
// no-op
}

public void addRowCount(long count) {
rowCount += count;
}
Expand Down
7 changes: 7 additions & 0 deletions parquet-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.bytes;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
* Convenient class for releasing {@link java.nio.ByteBuffer} objects with the corresponding allocator;
*/
public class ByteBufferReleaser implements AutoCloseable {

final ByteBufferAllocator allocator;
private final List<ByteBuffer> toRelease = new ArrayList<>();

/**
* Constructs a new {@link ByteBufferReleaser} instance with the specified {@link ByteBufferAllocator} to be used for
* releasing the buffers in {@link #close()}.
*
* @param allocator the allocator to be used for releasing the buffers
* @see #releaseLater(ByteBuffer)
* @see #close()
*/
public ByteBufferReleaser(ByteBufferAllocator allocator) {
this.allocator = allocator;
}

/**
* Adds a {@link ByteBuffer} object to the list of buffers to be released at {@link #close()}. The specified buffer
* shall be one that was allocated by the {@link ByteBufferAllocator} of this object.
*
* @param buffer the buffer to be released
*/
public void releaseLater(ByteBuffer buffer) {
toRelease.add(buffer);
}

@Override
public void close() {
for (ByteBuffer buf : toRelease) {
allocator.release(buf);
}
toRelease.clear();
}
}
Loading

0 comments on commit 274dc51

Please sign in to comment.