Skip to content

Commit

Permalink
Rename to Opaque
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Jun 27, 2024
1 parent 0c932b5 commit 25926fa
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.extension.UnknownType;
import org.apache.arrow.vector.extension.OpaqueType;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand Down Expand Up @@ -225,18 +225,18 @@ public static ArrowType getArrowTypeFromJdbcType(

/**
* Wrap a JDBC to Arrow type converter such that {@link UnsupportedOperationException} becomes
* {@link org.apache.arrow.vector.extension.UnknownType}.
* {@link OpaqueType}.
*
* @param typeConverter The type converter to wrap.
* @param vendorName The database name to report as the Unknown type's vendor name.
* @param vendorName The database name to report as the Opaque type's vendor name.
*/
public static Function<JdbcFieldInfo, ArrowType> reportUnsupportedTypesAsUnknown(
public static Function<JdbcFieldInfo, ArrowType> reportUnsupportedTypesAsOpaque(
Function<JdbcFieldInfo, ArrowType> typeConverter, String vendorName) {
return (final JdbcFieldInfo fieldInfo) -> {
try {
return typeConverter.apply(fieldInfo);
} catch (UnsupportedOperationException e) {
return new UnknownType(MinorType.NULL.getType(), fieldInfo.getTypeName(), vendorName);
return new OpaqueType(MinorType.NULL.getType(), fieldInfo.getTypeName(), vendorName);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.extension.UnknownType;
import org.apache.arrow.vector.extension.OpaqueType;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testJdbcSchemaMetadata(Table table) throws SQLException, ClassNotFou
}

@Test
void testUnknownType() throws SQLException, ClassNotFoundException {
void testOpaqueType() throws SQLException, ClassNotFoundException {
try (BufferAllocator allocator = new RootAllocator()) {
String url = "jdbc:h2:mem:JdbcToArrowTest";
String driver = "org.h2.Driver";
Expand All @@ -222,7 +222,7 @@ void testUnknownType() throws SQLException, ClassNotFoundException {
new JdbcToArrowConfigBuilder()
.setAllocator(allocator)
.setJdbcToArrowTypeConverter(
JdbcToArrowUtils.reportUnsupportedTypesAsUnknown(typeConverter, "H2"))
JdbcToArrowUtils.reportUnsupportedTypesAsOpaque(typeConverter, "H2"))
.build();
Schema schema;
try (Statement stmt = conn.createStatement();
Expand All @@ -235,7 +235,7 @@ void testUnknownType() throws SQLException, ClassNotFoundException {
new Schema(
List.of(
Field.nullable(
"A", new UnknownType(Types.MinorType.NULL.getType(), "GEOMETRY", "H2")),
"A", new OpaqueType(Types.MinorType.NULL.getType(), "GEOMETRY", "H2")),
Field.nullable("B", Types.MinorType.INT.getType())));
assertEquals(expected, schema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,13 @@
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

/** Unknown represents */
public class UnknownType extends ArrowType.ExtensionType {
/**
* Opaque is a placeholder for a type from an external (usually non-Arrow) system that could not be
* interpreted.
*/
public class OpaqueType extends ArrowType.ExtensionType {
private static final AtomicBoolean registered = new AtomicBoolean(false);
public static final String EXTENSION_NAME = "arrow.unknown";
public static final String EXTENSION_NAME = "arrow.opaque";
private final ArrowType storageType;
private final String typeName;
private final String vendorName;
Expand All @@ -76,7 +79,7 @@ public class UnknownType extends ArrowType.ExtensionType {
public static void ensureRegistered() {
if (!registered.getAndSet(true)) {
// The values don't matter, we just need an instance
ExtensionTypeRegistry.register(new UnknownType(Types.MinorType.NULL.getType(), "", ""));
ExtensionTypeRegistry.register(new OpaqueType(Types.MinorType.NULL.getType(), "", ""));
}
}

Expand All @@ -87,7 +90,7 @@ public static void ensureRegistered() {
* @param typeName The name of the unknown type.
* @param vendorName The name of the originating system of the unknown type.
*/
public UnknownType(ArrowType storageType, String typeName, String vendorName) {
public OpaqueType(ArrowType storageType, String typeName, String vendorName) {
this.storageType = Objects.requireNonNull(storageType, "storageType");
this.typeName = Objects.requireNonNull(typeName, "typeName");
this.vendorName = Objects.requireNonNull(vendorName, "vendorName");
Expand Down Expand Up @@ -115,10 +118,10 @@ public String extensionName() {
public boolean extensionEquals(ExtensionType other) {
return other != null
&& EXTENSION_NAME.equals(other.extensionName())
&& other instanceof UnknownType
&& other instanceof OpaqueType
&& storageType.equals(other.storageType())
&& typeName.equals(((UnknownType) other).typeName())
&& vendorName.equals(((UnknownType) other).vendorName());
&& typeName.equals(((OpaqueType) other).typeName())
&& vendorName.equals(((OpaqueType) other).vendorName());
}

@Override
Expand Down Expand Up @@ -157,7 +160,7 @@ public ArrowType deserialize(ArrowType storageType, String serializedData) {
if (!vendorName.isTextual()) {
throw new InvalidExtensionMetadataException("vendorName should be string, was " + vendorName);
}
return new UnknownType(storageType, typeName.asText(), vendorName.asText());
return new OpaqueType(storageType, typeName.asText(), vendorName.asText());
}

@Override
Expand All @@ -166,7 +169,7 @@ public FieldVector getNewVector(String name, FieldType fieldType, BufferAllocato
final Field field = new Field(name, fieldType, Collections.emptyList());
final FieldVector underlyingVector =
storageType.accept(new UnderlyingVectorTypeVisitor(name, allocator));
return new UnknownVector(field, allocator, underlyingVector);
return new OpaqueVector(field, allocator, underlyingVector);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,23 @@
import org.apache.arrow.vector.ValueIterableVector;
import org.apache.arrow.vector.types.pojo.Field;

public class UnknownVector extends ExtensionTypeVector<FieldVector>
/**
* Opaque is a wrapper for (usually binary) data from an external (often non-Arrow) system that
* could not be interpreted.
*/
public class OpaqueVector extends ExtensionTypeVector<FieldVector>
implements ValueIterableVector<Object> {
public UnknownVector(String name, BufferAllocator allocator, FieldVector underlyingVector) {
super(name, allocator, underlyingVector);
}
private final Field field;

public UnknownVector(Field field, BufferAllocator allocator, FieldVector underlyingVector) {
public OpaqueVector(Field field, BufferAllocator allocator, FieldVector underlyingVector) {
super(field, allocator, underlyingVector);
this.field = field;
}

// TODO: getField
@Override
public Field getField() {
return field;
}

@Override
public Object getObject(int index) {
Expand All @@ -42,13 +48,11 @@ public Object getObject(int index) {

@Override
public int hashCode(int index) {
// TODO:
return 0;
return hashCode(index, null);
}

@Override
public int hashCode(int index, ArrowBufHasher hasher) {
// TODO:
return 0;
return getUnderlyingVector().hashCode(index, hasher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,43 @@
*/
package org.apache.arrow.vector;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.stream.Stream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.extension.InvalidExtensionMetadataException;
import org.apache.arrow.vector.extension.UnknownType;
import org.apache.arrow.vector.extension.OpaqueType;
import org.apache.arrow.vector.extension.OpaqueVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

class TestUnknownExtensionType {
class TestOpaqueExtensionType {
BufferAllocator allocator;

@BeforeEach
Expand All @@ -62,7 +75,7 @@ void afterEach() {
})
void testDeserializeValid(String serialized) {
ArrowType storageType = Types.MinorType.NULL.getType();
UnknownType type = new UnknownType(storageType, "", "");
OpaqueType type = new OpaqueType(storageType, "", "");

assertDoesNotThrow(() -> type.deserialize(storageType, serialized));
}
Expand All @@ -81,7 +94,7 @@ void testDeserializeValid(String serialized) {
})
void testDeserializeInvalid(String serialized) {
ArrowType storageType = Types.MinorType.NULL.getType();
UnknownType type = new UnknownType(storageType, "", "");
OpaqueType type = new OpaqueType(storageType, "", "");

assertThrows(
InvalidExtensionMetadataException.class, () -> type.deserialize(storageType, serialized));
Expand All @@ -90,7 +103,7 @@ void testDeserializeInvalid(String serialized) {
@ParameterizedTest
@MethodSource("storageType")
void testRoundTrip(ArrowType storageType) {
UnknownType type = new UnknownType(storageType, "foo", "bar");
OpaqueType type = new OpaqueType(storageType, "foo", "bar");
assertEquals(storageType, type.storageType());
assertEquals("foo", type.typeName());
if (storageType.isComplex()) {
Expand All @@ -103,29 +116,71 @@ void testRoundTrip(ArrowType storageType) {
}

String serialized = assertDoesNotThrow(type::serialize);
UnknownType holder = new UnknownType(Types.MinorType.NULL.getType(), "", "");
UnknownType deserialized = (UnknownType) holder.deserialize(storageType, serialized);
OpaqueType holder = new OpaqueType(Types.MinorType.NULL.getType(), "", "");
OpaqueType deserialized = (OpaqueType) holder.deserialize(storageType, serialized);
assertEquals(type, deserialized);
assertNotEquals(holder, deserialized);
}

@ParameterizedTest
@MethodSource("storageType")
void testIpcRoundTrip(ArrowType storageType) {
UnknownType.ensureRegistered();
OpaqueType.ensureRegistered();

UnknownType type = new UnknownType(storageType, "foo", "bar");
OpaqueType type = new OpaqueType(storageType, "foo", "bar");
Schema schema = new Schema(Collections.singletonList(Field.nullable("unknown", type)));
byte[] serialized = schema.serializeAsMessage();
Schema deseralized = Schema.deserializeMessage(ByteBuffer.wrap(serialized));
assertEquals(schema, deseralized);
}

@Test
void testVectorType() throws IOException {
OpaqueType.ensureRegistered();

ArrowType storageType = Types.MinorType.VARBINARY.getType();
OpaqueType type = new OpaqueType(storageType, "foo", "bar");
try (FieldVector vector = type.getNewVector("field", FieldType.nullable(type), allocator)) {
OpaqueVector opaque = assertInstanceOf(OpaqueVector.class, vector);
assertEquals("field", opaque.getField().getName());
assertEquals(type, opaque.getField().getType());

VarBinaryVector binary =
assertInstanceOf(VarBinaryVector.class, opaque.getUnderlyingVector());
binary.setSafe(0, new byte[] {0, 1, 2, 3});
binary.setNull(1);
opaque.setValueCount(2);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(opaque));
ArrowStreamWriter writer =
new ArrowStreamWriter(root, new DictionaryProvider.MapDictionaryProvider(), baos)) {
writer.start();
writer.writeBatch();
}

try (ArrowStreamReader reader =
new ArrowStreamReader(new ByteArrayInputStream(baos.toByteArray()), allocator)) {
assertTrue(reader.loadNextBatch());
VectorSchemaRoot root = reader.getVectorSchemaRoot();
assertEquals(2, root.getRowCount());
assertEquals(new Schema(Collections.singletonList(opaque.getField())), root.getSchema());

OpaqueVector actual = assertInstanceOf(OpaqueVector.class, root.getVector("field"));
assertFalse(actual.isNull(0));
assertTrue(actual.isNull(1));
assertArrayEquals(new byte[] {0, 1, 2, 3}, (byte[]) actual.getObject(0));
assertNull(actual.getObject(1));
}
}
}

static Stream<ArrowType> storageType() {
return Stream.of(
Types.MinorType.NULL.getType(),
Types.MinorType.BIGINT.getType(),
Types.MinorType.BIT.getType(),
Types.MinorType.VARBINARY.getType(),
Types.MinorType.VARCHAR.getType(),
Types.MinorType.LIST.getType(),
new ArrowType.Decimal(12, 4, 128));
Expand Down

0 comments on commit 25926fa

Please sign in to comment.