Skip to content

Commit

Permalink
GH-38414 [Java] [Vector] Add Delta dictionary support.
Browse files Browse the repository at this point in the history
Add a delta encoding flag to the DictionaryEncoding class.
Add a BaseDictionary interface (poor name but provides backwards compatibility) that
Dictionary implements it and a new BatchedDictionary class that handles writing data
to a dictionary and index allowing for flushing in a writer writeBatch call and either
replacing or delta encoding the dictionary.

Fix for #38414
  • Loading branch information
manolama committed Nov 8, 2023
1 parent 9e7991b commit c177d19
Show file tree
Hide file tree
Showing 39 changed files with 1,480 additions and 156 deletions.
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

Expand Down Expand Up @@ -110,7 +110,7 @@ void export(ArrowArray array, FieldVector vector, DictionaryProvider dictionaryP
}

if (dictionaryEncoding != null) {
Dictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
BaseDictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on export of dictionary encoded array");

data.dictionary = ArrowArray.allocateNew(allocator);
Expand Down
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
Expand Down Expand Up @@ -103,7 +103,7 @@ private void doImport(ArrowArray.Snapshot snapshot) {
DictionaryEncoding encoding = vector.getField().getDictionary();
checkNotNull(encoding, "Missing encoding on import of ArrowArray with dictionary");

Dictionary dictionary = dictionaryProvider.lookup(encoding.getId());
BaseDictionary dictionary = dictionaryProvider.lookup(encoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on import of ArrowArray with dictionary");

// reset the dictionary vector to the initial state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.stream.Collectors;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

Expand All @@ -52,12 +52,12 @@ final class ArrowArrayStreamReader extends ArrowReader {
}

@Override
public Map<Long, Dictionary> getDictionaryVectors() {
public Map<Long, BaseDictionary> getDictionaryVectors() {
return provider.getDictionaryIds().stream().collect(Collectors.toMap(Function.identity(), provider::lookup));
}

@Override
public Dictionary lookup(long id) {
public BaseDictionary lookup(long id) {
return provider.lookup(id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import java.util.Map;
import java.util.Set;

import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.BatchedDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;

/**
Expand All @@ -39,14 +40,14 @@
*/
public class CDataDictionaryProvider implements DictionaryProvider, AutoCloseable {

private final Map<Long, Dictionary> map;
private final Map<Long, BaseDictionary> map;

public CDataDictionaryProvider() {
this.map = new HashMap<>();
}

void put(Dictionary dictionary) {
Dictionary previous = map.put(dictionary.getEncoding().getId(), dictionary);
void put(BaseDictionary dictionary) {
BaseDictionary previous = map.put(dictionary.getEncoding().getId(), dictionary);
if (previous != null) {
previous.getVector().close();
}
Expand All @@ -58,16 +59,25 @@ public final Set<Long> getDictionaryIds() {
}

@Override
public Dictionary lookup(long id) {
public BaseDictionary lookup(long id) {
return map.get(id);
}

@Override
public void close() {
for (Dictionary dictionary : map.values()) {
for (BaseDictionary dictionary : map.values()) {
dictionary.getVector().close();
}
map.clear();
}

@Override
public void resetBatchedDictionaries() {
map.values().forEach( dictionary -> {
if (dictionary instanceof BatchedDictionary) {
((BatchedDictionary) dictionary).reset();
}
});
}

}
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/SchemaExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.arrow.c.jni.PrivateData;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
Expand Down Expand Up @@ -95,7 +95,7 @@ void export(ArrowSchema schema, Field field, DictionaryProvider dictionaryProvid
}

if (dictionaryEncoding != null) {
Dictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
BaseDictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on export of field with dictionary");

data.dictionary = ArrowSchema.allocateNew(allocator);
Expand Down
7 changes: 4 additions & 3 deletions java/c/src/test/java/org/apache/arrow/c/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.compare.Range;
import org.apache.arrow.vector.compare.RangeEqualsVisitor;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowReader;
Expand Down Expand Up @@ -244,7 +245,7 @@ void roundtrip(Schema schema, List<ArrowRecordBatch> batches, DictionaryProvider
}
assertThat(reader.loadNextBatch()).isFalse();
assertThat(reader.getDictionaryIds()).isEqualTo(provider.getDictionaryIds());
for (Map.Entry<Long, Dictionary> entry : reader.getDictionaryVectors().entrySet()) {
for (Map.Entry<Long, BaseDictionary> entry : reader.getDictionaryVectors().entrySet()) {
final FieldVector expected = provider.lookup(entry.getKey()).getVector();
final FieldVector actual = entry.getValue().getVector();
assertVectorsEqual(expected, actual);
Expand Down Expand Up @@ -286,7 +287,7 @@ static class InMemoryArrowReader extends ArrowReader {
}

@Override
public Dictionary lookup(long id) {
public BaseDictionary lookup(long id) {
return provider.lookup(id);
}

Expand All @@ -296,7 +297,7 @@ public Set<Long> getDictionaryIds() {
}

@Override
public Map<Long, Dictionary> getDictionaryVectors() {
public Map<Long, BaseDictionary> getDictionaryVectors() {
return getDictionaryIds().stream().collect(Collectors.toMap(Function.identity(), this::lookup));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.IpcOption;
Expand Down Expand Up @@ -66,7 +66,7 @@ static Schema generateSchemaMessages(final Schema originalSchema, final FlightDe
}
// Create and write dictionary batches
for (Long id : dictionaryIds) {
final Dictionary dictionary = provider.lookup(id);
final BaseDictionary dictionary = provider.lookup(id);
final FieldVector vector = dictionary.getVector();
final int count = vector.getValueCount();
// Do NOT close this root, as it does not actually own the vector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
Expand Down Expand Up @@ -268,7 +268,7 @@ public boolean next() {
if (dictionaries == null) {
throw new IllegalStateException("Dictionary ownership was claimed by the application.");
}
final Dictionary dictionary = dictionaries.lookup(id);
final BaseDictionary dictionary = dictionaries.lookup(id);
if (dictionary == null) {
throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id);
}
Expand Down Expand Up @@ -410,12 +410,12 @@ public void onNext(ArrowMessage msg) {
}

final List<Field> fields = new ArrayList<>();
final Map<Long, Dictionary> dictionaryMap = new HashMap<>();
final Map<Long, BaseDictionary> dictionaryMap = new HashMap<>();
for (final Field originalField : schema.getFields()) {
final Field updatedField = DictionaryUtility.toMemoryFormat(originalField, allocator, dictionaryMap);
fields.add(updatedField);
}
for (final Map.Entry<Long, Dictionary> entry : dictionaryMap.entrySet()) {
for (final Map.Entry<Long, BaseDictionary> entry : dictionaryMap.entrySet()) {
dictionaries.put(entry.getValue());
}
schema = new Schema(fields, schema.getCustomMetadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public int hashCode(ArrowBuf buf, long offset, long length) {

@Override
public int hashCode(byte[] buf, int offset, int length) {
return 0;
throw new UnsupportedOperationException("Not used in UT.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ private void verifyHashCodeNotEqual(ArrowBuf buf1, byte[] ba1, int offset1, int
@Parameterized.Parameters(name = "hasher = {0}")
public static Collection<Object[]> getHasher() {
return Arrays.asList(
new Object[] {SimpleHasher.class.getSimpleName(),
SimpleHasher.INSTANCE},
new Object[] {MurmurHasher.class.getSimpleName(),
new MurmurHasher()
}
new Object[] {SimpleHasher.class.getSimpleName(),
SimpleHasher.INSTANCE},
new Object[] {MurmurHasher.class.getSimpleName(),
new MurmurHasher()
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
Expand Down Expand Up @@ -207,7 +208,7 @@ public void testFlatDictionary() throws IOException {
Assert.assertEquals(1, readVector.getObject(4));
Assert.assertEquals(2, readVector.getObject(5));

Dictionary dictionary = reader.lookup(1L);
BaseDictionary dictionary = reader.lookup(1L);
Assert.assertNotNull(dictionary);
VarCharVector dictionaryVector = ((VarCharVector) dictionary.getVector());
Assert.assertEquals(3, dictionaryVector.getValueCount());
Expand Down Expand Up @@ -289,7 +290,7 @@ public void testNestedDictionary() throws IOException {
Assert.assertEquals(Arrays.asList(0), readVector.getObject(1));
Assert.assertEquals(Arrays.asList(1), readVector.getObject(2));

Dictionary readDictionary = reader.lookup(2L);
BaseDictionary readDictionary = reader.lookup(2L);
Assert.assertNotNull(readDictionary);
VarCharVector dictionaryVector = ((VarCharVector) readDictionary.getVector());
Assert.assertEquals(2, dictionaryVector.getValueCount());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.dictionary;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

/**
* Base interface for various dictionary implementations. Implementations include
* {@link Dictionary} for encoding a complete vector and {@link BatchedDictionary}
* for continuous encoding of a vector.
* These methods provide means of accessing the dictionary vector containing the
* encoded data.
*/
public interface BaseDictionary {

/**
* The dictionary vector containing unique entries.
*/
FieldVector getVector();

/**
* The encoding used for the dictionary vector.
*/
DictionaryEncoding getEncoding();

/**
* The type of the dictionary vector.
*/
ArrowType getVectorType();

}
Loading

0 comments on commit c177d19

Please sign in to comment.