This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-501] Remove ArrowRecordBatchBuilder and its usages #502

Merged
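In short: the JNI layer no longer returns an `ArrowRecordBatchBuilder` object graph that Java code must rebuild through `ArrowRecordBatchBuilderImpl`. Native methods now hand back a record batch serialized into a single `byte[]`, and the Java side reconstructs an `ArrowRecordBatch` with `org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, bytes)`. To illustrate the batch-to-bytes round trip itself, here is a minimal, self-contained sketch. It deliberately uses the stock IPC `MessageSerializer` (already imported by `BatchIterator` below) in place of `UnsafeRecordBatchSerializer`, whose "unsafe" format, as the name suggests, references native buffers and is only meaningful next to this repository's JNI code; the class name and sample column are invented for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageSerializer;

public class BatchBytesRoundTrip {
  public static void main(String[] args) throws Exception {
    try (BufferAllocator allocator = new RootAllocator();
         IntVector ids = new IntVector("id", allocator)) {
      ids.allocateNew(3);
      ids.setSafe(0, 1);
      ids.setSafe(1, 2);
      ids.setSafe(2, 3);
      ids.setValueCount(3);
      try (VectorSchemaRoot root = VectorSchemaRoot.of(ids);
           ArrowRecordBatch batch = new VectorUnloader(root).getRecordBatch()) {
        // Producer side: flatten the batch into plain bytes. In this PR
        // the byte[] is produced by the native (C++) side instead.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch);
        byte[] serialized = out.toByteArray();

        // Consumer side: rebuild an ArrowRecordBatch against a Java-side
        // allocator, mirroring the deserializeUnsafe(allocator, bytes)
        // calls in readNext()/next() below.
        try (ArrowRecordBatch restored = MessageSerializer.deserializeRecordBatch(
            new ReadChannel(Channels.newChannel(new ByteArrayInputStream(serialized))),
            allocator)) {
          System.out.println("restored rows: " + restored.getLength());
        }
      }
    }
  }
}
```

Passing one flat `byte[]` across JNI replaces a builder object per batch plus per-field JNI accesses, which is presumably the motivation for the change.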
```diff
@@ -20,8 +20,7 @@
 import java.io.IOException;
 import java.util.List;
 
-import com.intel.oap.vectorized.ArrowRecordBatchBuilder;
-import com.intel.oap.vectorized.ArrowRecordBatchBuilderImpl;
+import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorLoader;
@@ -111,14 +110,13 @@ public Schema getSchema() throws IOException {
    * @throws IOException throws io exception in case of native failure
    */
   public ArrowRecordBatch readNext() throws IOException {
-    ArrowRecordBatchBuilder recordBatchBuilder =
+    byte[] serializedBatch =
         jniWrapper.nativeReadNext(nativeInstanceId);
-    if (recordBatchBuilder == null) {
+    if (serializedBatch == null) {
       return null;
     }
-    ArrowRecordBatchBuilderImpl recordBatchBuilderImpl =
-        new ArrowRecordBatchBuilderImpl(recordBatchBuilder);
-    ArrowRecordBatch batch = recordBatchBuilderImpl.build();
+    ArrowRecordBatch batch = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator,
+        serializedBatch);
     if (batch == null) {
       throw new IllegalArgumentException("failed to build record batch");
     }
```
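For context on what callers do with the deserialized batch: the reader above keeps its `VectorLoader` import, and loading is the usual way an `ArrowRecordBatch` becomes readable column vectors. A minimal sketch, independent of this PR's native code, with an invented column standing in for a batch returned by `nativeReadNext`:

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

public class LoadBatchExample {
  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator();
         IntVector ids = new IntVector("id", allocator)) {
      // Build a one-column batch to stand in for what readNext() returns.
      ids.allocateNew(2);
      ids.setSafe(0, 7);
      ids.setSafe(1, 8);
      ids.setValueCount(2);
      try (VectorSchemaRoot source = VectorSchemaRoot.of(ids);
           ArrowRecordBatch batch = new VectorUnloader(source).getRecordBatch();
           // Target root with the same schema, initially empty.
           VectorSchemaRoot target = VectorSchemaRoot.create(source.getSchema(), allocator)) {
        // VectorLoader populates the target's vectors from the batch's buffers.
        new VectorLoader(target).load(batch);
        System.out.println("loaded rows: " + target.getRowCount());
      }
    }
  }
}
```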
```diff
@@ -17,7 +17,6 @@
 
 package com.intel.oap.datasource.parquet;
 
-import com.intel.oap.vectorized.ArrowRecordBatchBuilder;
 import com.intel.oap.vectorized.JniUtils;
 
 import java.io.IOException;
@@ -76,7 +75,7 @@ public native void nativeInitParquetReader2(
    * @param id parquet reader instance number
    * @throws IOException throws exception in case of any io exception in native codes
    */
-  public native ArrowRecordBatchBuilder nativeReadNext(long id) throws IOException;
+  public native byte[] nativeReadNext(long id) throws IOException;
 
   /**
    * Get schema from parquet file reader.
```

Three files were deleted in this change.

```diff
@@ -18,6 +18,9 @@
 package com.intel.oap.vectorized;
 
 import java.io.IOException;
+
+import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
 import java.util.List;
@@ -28,16 +31,17 @@
 import org.apache.arrow.vector.ipc.WriteChannel;
 import org.apache.arrow.vector.ipc.message.ArrowBuffer;
 import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils;
 
 public class BatchIterator implements AutoCloseable {
   private native boolean nativeHasNext(long nativeHandler);
-  private native ArrowRecordBatchBuilder nativeNext(long nativeHandler);
+  private native byte[] nativeNext(long nativeHandler);
   private native MetricsObject nativeFetchMetrics(long nativeHandler);
-  private native ArrowRecordBatchBuilder nativeProcess(long nativeHandler,
+  private native byte[] nativeProcess(long nativeHandler,
       byte[] schemaBuf, int numRows, long[] bufAddrs, long[] bufSizes);
   private native void nativeProcessAndCacheOne(long nativeHandler, byte[] schemaBuf,
       int numRows, long[] bufAddrs, long[] bufSizes);
-  private native ArrowRecordBatchBuilder nativeProcessWithSelection(long nativeHandler,
+  private native byte[] nativeProcessWithSelection(long nativeHandler,
       byte[] schemaBuf, int numRows, long[] bufAddrs, long[] bufSizes,
       int selectionVectorRecordCount, long selectionVectorAddr, long selectionVectorSize);
   private native void nativeProcessAndCacheOneWithSelection(long nativeHandler,
@@ -64,16 +68,16 @@ public boolean hasNext() throws IOException {
   }
 
   public ArrowRecordBatch next() throws IOException {
+    BufferAllocator allocator = SparkMemoryUtils.contextAllocator();
     if (nativeHandler == 0) {
       return null;
     }
-    ArrowRecordBatchBuilder resRecordBatchBuilder = nativeNext(nativeHandler);
-    if (resRecordBatchBuilder == null) {
+    byte[] serializedRecordBatch = nativeNext(nativeHandler);
+    if (serializedRecordBatch == null) {
       return null;
     }
-    ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl =
-        new ArrowRecordBatchBuilderImpl(resRecordBatchBuilder);
-    return resRecordBatchBuilderImpl.build();
+    return UnsafeRecordBatchSerializer.deserializeUnsafe(allocator,
+        serializedRecordBatch);
   }
 
   public MetricsObject getMetrics() throws IOException, ClassNotFoundException {
@@ -128,24 +132,24 @@ public ArrowRecordBatch process(Schema schema, ArrowRecordBatch recordBatch,
     if (nativeHandler == 0) {
       return null;
     }
-    ArrowRecordBatchBuilder resRecordBatchBuilder;
+    BufferAllocator allocator = SparkMemoryUtils.contextAllocator();
+    byte[] serializedRecordBatch;
     if (selectionVector != null) {
       int selectionVectorRecordCount = selectionVector.getRecordCount();
       long selectionVectorAddr = selectionVector.getBuffer().memoryAddress();
      long selectionVectorSize = selectionVector.getBuffer().capacity();
-      resRecordBatchBuilder = nativeProcessWithSelection(nativeHandler,
+      serializedRecordBatch = nativeProcessWithSelection(nativeHandler,
          getSchemaBytesBuf(schema), num_rows, bufAddrs, bufSizes,
          selectionVectorRecordCount, selectionVectorAddr, selectionVectorSize);
     } else {
-      resRecordBatchBuilder = nativeProcess(
+      serializedRecordBatch = nativeProcess(
          nativeHandler, getSchemaBytesBuf(schema), num_rows, bufAddrs, bufSizes);
     }
-    if (resRecordBatchBuilder == null) {
+    if (serializedRecordBatch == null) {
       return null;
     }
-    ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl =
-        new ArrowRecordBatchBuilderImpl(resRecordBatchBuilder);
-    return resRecordBatchBuilderImpl.build();
+    return UnsafeRecordBatchSerializer.deserializeUnsafe(allocator,
+        serializedRecordBatch);
   }
 
   public void processAndCacheOne(Schema schema, ArrowRecordBatch recordBatch)
```