Skip to content

Commit

Permalink
GH-38168: [Java] Fix multi-batch Dictionary in Arrow{Reader|Writer}
Browse files Browse the repository at this point in the history
When manually writing dictionary vectors and writing multiple batches in a single `ArrowFileWriter`,
only the first dictionary batch was written and subsequent batches were ignored.
On reading, the `ArrowFileReader` would load only the first batch and use that batch for decoding
subsequent batches, resulting in errors or incorrect decodings.
This patch will now flush the dictionaries on each batch write and load the batches for the
dictionaries on read.
Following the docs at https://arrow.apache.org/docs/format/Columnar.html#dictionary-messages.

Note that this does not address the delta dictionary encoding issue as the writer does not
currently havea means of setting the delta flag. Neither does it allow for streaming writes
of dictionaries (though the unit tests show a work-around).

Fix for #38168
  • Loading branch information
manolama committed Oct 9, 2023
1 parent fae16c8 commit 92e2c73
Show file tree
Hide file tree
Showing 3 changed files with 375 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ public void initialize() throws IOException {
if (footer.getRecordBatches().size() == 0) {
return;
}
// Read and load all dictionaries from schema
for (int i = 0; i < dictionaries.size(); i++) {
ArrowDictionaryBatch dictionaryBatch = readDictionary();
loadDictionary(dictionaryBatch);
}
}

/**
Expand Down Expand Up @@ -164,6 +159,13 @@ public boolean loadNextBatch() throws IOException {
ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++);
ArrowRecordBatch batch = readRecordBatch(in, block, allocator);
loadRecordBatch(batch);

// Read and load all dictionaries from schema
for (int i = 0; i < dictionaries.size(); i++) {
ArrowDictionaryBatch dictionaryBatch = readDictionary();
loadDictionary(dictionaryBatch);
}

return true;
} else {
return false;
Expand All @@ -185,14 +187,16 @@ public List<ArrowBlock> getRecordBlocks() throws IOException {
}

/**
* Loads record batch for the given block.
* Loads record batch and dictionaries for the given block.
*/
public boolean loadRecordBatch(ArrowBlock block) throws IOException {
ensureInitialized();
int blockIndex = footer.getRecordBatches().indexOf(block);
if (blockIndex == -1) {
throw new IllegalArgumentException("Arrow block does not exist in record batches: " + block);
}

currentDictionaryBatch = blockIndex * dictionaries.size();
currentRecordBatch = blockIndex;
return loadNextBatch();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class ArrowFileWriter extends ArrowWriter {
private final List<ArrowBlock> recordBlocks = new ArrayList<>();

private Map<String, String> metaData;
private boolean dictionariesWritten = false;

public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
super(root, provider, out);
Expand Down Expand Up @@ -129,12 +128,6 @@ protected void endInternal(WriteChannel out) throws IOException {
@Override
protected void ensureDictionariesWritten(DictionaryProvider provider, Set<Long> dictionaryIdsUsed)
throws IOException {
if (dictionariesWritten) {
return;
}
dictionariesWritten = true;
// Write out all dictionaries required.
// Replacement dictionaries are not supported in the IPC file format.
for (long id : dictionaryIdsUsed) {
Dictionary dictionary = provider.lookup(id);
writeDictionaryBatch(dictionary);
Expand Down
Loading

0 comments on commit 92e2c73

Please sign in to comment.