Skip to content

Commit

Permalink
GH-38254: [Java] Add reusable buffer getters to char/binary vectors
Browse files Browse the repository at this point in the history
Add a reusable buffer interface that can be populated by
character and binary vectors to avoid allocations when consuming
vector content.

Optimize getObject() on VarCharVector/LargeVarCharVector to
avoid an extra allocation of a byte array (copy from ArrowBuf
directly to the resulting Text).
  • Loading branch information
jduo committed Oct 14, 2023
1 parent c5bce96 commit fcdf8cf
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.memory;

/**
* A lightweight, automatically expanding container for holding byte data.
* Implementations copy a range of bytes out of an ArrowBuf into a buffer
* that can be reused across calls.
*
* @param <ReturnTypeT> The type of the underlying buffer.
*/
public interface ReusableBuffer<ReturnTypeT> {

/**
* Get the buffer backing this ReusableBuffer.
*
* @return the buffer currently backing this object
*/
ReturnTypeT getBuffer();

/**
* Set the buffer to the contents of the given ArrowBuf.
* The internal buffer must resize if it cannot fit the contents
* of the data.
*
* @param bytes the data to copy from
* @param start the offset within {@code bytes} of the first byte to copy
* @param len the number of bytes to copy
*/
void set(ArrowBuf bytes, long start, int len);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.complex.impl.FixedSizeBinaryReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
Expand Down Expand Up @@ -116,6 +117,18 @@ public byte[] get(int index) {
return dst;
}

/**
 * Copy the value stored at the given position into the given output buffer.
 * No null check is made here; the caller is responsible for checking for nullity first.
 *
 * @param index position of element.
 * @param outputBuffer the buffer to write into.
 */
public void readBytes(int index, ReusableBuffer<?> outputBuffer) {
  // Widen to long before multiplying: index * byteWidth evaluated in int arithmetic
  // can wrap around and produce a wrong (possibly negative) start offset.
  final long startOffset = (long) index * byteWidth;
  outputBuffer.set(valueBuffer, startOffset, byteWidth);
}

/**
* Get the element at the given index from the vector and
* sets the state in holder. If element at given index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.arrow.vector;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.vector.complex.impl.LargeVarBinaryReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.LargeVarBinaryHolder;
Expand Down Expand Up @@ -112,6 +113,20 @@ public byte[] get(int index) {
return result;
}

/**
 * Copy the bytes of the element at the given position into the given output buffer.
 * Nullity is not checked here; the caller must do that before calling.
 *
 * @param index position of element.
 * @param outputBuffer the buffer to write into.
 */
public void readBytes(int index, ReusableBuffer<?> outputBuffer) {
  final long begin = getStartOffset(index);
  // The offset entry for index + 1 marks where this element's bytes end.
  final long end = offsetBuffer.getLong((long) (index + 1) * OFFSET_WIDTH);
  final int byteCount = (int) (end - begin);
  outputBuffer.set(valueBuffer, begin, byteCount);
}

/**
* Get the variable length element at specified index as Text.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.arrow.vector;

import static org.apache.arrow.vector.NullCheckingForGet.NULL_CHECKING_ENABLED;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.vector.complex.impl.LargeVarCharReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.LargeVarCharHolder;
Expand Down Expand Up @@ -120,12 +123,28 @@ public byte[] get(int index) {
* @return Text object for non-null element, null otherwise
*/
public Text getObject(int index) {
// NOTE(review): this commit view shows removed and added lines together without +/- markers.
// The two lines below and the "else { return new Text(b); }" branch look like the pre-change
// implementation (copy into a byte[] via get(), then wrap it in a Text) - confirm against
// the real file before relying on this listing.
byte[] b = get(index);
if (b == null) {
assert index >= 0;
// When null checking is enabled and the slot is not set, there is no value to return.
if (NULL_CHECKING_ENABLED && isSet(index) == 0) {
return null;
} else {
return new Text(b);
}

// Fill a fresh Text through readBytes and hand it back.
final Text result = new Text();
readBytes(index, result);
return result;
}

/**
 * Copy the bytes of the element at the given position into the given output buffer.
 * Nullity is not checked here; the caller must do that before calling.
 *
 * @param index position of element.
 * @param outputBuffer the buffer to write into.
 */
public void readBytes(int index, ReusableBuffer<?> outputBuffer) {
  final long begin = getStartOffset(index);
  // The offset entry for index + 1 marks where this element's bytes end.
  final long end = offsetBuffer.getLong((long) (index + 1) * OFFSET_WIDTH);
  final int byteCount = (int) (end - begin);
  outputBuffer.set(valueBuffer, begin, byteCount);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.arrow.vector.NullCheckingForGet.NULL_CHECKING_ENABLED;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.vector.complex.impl.VarBinaryReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.NullableVarBinaryHolder;
Expand Down Expand Up @@ -113,6 +114,20 @@ public byte[] get(int index) {
return result;
}

/**
 * Copy the bytes of the element at the given position into the given output buffer.
 * Nullity is not checked here; the caller must do that before calling.
 *
 * @param index position of element.
 * @param outputBuffer the buffer to write into.
 */
public void readBytes(int index, ReusableBuffer<?> outputBuffer) {
  final int begin = getStartOffset(index);
  // The offset entry for index + 1 marks where this element's bytes end.
  final int end = offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH);
  final int byteCount = end - begin;
  outputBuffer.set(valueBuffer, begin, byteCount);
}

/**
* Get the variable length element at specified index as Text.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.arrow.vector.NullCheckingForGet.NULL_CHECKING_ENABLED;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.vector.complex.impl.VarCharReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.NullableVarCharHolder;
Expand Down Expand Up @@ -119,12 +120,28 @@ public byte[] get(int index) {
* @return Text object for non-null element, null otherwise
*/
public Text getObject(int index) {
// NOTE(review): this commit view shows removed and added lines together without +/- markers.
// The two lines below and the "else { return new Text(b); }" branch look like the pre-change
// implementation (copy into a byte[] via get(), then wrap it in a Text) - confirm against
// the real file before relying on this listing.
byte[] b = get(index);
if (b == null) {
assert index >= 0;
// When null checking is enabled and the slot is not set, there is no value to return.
if (NULL_CHECKING_ENABLED && isSet(index) == 0) {
return null;
} else {
return new Text(b);
}

// Fill a fresh Text through readBytes and hand it back.
final Text result = new Text();
readBytes(index, result);
return result;
}

/**
 * Copy the bytes of the element at the given position into the given output buffer.
 * Nullity is not checked here; the caller must do that before calling.
 *
 * @param index position of element.
 * @param outputBuffer the buffer to write into.
 */
public void readBytes(int index, ReusableBuffer<?> outputBuffer) {
  final int begin = getStartOffset(index);
  // The offset entry for index + 1 marks where this element's bytes end.
  final int end = offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH);
  final int byteCount = end - begin;
  outputBuffer.set(valueBuffer, begin, byteCount);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.util;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.ReusableBuffer;

/**
 * A writable buffer backed by an Arrow buffer.
 */
public class ReusableArrowBuf implements ReusableBuffer<ArrowBuf> {
  private ArrowBuf buffer;

  /**
   * Creates a new instance that attaches to the given ArrowBuf.
   * The caller is responsible for managing the ArrowBuf.
   *
   * @param buffer The buffer to attach.
   */
  public ReusableArrowBuf(ArrowBuf buffer) {
    this.buffer = buffer;
  }

  /**
   * Get the ArrowBuf currently backing this object.
   *
   * @return the backing ArrowBuf; {@link #set} reassigns it, so re-fetch it after each call
   */
  @Override
  public ArrowBuf getBuffer() {
    return buffer;
  }

  /**
   * Copy {@code len} bytes from {@code utf8}, beginning at {@code start}, to the front of
   * the backing buffer, growing the backing buffer first when it is too small.
   *
   * @param utf8 the data to copy from
   * @param start the offset within {@code utf8} of the first byte to copy
   * @param len the number of bytes to copy
   */
  @Override
  public void set(ArrowBuf utf8, long start, int len) {
    // Keep whatever reallocIfNeeded returns; presumably it is a different ArrowBuf
    // when the current one cannot hold len bytes - confirm against the ArrowBuf docs.
    buffer = buffer.reallocIfNeeded(len);
    // Read from the source into our buffer; the previous code copied the other way,
    // from the backing buffer into the source, and ignored start and len.
    utf8.getBytes(start, buffer, 0, len);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.util;

import java.util.Arrays;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.ReusableBuffer;

/**
 * A reusable, growable byte array that can be refilled from Arrow buffers.
 */
public class ReusableByteArray implements ReusableBuffer<byte[]> {

  protected static final byte[] EMPTY_BYTES = new byte[0];

  protected byte[] bytes;
  protected int length;

  public ReusableByteArray() {
    this.bytes = EMPTY_BYTES;
  }

  @Override
  public byte[] getBuffer() {
    return this.bytes;
  }

  @Override
  public void set(ArrowBuf utf8, long start, int len) {
    // Old contents are not needed, so growing may simply allocate a fresh array.
    setCapacity(len, false);
    utf8.getBytes(start, this.bytes, 0, len);
    this.length = len;
  }

  /**
   * Ensures the backing array can hold <em>at least</em> <code>len</code> bytes. When the
   * current array is already that large, neither its capacity nor its content changes.
   * Otherwise a larger array replaces it, carrying the old content over only when
   * <code>keepData</code> is set.
   *
   * @param len the number of bytes we need
   * @param keepData should the old data be kept
   */
  protected void setCapacity(int len, boolean keepData) {
    final boolean hasArray = bytes != null;
    if (hasArray && bytes.length >= len) {
      return;
    }
    bytes = (hasArray && keepData)
        ? Arrays.copyOf(bytes, Math.max(len, length << 1))
        : new byte[len];
  }

}
28 changes: 2 additions & 26 deletions java/vector/src/main/java/org/apache/arrow/vector/util/Text.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.nio.charset.MalformedInputException;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import java.util.Arrays;
import java.util.Optional;

import com.fasterxml.jackson.core.JsonGenerationException;
Expand All @@ -43,7 +42,7 @@
* Lifted from Hadoop 2.7.1
*/
@JsonSerialize(using = Text.TextSerializer.class)
public class Text {
public class Text extends ReusableByteArray {

private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
new ThreadLocal<CharsetEncoder>() {
Expand All @@ -65,13 +64,9 @@ protected CharsetDecoder initialValue() {
}
};

private static final byte[] EMPTY_BYTES = new byte[0];

private byte[] bytes;
private int length;

public Text() {
// NOTE(review): the commit view lists the removed and added statement together without
// +/- markers; presumably "bytes = EMPTY_BYTES;" is the old body and "super();" its
// replacement - confirm against the real file.
bytes = EMPTY_BYTES;
super();
}

/**
Expand Down Expand Up @@ -278,25 +273,6 @@ public void clear() {
length = 0;
}

/**
 * Ensures this Text object can hold <em>at least</em> <code>len</code> bytes. When the
 * current array is already that large, neither its capacity nor its content changes.
 * Otherwise a larger array replaces it, carrying the old content over only when
 * <code>keepData</code> is set.
 *
 * @param len the number of bytes we need
 * @param keepData should the old data be kept
 */
private void setCapacity(int len, boolean keepData) {
  final boolean hasArray = bytes != null;
  if (hasArray && bytes.length >= len) {
    return;
  }
  bytes = (hasArray && keepData)
      ? Arrays.copyOf(bytes, Math.max(len, length << 1))
      : new byte[len];
}

@Override
public String toString() {
try {
Expand Down

0 comments on commit fcdf8cf

Please sign in to comment.