Skip to content

Commit

Permalink
apacheGH-38414 [Java] [Vector] Add Delta dictionary support.
Browse files Browse the repository at this point in the history
Add a delta encoding flag to the DictionaryEncoding class.
Add a BaseDictionary interface (poor name but provides backwards compatibility) that
Dictionary implements it and a new BatchedDictionary class that handles writing data
to a dictionary and index allowing for flushing in a writer writeBatch call and either
replacing or delta encoding the dictionary.

Fix for apache#38414
  • Loading branch information
manolama committed Oct 24, 2023
1 parent 9e7991b commit 5713286
Show file tree
Hide file tree
Showing 33 changed files with 1,424 additions and 127 deletions.
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

Expand Down Expand Up @@ -110,7 +110,7 @@ void export(ArrowArray array, FieldVector vector, DictionaryProvider dictionaryP
}

if (dictionaryEncoding != null) {
Dictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
BaseDictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on export of dictionary encoded array");

data.dictionary = ArrowArray.allocateNew(allocator);
Expand Down
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
Expand Down Expand Up @@ -103,7 +103,7 @@ private void doImport(ArrowArray.Snapshot snapshot) {
DictionaryEncoding encoding = vector.getField().getDictionary();
checkNotNull(encoding, "Missing encoding on import of ArrowArray with dictionary");

Dictionary dictionary = dictionaryProvider.lookup(encoding.getId());
BaseDictionary dictionary = dictionaryProvider.lookup(encoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on import of ArrowArray with dictionary");

// reset the dictionary vector to the initial state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Collectors;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -52,12 +53,12 @@ final class ArrowArrayStreamReader extends ArrowReader {
}

@Override
public Map<Long, Dictionary> getDictionaryVectors() {
public Map<Long, BaseDictionary> getDictionaryVectors() {
return provider.getDictionaryIds().stream().collect(Collectors.toMap(Function.identity(), provider::lookup));
}

@Override
public Dictionary lookup(long id) {
public BaseDictionary lookup(long id) {
return provider.lookup(id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import java.util.Map;
import java.util.Set;

import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.BatchedDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;

/**
Expand All @@ -39,14 +40,14 @@
*/
public class CDataDictionaryProvider implements DictionaryProvider, AutoCloseable {

private final Map<Long, Dictionary> map;
private final Map<Long, BaseDictionary> map;

public CDataDictionaryProvider() {
this.map = new HashMap<>();
}

void put(Dictionary dictionary) {
Dictionary previous = map.put(dictionary.getEncoding().getId(), dictionary);
void put(BaseDictionary dictionary) {
BaseDictionary previous = map.put(dictionary.getEncoding().getId(), dictionary);
if (previous != null) {
previous.getVector().close();
}
Expand All @@ -58,16 +59,25 @@ public final Set<Long> getDictionaryIds() {
}

@Override
public Dictionary lookup(long id) {
public BaseDictionary lookup(long id) {
return map.get(id);
}

@Override
public void close() {
for (Dictionary dictionary : map.values()) {
for (BaseDictionary dictionary : map.values()) {
dictionary.getVector().close();
}
map.clear();
}

@Override
public void resetBatchedDictionaries() {
map.values().forEach( dictionary -> {
if (dictionary instanceof BatchedDictionary) {
((BatchedDictionary) dictionary).reset();
}
});
}

}
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/SchemaExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.arrow.c.jni.PrivateData;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
Expand Down Expand Up @@ -95,7 +95,7 @@ void export(ArrowSchema schema, Field field, DictionaryProvider dictionaryProvid
}

if (dictionaryEncoding != null) {
Dictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
BaseDictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on export of field with dictionary");

data.dictionary = ArrowSchema.allocateNew(allocator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.IpcOption;
Expand Down Expand Up @@ -66,7 +66,7 @@ static Schema generateSchemaMessages(final Schema originalSchema, final FlightDe
}
// Create and write dictionary batches
for (Long id : dictionaryIds) {
final Dictionary dictionary = provider.lookup(id);
final BaseDictionary dictionary = provider.lookup(id);
final FieldVector vector = dictionary.getVector();
final int count = vector.getValueCount();
// Do NOT close this root, as it does not actually own the vector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
Expand Down Expand Up @@ -268,7 +269,7 @@ public boolean next() {
if (dictionaries == null) {
throw new IllegalStateException("Dictionary ownership was claimed by the application.");
}
final Dictionary dictionary = dictionaries.lookup(id);
final BaseDictionary dictionary = dictionaries.lookup(id);
if (dictionary == null) {
throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ private void verifyHashCodeNotEqual(ArrowBuf buf1, byte[] ba1, int offset1, int
@Parameterized.Parameters(name = "hasher = {0}")
public static Collection<Object[]> getHasher() {
return Arrays.asList(
new Object[] {SimpleHasher.class.getSimpleName(),
SimpleHasher.INSTANCE},
new Object[] {MurmurHasher.class.getSimpleName(),
new MurmurHasher()
}
new Object[] {SimpleHasher.class.getSimpleName(),
SimpleHasher.INSTANCE},
new Object[] {MurmurHasher.class.getSimpleName(),
new MurmurHasher()
}
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.dictionary;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

/**
* Interface for all dictionary types.
*/
public interface BaseDictionary {

/**
* The dictionary vector containing unique entries.
*/
FieldVector getVector();

/**
* The encoding used for the dictionary vector.
*/
DictionaryEncoding getEncoding();

/**
* The type of the dictionary vector.
*/
ArrowType getVectorType();

}
Loading

0 comments on commit 5713286

Please sign in to comment.