Skip to content

Commit

Permalink
Proof-of-concept of Java extension types
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Jun 18, 2019
1 parent adba5bb commit 7c740ed
Show file tree
Hide file tree
Showing 7 changed files with 619 additions and 10 deletions.
60 changes: 60 additions & 0 deletions java/vector/src/main/codegen/templates/ArrowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.util.Objects;

import org.apache.arrow.flatbuf.Type;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.types.*;
import org.apache.arrow.vector.FieldVector;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
Expand Down Expand Up @@ -108,6 +110,9 @@ public static interface ArrowTypeVisitor<T> {
<#list arrowTypes.types as type>
T visit(${type.name?remove_ending("_")} type);
</#list>
default T visit(ExtensionType type) {
return type.storageType().accept(this);
}
}

/**
Expand Down Expand Up @@ -246,6 +251,61 @@ public <T> T accept(ArrowTypeVisitor<T> visitor) {
}
</#list>

/**
* A user-defined data type that wraps an underlying storage type.
*/
public abstract static class ExtensionType extends ComplexType {
/** The on-wire type for this user-defined type. */
public abstract ArrowType storageType();
/** The name of this user-defined type. Used to identify the type during serialization. */
public abstract String extensionName();
/** Check equality of this type to another user-defined type. */
public abstract boolean extensionEquals(ExtensionType other);
/** Save any metadata for this type. */
public abstract String serialize();
/** Given saved metadata and the underlying storage type, construct a new instance of the user type. */
public abstract ArrowType deserialize(ArrowType storageType, String serializedData);
/** Construct a vector for the user type. */
public abstract FieldVector getNewVector(String name, FieldType fieldType, BufferAllocator allocator);

/** The field metadata key storing the name of the extension type. */
public static final String EXTENSION_METADATA_KEY_NAME = "ARROW:extension:name";
/** The field metadata key storing metadata for the extension type. */
public static final String EXTENSION_METADATA_KEY_METADATA = "ARROW:extension:metadata";

@Override
public ArrowTypeID getTypeID() {
return storageType().getTypeID();
}

@Override
public int getType(FlatBufferBuilder builder) {
return storageType().getType(builder);
}

public String toString() {
return "ExtensionType(" + extensionName() + ", " + storageType().toString() + ")";
}

@Override
public int hashCode() {
return java.util.Arrays.deepHashCode(new Object[] {storageType(), extensionName()});
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof ExtensionType)) {
return false;
}
return this.extensionEquals((ExtensionType) obj);
}

@Override
public <T> T accept(ArrowTypeVisitor<T> visitor) {
return visitor.visit(this);
}
}

