Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-501] Remove ArrowRecordBatchBuilder and its usages (#502)
Browse files Browse the repository at this point in the history
Closes #501
  • Loading branch information
zhztheplayer authored Sep 13, 2021
1 parent 584646f commit bafbaf9
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 470 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import java.io.IOException;
import java.util.List;

import com.intel.oap.vectorized.ArrowRecordBatchBuilder;
import com.intel.oap.vectorized.ArrowRecordBatchBuilderImpl;
import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
Expand Down Expand Up @@ -111,14 +110,13 @@ public Schema getSchema() throws IOException {
* @throws IOException if a failure occurs in native code
*/
public ArrowRecordBatch readNext() throws IOException {
ArrowRecordBatchBuilder recordBatchBuilder =
byte[] serializedBatch =
jniWrapper.nativeReadNext(nativeInstanceId);
if (recordBatchBuilder == null) {
if (serializedBatch == null) {
return null;
}
ArrowRecordBatchBuilderImpl recordBatchBuilderImpl =
new ArrowRecordBatchBuilderImpl(recordBatchBuilder);
ArrowRecordBatch batch = recordBatchBuilderImpl.build();
ArrowRecordBatch batch = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator,
serializedBatch);
if (batch == null) {
throw new IllegalArgumentException("failed to build record batch");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.intel.oap.datasource.parquet;

import com.intel.oap.vectorized.ArrowRecordBatchBuilder;
import com.intel.oap.vectorized.JniUtils;

import java.io.IOException;
Expand Down Expand Up @@ -76,7 +75,7 @@ public native void nativeInitParquetReader2(
* @param id parquet reader instance number
* @throws IOException if an I/O error occurs in native code
*/
public native ArrowRecordBatchBuilder nativeReadNext(long id) throws IOException;
public native byte[] nativeReadNext(long id) throws IOException;

/**
* Get schema from parquet file reader.
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package com.intel.oap.vectorized;

import java.io.IOException;

import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import java.util.List;
Expand All @@ -28,16 +31,17 @@
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowBuffer;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils;

public class BatchIterator implements AutoCloseable {
private native boolean nativeHasNext(long nativeHandler);
private native ArrowRecordBatchBuilder nativeNext(long nativeHandler);
private native byte[] nativeNext(long nativeHandler);
private native MetricsObject nativeFetchMetrics(long nativeHandler);
private native ArrowRecordBatchBuilder nativeProcess(long nativeHandler,
private native byte[] nativeProcess(long nativeHandler,
byte[] schemaBuf, int numRows, long[] bufAddrs, long[] bufSizes);
private native void nativeProcessAndCacheOne(long nativeHandler, byte[] schemaBuf,
int numRows, long[] bufAddrs, long[] bufSizes);
private native ArrowRecordBatchBuilder nativeProcessWithSelection(long nativeHandler,
private native byte[] nativeProcessWithSelection(long nativeHandler,
byte[] schemaBuf, int numRows, long[] bufAddrs, long[] bufSizes,
int selectionVectorRecordCount, long selectionVectorAddr, long selectionVectorSize);
private native void nativeProcessAndCacheOneWithSelection(long nativeHandler,
Expand All @@ -64,16 +68,16 @@ public boolean hasNext() throws IOException {
}

public ArrowRecordBatch next() throws IOException {
BufferAllocator allocator = SparkMemoryUtils.contextAllocator();
if (nativeHandler == 0) {
return null;
}
ArrowRecordBatchBuilder resRecordBatchBuilder = nativeNext(nativeHandler);
if (resRecordBatchBuilder == null) {
byte[] serializedRecordBatch = nativeNext(nativeHandler);
if (serializedRecordBatch == null) {
return null;
}
ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl =
new ArrowRecordBatchBuilderImpl(resRecordBatchBuilder);
return resRecordBatchBuilderImpl.build();
return UnsafeRecordBatchSerializer.deserializeUnsafe(allocator,
serializedRecordBatch);
}

public MetricsObject getMetrics() throws IOException, ClassNotFoundException {
Expand Down Expand Up @@ -128,24 +132,24 @@ public ArrowRecordBatch process(Schema schema, ArrowRecordBatch recordBatch,
if (nativeHandler == 0) {
return null;
}
ArrowRecordBatchBuilder resRecordBatchBuilder;
BufferAllocator allocator = SparkMemoryUtils.contextAllocator();
byte[] serializedRecordBatch;
if (selectionVector != null) {
int selectionVectorRecordCount = selectionVector.getRecordCount();
long selectionVectorAddr = selectionVector.getBuffer().memoryAddress();
long selectionVectorSize = selectionVector.getBuffer().capacity();
resRecordBatchBuilder = nativeProcessWithSelection(nativeHandler,
serializedRecordBatch = nativeProcessWithSelection(nativeHandler,
getSchemaBytesBuf(schema), num_rows, bufAddrs, bufSizes,
selectionVectorRecordCount, selectionVectorAddr, selectionVectorSize);
} else {
resRecordBatchBuilder = nativeProcess(
serializedRecordBatch = nativeProcess(
nativeHandler, getSchemaBytesBuf(schema), num_rows, bufAddrs, bufSizes);
}
if (resRecordBatchBuilder == null) {
if (serializedRecordBatch == null) {
return null;
}
ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl =
new ArrowRecordBatchBuilderImpl(resRecordBatchBuilder);
return resRecordBatchBuilderImpl.build();
return UnsafeRecordBatchSerializer.deserializeUnsafe(allocator,
serializedRecordBatch);
}

public void processAndCacheOne(Schema schema, ArrowRecordBatch recordBatch)
Expand Down
Loading

0 comments on commit bafbaf9

Please sign in to comment.