Skip to content

Commit

Permalink
remove loadNextBatch method in ArrowCompressedStreamReader
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and zhouyuan committed Mar 1, 2021
1 parent 75aa10c commit 1f294a8
Showing 1 changed file with 0 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,47 +70,6 @@ protected void initialize() throws IOException {
this.dictionaries = Collections.unmodifiableMap(dictionaries);
}

/**
* Load the next ArrowRecordBatch to the vector schema root if available.
*
* @return true if a batch was read, false on EOS
* @throws IOException on error
*/
public boolean loadNextBatch() throws IOException {
prepareLoadNextBatch();
MessageResult result = messageReader.readNext();

// Reached EOS
if (result == null) {
return false;
}
// Get the compress type from customMetadata. Currently the customMetadata only have one entry.
compressType = result.getMessage().customMetadata(0).value();

if (result.getMessage().headerType() == MessageHeader.RecordBatch) {
ArrowBuf bodyBuffer = result.getBodyBuffer();

// For zero-length batches, need an empty buffer to deserialize the batch
if (bodyBuffer == null) {
bodyBuffer = allocator.getEmpty();
}

ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(result.getMessage(), bodyBuffer);
loadRecordBatch(batch);
checkDictionaries();
return true;
} else if (result.getMessage().headerType() == MessageHeader.DictionaryBatch) {
// if it's dictionary message, read dictionary message out and continue to read unless get a batch or eos.
ArrowDictionaryBatch dictionaryBatch = readDictionary(result);
loadDictionary(dictionaryBatch);
loadedDictionaryCount++;
return loadNextBatch();
} else {
throw new IOException("Expected RecordBatch or DictionaryBatch but header was " +
result.getMessage().headerType());
}
}

@Override
protected void loadRecordBatch(ArrowRecordBatch batch) {
try {
Expand Down

0 comments on commit 1f294a8

Please sign in to comment.