public static org.apache.arrow.vector.types.pojo.ArrowType getTypeForField(org.apache.arrow.flatbuf.Field field) {
switch(field.typeType()) {
<#list arrowTypes.types as type>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector;

import java.util.Iterator;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.TransferPair;

import io.netty.buffer.ArrowBuf;

/**
* A vector that wraps an underlying vector, used to help implement extension types.
* @param <T> The wrapped vector type.
*/
public abstract class ExtensionTypeVector<T extends BaseValueVector & FieldVector> extends BaseValueVector implements
FieldVector {

private final T underlyingVector;

public ExtensionTypeVector(String name, BufferAllocator allocator, T underlyingVector) {
super(name, allocator);
this.underlyingVector = underlyingVector;
}

/** Get the underlying vector. */
public T getUnderlyingVector() {
return underlyingVector;
}

@Override
public void allocateNew() throws OutOfMemoryException {
this.underlyingVector.allocateNew();
}

@Override
public boolean allocateNewSafe() {
return this.underlyingVector.allocateNewSafe();
}

@Override
public void reAlloc() {
this.underlyingVector.reAlloc();
}

@Override
public void setInitialCapacity(int numRecords) {
this.underlyingVector.setInitialCapacity(numRecords);
}

@Override
public int getValueCapacity() {
return this.underlyingVector.getValueCapacity();
}

@Override
public void reset() {
this.underlyingVector.reset();
}

@Override
public Field getField() {
return this.underlyingVector.getField();
}

@Override
public MinorType getMinorType() {
return MinorType.EXTENSIONTYPE;
}

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return underlyingVector.getTransferPair(ref, allocator);
}

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
return underlyingVector.getTransferPair(ref, allocator, callBack);
}

@Override
public TransferPair makeTransferPair(ValueVector target) {
return underlyingVector.makeTransferPair(target);
}

@Override
public FieldReader getReader() {
return underlyingVector.getReader();
}

@Override
public int getBufferSize() {
return underlyingVector.getBufferSize();
}

@Override
public int getBufferSizeFor(int valueCount) {
return underlyingVector.getBufferSizeFor(valueCount);
}

@Override
public ArrowBuf[] getBuffers(boolean clear) {
return underlyingVector.getBuffers(clear);
}

@Override
public ArrowBuf getValidityBuffer() {
return underlyingVector.getValidityBuffer();
}

@Override
public ArrowBuf getDataBuffer() {
return underlyingVector.getDataBuffer();
}

@Override
public ArrowBuf getOffsetBuffer() {
return underlyingVector.getOffsetBuffer();
}

@Override
public int getValueCount() {
return underlyingVector.getValueCount();
}

@Override
public void setValueCount(int valueCount) {
underlyingVector.setValueCount(valueCount);
}

/**
* Get the extension object at the specified index.
*
* <p>Generally, this should access the underlying vector and construct the corresponding Java object from the raw
* data.
*/
@Override
public abstract Object getObject(int index);

@Override
public int getNullCount() {
return underlyingVector.getNullCount();
}

@Override
public boolean isNull(int index) {
return underlyingVector.isNull(index);
}

@Override
public void initializeChildrenFromFields(List<Field> children) {
underlyingVector.initializeChildrenFromFields(children);
}

@Override
public List<FieldVector> getChildrenFromFields() {
return underlyingVector.getChildrenFromFields();
}

@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
underlyingVector.loadFieldBuffers(fieldNode, ownBuffers);
}

@Override
public List<ArrowBuf> getFieldBuffers() {
return underlyingVector.getFieldBuffers();
}

@Override
public List<BufferBacked> getFieldInnerVectors() {
return underlyingVector.getFieldInnerVectors();
}

@Override
public long getValidityBufferAddress() {
return underlyingVector.getValidityBufferAddress();
}

@Override
public long getDataBufferAddress() {
return underlyingVector.getDataBufferAddress();
}

@Override
public long getOffsetBufferAddress() {
return underlyingVector.getOffsetBufferAddress();
}

@Override
public void clear() {
underlyingVector.clear();
}

@Override
public void close() {
underlyingVector.close();
}

@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
return underlyingVector.getTransferPair(allocator);
}

@Override
public Iterator<ValueVector> iterator() {
return underlyingVector.iterator();
}

@Override
public BufferAllocator getAllocator() {
return underlyingVector.getAllocator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.DurationVector;
import org.apache.arrow.vector.ExtensionTypeVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
Expand Down Expand Up @@ -105,6 +106,7 @@
import org.apache.arrow.vector.types.pojo.ArrowType.Date;
import org.apache.arrow.vector.types.pojo.ArrowType.Decimal;
import org.apache.arrow.vector.types.pojo.ArrowType.Duration;
import org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType;
import org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeBinary;
import org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList;
import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
Expand Down Expand Up @@ -710,7 +712,23 @@ public FieldVector getNewVector(
public FieldWriter getNewFieldWriter(ValueVector vector) {
return new TimeStampNanoTZWriterImpl((TimeStampNanoTZVector) vector);
}
};
},
EXTENSIONTYPE(null) {
@Override
public FieldVector getNewVector(
String name,
FieldType fieldType,
BufferAllocator allocator,
CallBack schemaChangeCallback) {
return ((ExtensionType) fieldType.getType()).getNewVector(name, fieldType, allocator);
}

@Override
public FieldWriter getNewFieldWriter(ValueVector vector) {
return ((ExtensionTypeVector) vector).getUnderlyingVector().getMinorType().getNewFieldWriter(vector);
}
},
;

private final ArrowType type;

Expand Down Expand Up @@ -889,6 +907,11 @@ public MinorType visit(Interval type) {
public MinorType visit(Duration type) {
return MinorType.DURATION;
}

@Override
public MinorType visit(ExtensionType type) {
return MinorType.EXTENSIONTYPE;
}
});
}

Expand Down
Loading

0 comments on commit 7c740ed

Please sign in to comment.