Skip to content

Commit

Permalink
GH-38254: [Java] Add reusable buffer getters to char/binary vectors
Browse files Browse the repository at this point in the history
Add a reusable buffer interface that can be populated by
character and binary vectors to avoid allocations when consuming
vector content.

Optimize getObject() on VarCharVector/LargeVarCharVector to
avoid an extra allocation of a byte array (copy from ArrowBuf
directly to the resulting Text).
  • Loading branch information
jduo committed Oct 20, 2023
1 parent a376e3c commit d1471b4
Show file tree
Hide file tree
Showing 14 changed files with 492 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public InputStream getUnicodeStream() {

// Already in UTF-8
final Text textValue = new Text(value);
return new ByteArrayInputStream(textValue.getBytes(), 0, textValue.getLength());
return new ByteArrayInputStream(textValue.getBytes(), 0, (int) textValue.getLength());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.memory;

/**
* A lightweight, automatically expanding container for holding byte data.
* @param <T> The type of the underlying buffer.
*/
public interface ReusableBuffer<T> {
/**
* Get the number of valid bytes in the data.
*
* @return the number of valid bytes in the data
*/
long getLength();

/**
* Get the buffer backing this ReusableBuffer.
*/
T getBuffer();

/**
* Set the buffer to the contents of the given ArrowBuf.
* The internal buffer must resize if it cannot fit the contents
* of the data.
*
* @param srcBytes the data to copy from
* @param start the first position of the new data
* @param len the number of bytes of the new data
*/
void set(ArrowBuf srcBytes, long start, long len);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.complex.impl.FixedSizeBinaryReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
Expand Down Expand Up @@ -116,6 +117,18 @@ public byte[] get(int index) {
return dst;
}

/**
* Read the value at the given position to the given output buffer.
* The caller is responsible for checking for nullity first.
*
* @param index position of element.
* @param outputBuffer the buffer to write into.
*/
public void read(int index, ReusableBuffer<?> outputBuffer) {
final int startOffset = index * byteWidth;
outputBuffer.set(valueBuffer, startOffset, byteWidth);
}

/**
* Get the element at the given index from the vector and
* sets the state in holder. If element at given index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.arrow.vector;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.vector.complex.impl.LargeVarBinaryReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.LargeVarBinaryHolder;
Expand Down Expand Up @@ -112,6 +113,20 @@ public byte[] get(int index) {
return result;
}

/**
* Read the value at the given position to the given output buffer.
* The caller is responsible for checking for nullity first.
*
* @param index position of element.
* @param outputBuffer the buffer to write into.
*/
public void read(int index, ReusableBuffer<?> outputBuffer) {
final long startOffset = getStartOffset(index);
final long dataLength =
offsetBuffer.getLong((long) (index + 1) * OFFSET_WIDTH) - startOffset;
outputBuffer.set(valueBuffer, startOffset, dataLength);
}

/**
* Get the variable length element at specified index as Text.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.arrow.vector;

import static org.apache.arrow.vector.NullCheckingForGet.NULL_CHECKING_ENABLED;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.vector.complex.impl.LargeVarCharReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.LargeVarCharHolder;
Expand Down Expand Up @@ -120,12 +123,28 @@ public byte[] get(int index) {
* @return Text object for non-null element, null otherwise
*/
public Text getObject(int index) {
byte[] b = get(index);
if (b == null) {
assert index >= 0;
if (NULL_CHECKING_ENABLED && isSet(index) == 0) {
return null;
} else {
return new Text(b);
}

final Text result = new Text();
read(index, result);
return result;
}

/**
* Read the value at the given position to the given output buffer.
* The caller is responsible for checking for nullity first.
*
* @param index position of element.
* @param outputBuffer the buffer to write into.
*/
public void read(int index, ReusableBuffer<?> outputBuffer) {
final long startOffset = getStartOffset(index);
final long dataLength =
offsetBuffer.getLong((long) (index + 1) * OFFSET_WIDTH) - startOffset;
outputBuffer.set(valueBuffer, startOffset, dataLength);
}

/**
Expand Down Expand Up @@ -247,7 +266,7 @@ public void setSafe(int index, NullableLargeVarCharHolder holder) {
* @param text Text object with data
*/
public void set(int index, Text text) {
set(index, text.getBytes(), 0, text.getLength());
set(index, text.getBytes(), 0, (int) text.getLength());
}

/**
Expand All @@ -259,7 +278,7 @@ public void set(int index, Text text) {
* @param text Text object with data
*/
public void setSafe(int index, Text text) {
setSafe(index, text.getBytes(), 0, text.getLength());
setSafe(index, text.getBytes(), 0, (int) text.getLength());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.arrow.vector.NullCheckingForGet.NULL_CHECKING_ENABLED;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.vector.complex.impl.VarBinaryReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.NullableVarBinaryHolder;
Expand Down Expand Up @@ -113,6 +114,20 @@ public byte[] get(int index) {
return result;
}

/**
* Read the value at the given position to the given output buffer.
* The caller is responsible for checking for nullity first.
*
* @param index position of element.
* @param outputBuffer the buffer to write into.
*/
public void read(int index, ReusableBuffer<?> outputBuffer) {
final int startOffset = getStartOffset(index);
final int dataLength =
offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH) - startOffset;
outputBuffer.set(valueBuffer, startOffset, dataLength);
}

/**
* Get the variable length element at specified index as Text.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.arrow.vector.NullCheckingForGet.NULL_CHECKING_ENABLED;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReusableBuffer;
import org.apache.arrow.vector.complex.impl.VarCharReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.NullableVarCharHolder;
Expand Down Expand Up @@ -119,12 +120,28 @@ public byte[] get(int index) {
* @return Text object for non-null element, null otherwise
*/
public Text getObject(int index) {
byte[] b = get(index);
if (b == null) {
assert index >= 0;
if (NULL_CHECKING_ENABLED && isSet(index) == 0) {
return null;
} else {
return new Text(b);
}

final Text result = new Text();
read(index, result);
return result;
}

/**
* Read the value at the given position to the given output buffer.
* The caller is responsible for checking for nullity first.
*
* @param index position of element.
* @param outputBuffer the buffer to write into.
*/
public void read(int index, ReusableBuffer<?> outputBuffer) {
final int startOffset = getStartOffset(index);
final int dataLength =
offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH) - startOffset;
outputBuffer.set(valueBuffer, startOffset, dataLength);
}

/**
Expand Down Expand Up @@ -247,7 +264,7 @@ public void setSafe(int index, NullableVarCharHolder holder) {
* @param text Text object with data
*/
public void set(int index, Text text) {
set(index, text.getBytes(), 0, text.getLength());
set(index, text.getBytes(), 0, (int) text.getLength());
}

/**
Expand All @@ -259,7 +276,7 @@ public void set(int index, Text text) {
* @param text Text object with data
*/
public void setSafe(int index, Text text) {
setSafe(index, text.getBytes(), 0, text.getLength());
setSafe(index, text.getBytes(), 0, (int) text.getLength());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.util;

import java.util.Arrays;
import java.util.Base64;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.ReusableBuffer;

/**
* A wrapper around byte arrays for repeated writing.
*/
public class ReusableByteArray implements ReusableBuffer<byte[]> {

protected static final byte[] EMPTY_BYTES = new byte[0];

protected byte[] bytes;
protected int length;

public ReusableByteArray() {
bytes = EMPTY_BYTES;
}

/**
* Get the number of bytes in the byte array.
*
* @return the number of bytes in the byte array
*/
@Override
public long getLength() {
return length;
}

@Override
public byte[] getBuffer() {
return bytes;
}

@Override
public void set(ArrowBuf srcBytes, long start, long len) {
setCapacity((int) len, false);
srcBytes.getBytes(start, bytes, 0, (int) len);
length = (int) len;
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
} else if (o == null) {
return false;
}
if (!(o instanceof ReusableByteArray)) {
return false;
}

final ReusableByteArray that = (ReusableByteArray) o;
if (this.getLength() != that.getLength()) {
return false;
}

// copied from Arrays.equals so we don'thave to copy the byte arrays
for (int i = 0; i < length; i++) {
if (bytes[i] != that.bytes[i]) {
return false;
}
}

return true;
}

/**
* Copied from Arrays.hashCode so we don't have to copy the byte array.
*
* @return hashCode
*/
@Override
public int hashCode() {
if (bytes == null) {
return 0;
}

int result = 1;
for (int i = 0; i < length; i++) {
result = 31 * result + bytes[i];
}

return result;
}

@Override
public String toString() {
return Base64.getEncoder().encodeToString(Arrays.copyOfRange(bytes, 0, length));
}

/**
* Sets the capacity of this object to <em>at least</em> <code>len</code> bytes. If the
* current buffer is longer, then the capacity and existing content of the buffer are unchanged.
* If <code>len</code> is larger than the current capacity, the Text object's capacity is
* increased to match.
*
* @param len the number of bytes we need
* @param keepData should the old data be kept
*/
protected void setCapacity(int len, boolean keepData) {
if (bytes == null || bytes.length < len) {
if (bytes != null && keepData) {
bytes = Arrays.copyOf(bytes, Math.max(len, length << 1));
} else {
bytes = new byte[len];
}
}
}
}
Loading

0 comments on commit d1471b4

Please sign in to comment.