From 8cab62d4fe8e56c9c95a1b13e42bfff9d4727c71 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 28 Aug 2024 00:18:11 +0800 Subject: [PATCH] RunEndEncoded initial implement --- .../binder/ColumnBinderArrowTypeVisitor.java | 5 + .../arrow/c/BufferImportTypeVisitor.java | 6 + .../jdbc/utils/AvaticaParameterBinder.java | 6 + .../arrow/driver/jdbc/utils/ConvertUtils.java | 6 + .../src/main/codegen/data/ArrowTypes.tdd | 5 + .../org/apache/arrow/vector/TypeLayout.java | 11 + .../vector/compare/RangeEqualsVisitor.java | 9 + .../vector/compare/TypeEqualsVisitor.java | 6 + .../arrow/vector/compare/VectorVisitor.java | 3 + .../vector/complex/RunEndEncodedVector.java | 647 ++++++++++++++++++ .../org/apache/arrow/vector/types/Types.java | 19 + .../arrow/vector/util/VectorAppender.java | 6 + .../validate/ValidateVectorBufferVisitor.java | 6 + .../validate/ValidateVectorDataVisitor.java | 6 + .../validate/ValidateVectorTypeVisitor.java | 10 + .../validate/ValidateVectorVisitor.java | 6 + .../arrow/vector/TestRunEndEncodedVector.java | 134 ++++ 17 files changed, 891 insertions(+) create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java create mode 100644 java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java index 30b2305f3f916..475247f40be47 100644 --- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java +++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java @@ -96,6 +96,11 @@ public ColumnBinder visit(ArrowType.Union type) { throw new UnsupportedOperationException("No column binder implemented for type " + type); } + @Override + public ColumnBinder visit(ArrowType.RunEndEncoded 
type) { + throw new UnsupportedOperationException("No column binder implemented for type " + type); + } + @Override public ColumnBinder visit(ArrowType.Map type) { return new MapBinder((MapVector) vector); diff --git a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java index 5f262d3dc3315..fc54ade6b049a 100644 --- a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java +++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java @@ -183,6 +183,12 @@ public List visit(ArrowType.Union type) { } } + + @Override + public List visit(ArrowType.RunEndEncoded type) { + throw new UnsupportedOperationException("Importing buffers for type: " + type); + } + @Override public List visit(ArrowType.Map type) { return Arrays.asList(maybeImportBitmap(type), importOffsets(type, MapVector.OFFSET_WIDTH)); diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java index 7acffb4bc9722..5f647491ec1b4 100644 --- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java +++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java @@ -276,5 +276,11 @@ public Boolean visit(ArrowType.Duration type) { public Boolean visit(ArrowType.ListView type) { throw new UnsupportedOperationException("Binding is not yet supported for type " + type); } + + @Override + public Boolean visit(ArrowType.RunEndEncoded type) { + throw new UnsupportedOperationException( + "No Avatica parameter binder implemented for type " + type); + } } } diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ConvertUtils.java 
b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ConvertUtils.java index 77b7a88536149..2d90a4366c94e 100644 --- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ConvertUtils.java +++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ConvertUtils.java @@ -278,5 +278,11 @@ public AvaticaParameter visit(ArrowType.ListView type) { throw new UnsupportedOperationException( "AvaticaParameter not yet supported for type " + type); } + + @Override + public AvaticaParameter visit(ArrowType.RunEndEncoded type) { + throw new UnsupportedOperationException( + "No Avatica parameter binder implemented for type " + type); + } } } diff --git a/java/vector/src/main/codegen/data/ArrowTypes.tdd b/java/vector/src/main/codegen/data/ArrowTypes.tdd index 72df4779793f0..a204b8b0d2db7 100644 --- a/java/vector/src/main/codegen/data/ArrowTypes.tdd +++ b/java/vector/src/main/codegen/data/ArrowTypes.tdd @@ -134,6 +134,11 @@ name: "ListView", fields: [], complex: true + }, + { + name: "RunEndEncoded", + fields: [], + complex: true } ] } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java index b8535532ea359..ae435d6f8f11e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java @@ -40,6 +40,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.LargeUtf8; import org.apache.arrow.vector.types.pojo.ArrowType.Map; import org.apache.arrow.vector.types.pojo.ArrowType.Null; +import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; import org.apache.arrow.vector.types.pojo.ArrowType.Struct; import org.apache.arrow.vector.types.pojo.ArrowType.Time; import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp; @@ -270,6 +271,11 @@ public TypeLayout visit(Interval type) { public TypeLayout visit(Duration 
type) { return newFixedWidthTypeLayout(BufferLayout.dataBuffer(64)); } + + @Override + public TypeLayout visit(RunEndEncoded type) { + return new TypeLayout(Collections.emptyList()); + } }); return layout; } @@ -428,6 +434,11 @@ public Integer visit(Interval type) { public Integer visit(Duration type) { return FIXED_WIDTH_BUFFER_COUNT; } + + @Override + public Integer visit(RunEndEncoded type) { + return 0; + } }); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java index 3050649737355..d76feac16bdd1 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java @@ -37,6 +37,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Visitor to compare a range of values for vectors. 
*/ @@ -241,6 +242,14 @@ protected RangeEqualsVisitor createInnerVisitor( return new RangeEqualsVisitor(leftInner, rightInner, typeComparator); } + @Override + public Boolean visit(RunEndEncodedVector left, Range range) { + if (!validate(left)) { + return false; + } + return true; // TODO + } + protected boolean compareUnionVectors(Range range) { UnionVector leftVector = (UnionVector) left; UnionVector rightVector = (UnionVector) right; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/TypeEqualsVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/TypeEqualsVisitor.java index 15cc2c31b8b98..fca58d11a087e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/TypeEqualsVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/TypeEqualsVisitor.java @@ -30,6 +30,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.pojo.Field; @@ -124,6 +125,11 @@ public Boolean visit(ExtensionTypeVector left, Void value) { return compareField(left.getField(), right.getField()); } + @Override + public Boolean visit(RunEndEncodedVector left, Void value) { + return compareField(left.getField(), right.getField()); + } + private boolean compareField(Field leftField, Field rightField) { if (leftField == rightField) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java index 870f015862764..7401a4e5efa81 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java @@ -27,6 +27,7 @@ import 
org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** @@ -60,4 +61,6 @@ public interface VectorVisitor { OUT visit(NullVector left, IN value); OUT visit(ExtensionTypeVector left, IN value); + + OUT visit(RunEndEncodedVector left, IN value); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java new file mode 100644 index 0000000000000..82ad9d19b5b4e --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector.complex; + +import static org.apache.arrow.util.Preconditions.checkArgument; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.OutOfMemoryException; +import org.apache.arrow.memory.util.ByteFunctionHelpers; +import org.apache.arrow.memory.util.hash.ArrowBufHasher; +import org.apache.arrow.vector.BaseIntVector; +import org.apache.arrow.vector.BaseValueVector; +import org.apache.arrow.vector.BufferBacked; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.compare.VectorVisitor; +import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.complex.writer.FieldWriter; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.util.CallBack; +import org.apache.arrow.vector.util.TransferPair; + +/** + * A run-end encoded vector contains only two child vectors: a run_end vector of type int and a + * values vector of any type. There are no buffers associated with the parent vector. 
+ */ +public class RunEndEncodedVector extends BaseValueVector implements FieldVector { + + public static RunEndEncodedVector empty(String name, BufferAllocator allocator) { + return new RunEndEncodedVector( + name, allocator, FieldType.notNullable(ArrowType.RunEndEncoded.INSTANCE), null); + } + + protected final CallBack callBack; + protected Field field; + protected BaseIntVector runEndsVector; + protected FieldVector valuesVector; + protected int valueCount; + + /** + * Constructs a new instance. + * + * @param name The name of the instance. + * @param allocator The allocator to use for allocating/reallocating buffers. + * @param fieldType The type of the array that is run-end encoded. + * @param callBack A schema change callback. + */ + public RunEndEncodedVector( + String name, BufferAllocator allocator, FieldType fieldType, CallBack callBack) { + this(new Field(name, fieldType, null), allocator, callBack); + } + + /** + * Constructs a new instance. + * + * @param field The field materialized by this vector. + * @param allocator The allocator to use for allocating/reallocating buffers. + * @param callBack A schema change callback. + */ + public RunEndEncodedVector(Field field, BufferAllocator allocator, CallBack callBack) { + super(allocator); + this.field = field; + this.callBack = callBack; + this.valueCount = 0; + } + + /** ValueVector interface */ + + /** + * Allocate new buffers. ValueVector implements logic to determine how much to allocate. + * + * @throws OutOfMemoryException Thrown if no memory can be allocated. + */ + @Override + public void allocateNew() throws OutOfMemoryException { + if (!allocateNewSafe()) { + throw new OutOfMemoryException("Failure while allocating memory"); + } + } + + /** + * Allocates new buffers. ValueVector implements logic to determine how much to allocate. + * + * @return Returns true if allocation was successful. 
+ */ + @Override + public boolean allocateNewSafe() { + initializeChildrenFromFields(field.getChildren()); + for (FieldVector v : getChildrenFromFields()) { + boolean isAllocated = v.allocateNewSafe(); + if (!isAllocated) { + v.clear(); + return false; + } + } + return true; + } + + /** + * Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's + * buffer with new buffer, and release old one + */ + @Override + public void reAlloc() { + for (FieldVector v : getChildrenFromFields()) { + v.reAlloc(); + } + } + + @Override + public BufferAllocator getAllocator() { + return allocator; + } + + @Override + protected FieldReader getReaderImpl() { + return null; + } + + /** + * Set the initial record capacity. + * + * @param numRecords the initial record capacity. + */ + @Override + public void setInitialCapacity(int numRecords) { + // TODO: does it make sense? + for (FieldVector v : getChildrenFromFields()) { + v.setInitialCapacity(numRecords); + } + } + + /** + * Returns the maximum number of values that can be stored in this vector instance. + * + * @return the maximum number of values that can be stored in this vector instance. + */ + @Override + public int getValueCapacity() { + return getChildrenFromFields().stream() + .mapToInt(ValueVector::getValueCapacity) + .min() + .orElseThrow(NoSuchElementException::new); + } + + /** Alternative to clear(). Allows use as an AutoCloseable in try-with-resources. */ + @Override + public void close() { + for (FieldVector v : getChildrenFromFields()) { + v.close(); + } + } + + /** + * Release any owned ArrowBuf and reset the ValueVector to the initial state. If the vector has + * any child vectors, they will also be cleared. + */ + @Override + public void clear() { + for (FieldVector v : getChildrenFromFields()) { + v.clear(); + } + } + + /** + * Reset the ValueVector to the initial state without releasing any owned ArrowBuf. 
Buffer + * capacities will remain unchanged and any previous data will be zeroed out. This includes + * buffers for data, validity, offset, etc. If the vector has any child vectors, they will also be + * reset. + */ + @Override + public void reset() { + for (FieldVector v : getChildrenFromFields()) { + v.reset(); + } + valueCount = 0; + } + + /** + * Get information about how this field is materialized. + * + * @return the field corresponding to this vector + */ + @Override + public Field getField() { + return field; + } + + @Override + public MinorType getMinorType() { + return MinorType.RUNENDENCODED; + } + + /** + * To transfer quota responsibility. + * + * @param allocator the target allocator + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(BufferAllocator allocator) { + throw new UnsupportedOperationException( + "RunEndEncodedVector does not support getTransferPair(BufferAllocator)"); + } + + /** + * To transfer quota responsibility. + * + * @param ref the name of the vector + * @param allocator the target allocator + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator) { + return getTransferPair(ref, allocator, null); + } + + /** + * To transfer quota responsibility. + * + * @param field the Field object used by the target vector + * @param allocator the target allocator + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(Field field, BufferAllocator allocator) { + return getTransferPair(field, allocator, null); + } + + /** + * To transfer quota responsibility. 
+ * + * @param ref the name of the vector + * @param allocator the target allocator + * @param callBack A schema change callback. + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { + throw new UnsupportedOperationException( + "RunEndEncodedVector does not support getTransferPair(String, BufferAllocator, CallBack)"); + } + + /** + * To transfer quota responsibility. + * + * @param field the Field object used by the target vector + * @param allocator the target allocator + * @param callBack A schema change callback. + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) { + throw new UnsupportedOperationException( + "RunEndEncodedVector does not support getTransferPair(Field, BufferAllocator, CallBack)"); + } + + /** + * Makes a new transfer pair used to transfer underlying buffers. + * + * @param target the target for the transfer + * @return a new {@link org.apache.arrow.vector.util.TransferPair transfer pair} that is used to + * transfer underlying buffers into the target vector. + */ + @Override + public TransferPair makeTransferPair(ValueVector target) { + throw new UnsupportedOperationException( + "RunEndEncodedVector does not support makeTransferPair(ValueVector)"); + } + + /** + * Get a reader for this vector. + * + * @return a {@link org.apache.arrow.vector.complex.reader.FieldReader field reader} that supports + * reading values from this vector. + */ + @Override + public FieldReader getReader() { + return null; // TODO + } + + /** + * Get a writer for this vector. 
+ * + * @return a {@link org.apache.arrow.vector.complex.writer.FieldWriter field writer} that supports + * writing values to this vector. + */ + public FieldWriter getWriter() { + return null; // TODO + } + + /** + * Get the number of bytes used by this vector. + * + * @return the number of bytes that is used by this vector instance. + */ + @Override + public int getBufferSize() { + int bufferSize = 0; + for (FieldVector v : getChildrenFromFields()) { + bufferSize += v.getBufferSize(); + } + return bufferSize; + } + + /** + * Returns the number of bytes that is used by this vector if it holds the given number of values. + * The result will be the same as if setValueCount() were called, followed by calling + * getBufferSize(), but without any of the closing side-effects that setValueCount() implies wrt + * finishing off the population of a vector. Some operations might wish to use this to determine + * how much memory has been used by a vector so far, even though it is not finished being + * populated. + * + * @param valueCount the number of values to assume this vector contains + * @return the buffer size if this vector is holding valueCount values + */ + @Override + public int getBufferSizeFor(int valueCount) { + return 0; + } + + /** + * Return the underlying buffers associated with this vector. Note that this doesn't impact the + * reference counts for this buffer so it only should be used for in-context access. Also note + * that this buffer changes regularly thus external classes shouldn't hold a reference to it + * (unless they change it). + * + * @param clear Whether to clear vector before returning; the buffers will still be refcounted; + * but the returned array will be the only reference to them + * @return The underlying {@link ArrowBuf buffers} that is used by this vector instance. + */ + @Override + public ArrowBuf[] getBuffers(boolean clear) { + return null; + } + + /** + * Gets the underlying buffer associated with validity vector. 
+ * + * @return buffer + */ + @Override + public ArrowBuf getValidityBuffer() { + return null; + } + + /** + * Gets the underlying buffer associated with data vector. + * + * @return buffer + */ + @Override + public ArrowBuf getDataBuffer() { + return null; + } + + /** + * Gets the underlying buffer associated with offset vector. + * + * @return buffer + */ + @Override + public ArrowBuf getOffsetBuffer() { + return null; + } + + /** + * Gets the number of values. + * + * @return number of values in the vector + */ + @Override + public int getValueCount() { + return valueCount; + } + + /** Set number of values in the vector. */ + @Override + public void setValueCount(int valueCount) { + this.valueCount = valueCount; + } + + /** + * Get friendly type object from the vector. + * + * @param index index of object to get + * @return friendly type object + */ + @Override + public Object getObject(int index) { + int physicalIndex = findFirstLargerThan(runEndsVector, index); + return valuesVector.getObject(physicalIndex); + } + + /** + * Returns number of null elements in the vector. + * + * @return number of null elements + */ + @Override + public int getNullCount() { + if (valuesVector.getNullCount() != 0) { + long nullCount = 0; + // TODO: make it more efficient + for (int i = 0; i < valuesVector.getValueCount(); i++) { + if (valuesVector.isNull(i)) { + long lastEnd = i > 0 ? runEndsVector.getValueAsLong(i - 1) : 0; + long nullEunLength = runEndsVector.getValueAsLong(i) - lastEnd; + nullCount += nullEunLength; + } + } + return (int) nullCount; + } + return 0; + } + + /** + * Check whether an element in the vector is null. + * + * @param index index to check for null + * @return true if element is null + */ + @Override + public boolean isNull(int index) { + int physicalIndex = findFirstLargerThan(runEndsVector, index); + return valuesVector.isNull(physicalIndex); + } + + /** Returns hashCode of element in index with the default hasher. 
*/ + @Override + public int hashCode(int index) { + return hashCode(index, null); + } + + /** Returns hashCode of element in index with the given hasher. */ + @Override + public int hashCode(int index, ArrowBufHasher hasher) { + int hash = 0; + for (FieldVector v : getChildrenFromFields()) { + if (index < v.getValueCount()) { + hash = ByteFunctionHelpers.combineHash(hash, v.hashCode(index, hasher)); + } + } + return hash; + } + + /** + * Accept a generic {@link VectorVisitor} and return the result. + * + * @param the output result type. + * @param the input data together with visitor. + */ + @Override + public OUT accept(VectorVisitor visitor, IN value) { + return visitor.visit(this, value); + } + + /** + * Gets the name of the vector. + * + * @return the name of the vector. + */ + @Override + public String getName() { + return this.field.getName(); + } + + @Override + public Iterator iterator() { + return Collections.unmodifiableCollection(getChildrenFromFields()).iterator(); + } + + /** FieldVector interface */ + + /** + * Initializes the child vectors to be later loaded with loadBuffers. + * + * @param children the schema containing the run_ends column first and the values column second + */ + @Override + public void initializeChildrenFromFields(List children) { + checkArgument( + children.size() == 2, + "Run-end encoded vectors must have two child Fields. Found: %s", + children.isEmpty() ? "none" : children); + checkArgument( + Arrays.asList( + MinorType.SMALLINT.getType(), MinorType.INT.getType(), MinorType.BIGINT.getType()) + .contains(children.get(0).getType()), + "The first field represents the run-end vector and must be of type int with size 16, 32, or 64 bits. 
Found: %s", + children.get(0).getType()); + runEndsVector = (BaseIntVector) children.get(0).createVector(allocator); + valuesVector = children.get(1).createVector(allocator); + field = new Field(field.getName(), field.getFieldType(), children); + } + + /** + * The returned list is the same size as the list passed to initializeChildrenFromFields. + * + * @return the children according to schema (empty for primitive types) + */ + @Override + public List getChildrenFromFields() { + return Arrays.asList(runEndsVector, valuesVector); + } + + /** + * Loads data in the vectors. (ownBuffers must be the same size as getFieldVectors()) + * + * @param fieldNode the fieldNode + * @param ownBuffers the buffers for this Field (own buffers only, children not included) + */ + @Override + public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers) { + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have any associated buffers."); + } + + /** + * Get the buffers of the fields, (same size as getFieldVectors() since it is their content). + * + * @return the buffers containing the data for this vector (ready for reading) + */ + @Override + public List getFieldBuffers() { + return null; + } + + /** + * Get the inner vectors. + * + * @return the inner vectors for this field as defined by the TypeLayout + * @deprecated This API will be removed as the current implementations no longer support inner + * vectors. + */ + @Deprecated + @Override + public List getFieldInnerVectors() { + throw new UnsupportedOperationException("There are no inner vectors. Use getFieldBuffers()."); + } + + /** + * Gets the starting address of the underlying buffer associated with validity vector. 
+ * + * @return buffer address + */ + @Override + public long getValidityBufferAddress() { + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have a validity buffer."); + } + + /** + * Gets the starting address of the underlying buffer associated with data vector. + * + * @return buffer address + */ + @Override + public long getDataBufferAddress() { + throw new UnsupportedOperationException("Run-end encoded vectors do not have a data buffer."); + } + + /** + * Gets the starting address of the underlying buffer associated with offset vector. + * + * @return buffer address + */ + @Override + public long getOffsetBufferAddress() { + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have an offset buffer."); + } + + /** + * Set the element at the given index to null. + * + * @param index the value to change + */ + @Override + public void setNull(int index) { + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have a validity buffer."); + } + + public FieldVector getRunEndsVector() { + return runEndsVector; + } + + public FieldVector getValuesVector() { + return valuesVector; + } + + static int findFirstLargerThan(BaseIntVector runEndVector, int logicalIndex) { + int target = logicalIndex + 1; + if (runEndVector == null || runEndVector.getValueCount() == 0) { + return -1; + } + + int low = 0; + int high = runEndVector.getValueCount() - 1; + int result = -1; + + while (low <= high) { + int mid = low + (high - low) / 2; + long valueAsLong = runEndVector.getValueAsLong(mid); + if (valueAsLong == target) { + return mid; + } else if (valueAsLong > target) { + result = mid; + high = mid - 1; + } else { + low = mid + 1; + } + } + + return result; + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java index ed099890e1b08..4cbd292343dae 100644 --- 
a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java @@ -72,6 +72,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.ListViewVector; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.complex.impl.BigIntWriterImpl; @@ -139,6 +140,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.ListView; import org.apache.arrow.vector.types.pojo.ArrowType.Map; import org.apache.arrow.vector.types.pojo.ArrowType.Null; +import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; import org.apache.arrow.vector.types.pojo.ArrowType.Struct; import org.apache.arrow.vector.types.pojo.ArrowType.Time; import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp; @@ -770,6 +772,18 @@ public FieldWriter getNewFieldWriter(ValueVector vector) { .getNewFieldWriter(vector); } }, + RUNENDENCODED(RunEndEncoded.INSTANCE) { + @Override + public FieldVector getNewVector( + Field field, BufferAllocator allocator, CallBack schemaChangeCallback) { + return new RunEndEncodedVector(field, allocator, schemaChangeCallback); + } + + @Override + public FieldWriter getNewFieldWriter(ValueVector vector) { + return null; // TODO + } + }, ; private final ArrowType type; @@ -1000,6 +1014,11 @@ public MinorType visit(ListView type) { public MinorType visit(ExtensionType type) { return MinorType.EXTENSIONTYPE; } + + @Override + public MinorType visit(RunEndEncoded type) { + return MinorType.RUNENDENCODED; + } }); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java index 4f81cba55f1b3..a2db252664252 100644 --- 
a/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java @@ -37,6 +37,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Utility to append two vectors together. */ @@ -639,4 +640,9 @@ public ValueVector visit(ExtensionTypeVector deltaVector, Void value) { deltaVector.getUnderlyingVector().accept(underlyingAppender, null); return targetVector; } + + @Override + public ValueVector visit(RunEndEncodedVector deltaVector, Void value) { + return targetVector; // TODO + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java index 0c9140c360d15..47e32fa1c6358 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java @@ -35,6 +35,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.pojo.ArrowType; @@ -287,4 +288,9 @@ public Void visit(ExtensionTypeVector vector, Void value) { vector.getUnderlyingVector().accept(this, value); return null; } + + @Override + public Void visit(RunEndEncodedVector vector, Void value) { + return null; // TODO + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java 
b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java index c62bff79f7710..0e2cab39d48a2 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Utility for validating vector data. */ @@ -206,4 +207,9 @@ public Void visit(ExtensionTypeVector vector, Void value) { vector.getUnderlyingVector().accept(this, value); return null; } + + @Override + public Void visit(RunEndEncodedVector vector, Void value) { + return null; // TODO: data validation for run-end encoded vectors is not implemented + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java index f947dcf41342f..674929606d5b8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java @@ -67,6 +67,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; @@ -478,4 +479,13 @@ public Void visit(ExtensionTypeVector vector, Void value) { validateExtensionTypeVector(vector); return null; } + + @Override + public Void visit(RunEndEncodedVector vector, Void value) { + validateVectorCommon(vector, 
ArrowType.RunEndEncoded.class); + for (ValueVector subVector : vector.getChildrenFromFields()) { + subVector.accept(this, null); /* type-check each child with this visitor */ + } + return null; + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java index 5004ba488cacd..294add01c652c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.ValueVectorUtility; @@ -318,4 +319,9 @@ public Void visit(ExtensionTypeVector vector, Void value) { vector.getUnderlyingVector().accept(this, value); return null; } + + @Override + public Void visit(RunEndEncodedVector vector, Void value) { + return null; // TODO: validation of run-end encoded vectors is not implemented + } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java new file mode 100644 index 0000000000000..77d00c76131b9 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.complex.RunEndEncodedVector; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestRunEndEncodedVector { /* Tests building and reading RunEndEncodedVector. */ + + private BufferAllocator allocator; + + @BeforeEach + public void init() { + allocator = new DirtyRootAllocator(Long.MAX_VALUE, (byte) 100); /* fresh allocator per test */ + } + + @AfterEach + public void terminate() throws Exception { + allocator.close(); /* closes the allocator created in init() */ + } + + @Test + public void testInitializeChildrenFromFields() throws Exception { + final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + + try (RunEndEncodedVector reeVector = RunEndEncodedVector.empty("empty", allocator)) { + reeVector.initializeChildrenFromFields(List.of(runEndField, valueField)); + + reeVector.validate(); /* must pass with no values written */ + } + } + + /** Create REE vector with constant value: one run of 65536 spanning 100 logical values. 
*/ + @Test + public void testConstantValueVector() throws Exception { + final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + final Field runEndEncodedField = + new Field( + "constant", + FieldType.notNullable(RunEndEncoded.INSTANCE), + List.of(runEndField, valueField)); + + try (RunEndEncodedVector reeVector = + new RunEndEncodedVector(runEndEncodedField, allocator, null)) { + int runCount = 1; + int logicalValueCount = 100; + + reeVector.allocateNew(); + reeVector.setInitialCapacity(runCount); /* NOTE(review): after allocateNew(); confirm order */ + ((BigIntVector) reeVector.getValuesVector()).set(0, 65536); + ((IntVector) reeVector.getRunEndsVector()).set(0, logicalValueCount); /* run 0 ends at 100 */ + reeVector.getValuesVector().setValueCount(runCount); + reeVector.getRunEndsVector().setValueCount(runCount); + reeVector.setValueCount(logicalValueCount); + + assertEquals(logicalValueCount, reeVector.getValueCount()); + for (int i = 0; i < logicalValueCount; i++) { + assertEquals(65536L, reeVector.getObject(i)); + } + } + } + + /** Create REE vector representing: [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5]. 
*/ + @Test + public void testBasicRunEndEncodedVector() throws Exception { + + final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + final Field runEndEncodedField = + new Field( + "ree", FieldType.notNullable(RunEndEncoded.INSTANCE), List.of(runEndField, valueField)); + + try (RunEndEncodedVector reeVector = + new RunEndEncodedVector(runEndEncodedField, allocator, null)) { + int runCount = 5; + reeVector.allocateNew(); + reeVector.setInitialCapacity(runCount); /* NOTE(review): after allocateNew(); confirm order */ + int end = 0; + for (int i = 1; i <= runCount; i++) { + end += i; /* cumulative run ends: 1, 3, 6, 10, 15 */ + ((BigIntVector) reeVector.getValuesVector()).set(i - 1, i); + ((IntVector) reeVector.getRunEndsVector()).set(i - 1, end); + } + + reeVector.getValuesVector().setValueCount(runCount); + reeVector.getRunEndsVector().setValueCount(runCount); + reeVector.setValueCount(end); + + assertEquals(15, reeVector.getValueCount()); + int index = 0; + for (int i = 1; i < runCount + 1; i++) { + for (int j = 0; j < i; j++) { /* value i fills i consecutive indices */ + assertEquals((long) i, reeVector.getObject(index)); + index++; + } + } + } + } +}