diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReader.java b/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReader.java index fcbad8090..e98dd0acf 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReader.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReader.java @@ -20,8 +20,7 @@ import java.io.IOException; import java.util.List; -import com.intel.oap.vectorized.ArrowRecordBatchBuilder; -import com.intel.oap.vectorized.ArrowRecordBatchBuilderImpl; +import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorLoader; @@ -111,14 +110,13 @@ public Schema getSchema() throws IOException { * @throws IOException throws io exception in case of native failure */ public ArrowRecordBatch readNext() throws IOException { - ArrowRecordBatchBuilder recordBatchBuilder = + byte[] serializedBatch = jniWrapper.nativeReadNext(nativeInstanceId); - if (recordBatchBuilder == null) { + if (serializedBatch == null) { return null; } - ArrowRecordBatchBuilderImpl recordBatchBuilderImpl = - new ArrowRecordBatchBuilderImpl(recordBatchBuilder); - ArrowRecordBatch batch = recordBatchBuilderImpl.build(); + ArrowRecordBatch batch = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, + serializedBatch); if (batch == null) { throw new IllegalArgumentException("failed to build record batch"); } diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReaderJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReaderJniWrapper.java index 9f3aac028..9e7ca944d 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReaderJniWrapper.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/datasource/parquet/ParquetReaderJniWrapper.java @@ -17,7 +17,6 @@ package com.intel.oap.datasource.parquet; -import com.intel.oap.vectorized.ArrowRecordBatchBuilder; import com.intel.oap.vectorized.JniUtils; import java.io.IOException; @@ -76,7 +75,7 @@ public native void nativeInitParquetReader2( * @param id parquet reader instance number * @throws IOException throws exception in case of any io exception in native codes */ - public native ArrowRecordBatchBuilder nativeReadNext(long id) throws IOException; + public native byte[] nativeReadNext(long id) throws IOException; /** * Get schema from parquet file reader. diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/AdaptorReferenceManager.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/AdaptorReferenceManager.java deleted file mode 100644 index a267fc58d..000000000 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/AdaptorReferenceManager.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.oap.vectorized; - -import org.apache.arrow.memory.ArrowBuf; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.arrow.memory.*; -import org.apache.arrow.util.Preconditions; - -import org.apache.arrow.memory.ArrowBuf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A simple reference manager implementation for memory allocated by native code. The underlying - * memory will be released when reference count reach zero. - * - * Jul/13/2020, Hongze Zhang: TO BE DEPRECATED: Only nativeRelease is being called at this time. - */ -@Deprecated -public class AdaptorReferenceManager implements ReferenceManager { - public static final AdaptorReferenceManager DEFAULT = new AdaptorReferenceManager(-1L, -1); - - public native void nativeRelease(long nativeMemoryHolder); - - private static final Logger LOG = LoggerFactory.getLogger(AdaptorReferenceManager.class); - private final AtomicInteger bufRefCnt = new AtomicInteger(0); - private long nativeMemoryHolder; - private int size = 0; - - AdaptorReferenceManager(long nativeMemoryHolder, int size) { - try { - JniUtils.getInstance(); - } catch (IOException e) { - throw new RuntimeException(e); - } - this.nativeMemoryHolder = nativeMemoryHolder; - this.size = size; - } - @Override - public int getRefCount() { - return bufRefCnt.get(); - } - - @Override - public boolean release() { - return release(1); - } - - @Override - public boolean release(int decrement) { - Preconditions.checkState( - decrement >= 1, "ref count decrement should be greater than or equal to 1"); - // decrement the ref count - final int refCnt; - synchronized (this) { - refCnt = bufRefCnt.addAndGet(-decrement); - if (refCnt == 0) { - // refcount of this reference manager has dropped to 0 - // release the underlying memory - nativeRelease(nativeMemoryHolder); - } - } - // the new ref count should be >= 0 - Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative"); - return refCnt == 0; - } - - @Override - public void retain() { - retain(1); - } - - @Override - public void retain(int increment) { - Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment); - bufRefCnt.addAndGet(increment); - } - - @Override - public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) { - retain(); - return srcBuffer; - } - - @Override - public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length) { - final long derivedBufferAddress = sourceBuffer.memoryAddress() + index; - - // create new ArrowBuf - final ArrowBuf derivedBuf = new ArrowBuf(this, null, length, derivedBufferAddress); - - return derivedBuf; - } - - @Override - public OwnershipTransferResult transferOwnership( - ArrowBuf sourceBuffer, BufferAllocator targetAllocator) { - return NO_OP.transferOwnership(sourceBuffer, targetAllocator); - } - - @Override - public BufferAllocator getAllocator() { - throw new UnsupportedOperationException("No Allocator is retained"); - } - - @Override - public long getSize() { - return size; - } - - @Override - public long getAccountedSize() { - return 0; - } -} diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowRecordBatchBuilder.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowRecordBatchBuilder.java deleted file mode 100644 index 4fb0ff2a5..000000000 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowRecordBatchBuilder.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.oap.vectorized; - -/** ArrowRecordBatchBuilder. */ -public class ArrowRecordBatchBuilder { - - public int length; - public ArrowFieldNodeBuilder[] nodeBuilders; - public ArrowBufBuilder[] bufferBuilders; - - /** - * Create an instance to wrap native parameters for ArrowRecordBatchBuilder. - * - * @param length ArrowRecordBatch rowNums. - * @param nodeBuilders an Array hold ArrowFieldNodeBuilder. - * @param bufferBuilders an Array hold ArrowBufBuilder. - */ - public ArrowRecordBatchBuilder( - int length, ArrowFieldNodeBuilder[] nodeBuilders, ArrowBufBuilder[] bufferBuilders) { - this.length = length; - this.nodeBuilders = nodeBuilders; - this.bufferBuilders = bufferBuilders; - } -} diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowRecordBatchBuilderImpl.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowRecordBatchBuilderImpl.java deleted file mode 100644 index 1341b858c..000000000 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowRecordBatchBuilderImpl.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.oap.vectorized; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.arrow.memory.*; -import org.apache.arrow.vector.ipc.message.ArrowFieldNode; -import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; - -import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils; - -/** ArrowRecordBatchBuilderImpl used to wrap native returned data into an ArrowRecordBatch. */ -public class ArrowRecordBatchBuilderImpl { - - private int length; - private ArrowRecordBatchBuilder recordBatchBuilder; - - /** - * Create ArrowRecordBatchBuilderImpl instance from ArrowRecordBatchBuilder. - * - * @param recordBatchBuilder ArrowRecordBatchBuilder instance. - */ - public ArrowRecordBatchBuilderImpl(ArrowRecordBatchBuilder recordBatchBuilder) { - this.recordBatchBuilder = recordBatchBuilder; - } - - /** - * Build ArrowRecordBatch from ArrowRecordBatchBuilder instance. - * - * @throws IOException throws exception - */ - public ArrowRecordBatch build() throws IOException { - if (recordBatchBuilder.length == 0) { - return null; - } - - List nodes = new ArrayList(); - for (ArrowFieldNodeBuilder tmp : recordBatchBuilder.nodeBuilders) { - nodes.add(new ArrowFieldNode(tmp.length, tmp.nullCount)); - } - - List buffers = new ArrayList(); - for (ArrowBufBuilder tmp : recordBatchBuilder.bufferBuilders) { - BufferAllocator allocator = SparkMemoryUtils.contextAllocator(); - NativeUnderlyingMemory am = new Underlying(allocator, tmp.size, - tmp.nativeInstanceId, tmp.memoryAddress); - ReferenceManager rm = am.createReferenceManager(allocator); - buffers.add(new ArrowBuf(rm, null, tmp.size, tmp.memoryAddress)); - } - try { - return new ArrowRecordBatch(recordBatchBuilder.length, nodes, buffers); - } finally { - buffers.forEach(ArrowBuf::close); - } - } - - private static class Underlying extends NativeUnderlyingMemory { - private final long nativeInstanceId; - - public Underlying(BufferAllocator accountingAllocator, int size, - long nativeInstanceId, long address) { - super(accountingAllocator, size, nativeInstanceId, address); - this.nativeInstanceId = nativeInstanceId; - } - - @Override - protected void release0() { - AdaptorReferenceManager.DEFAULT.nativeRelease(nativeInstanceId); - } - } -} diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/BatchIterator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/BatchIterator.java index 56049e0dc..285e41f43 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/BatchIterator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/BatchIterator.java @@ -18,6 +18,9 @@ package com.intel.oap.vectorized; import java.io.IOException; + +import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; import java.util.List; @@ -28,16 +31,17 @@ import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.ipc.message.ArrowBuffer; import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils; public class BatchIterator implements AutoCloseable { private native boolean nativeHasNext(long nativeHandler); - private native ArrowRecordBatchBuilder nativeNext(long nativeHandler); + private native byte[] nativeNext(long nativeHandler); private native MetricsObject nativeFetchMetrics(long nativeHandler); - private native ArrowRecordBatchBuilder nativeProcess(long nativeHandler, + private native byte[] nativeProcess(long nativeHandler, byte[] schemaBuf, int numRows, long[] bufAddrs, long[] bufSizes); private native void nativeProcessAndCacheOne(long nativeHandler, byte[] schemaBuf, int numRows, long[] bufAddrs, long[] bufSizes); - private native ArrowRecordBatchBuilder nativeProcessWithSelection(long nativeHandler, + private native byte[] nativeProcessWithSelection(long nativeHandler, byte[] schemaBuf, int numRows, long[] bufAddrs, long[] bufSizes, int selectionVectorRecordCount, long selectionVectorAddr, long selectionVectorSize); private native void nativeProcessAndCacheOneWithSelection(long nativeHandler, @@ -64,16 +68,16 @@ public boolean hasNext() throws IOException { } public ArrowRecordBatch next() throws IOException { + BufferAllocator allocator = SparkMemoryUtils.contextAllocator(); if (nativeHandler == 0) { return null; } - ArrowRecordBatchBuilder resRecordBatchBuilder = nativeNext(nativeHandler); - if (resRecordBatchBuilder == null) { + byte[] serializedRecordBatch = nativeNext(nativeHandler); + if (serializedRecordBatch == null) { return null; } - ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl = - new ArrowRecordBatchBuilderImpl(resRecordBatchBuilder); - return resRecordBatchBuilderImpl.build(); + return UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, + serializedRecordBatch); } public MetricsObject getMetrics() throws IOException, ClassNotFoundException { @@ -128,24 +132,24 @@ public ArrowRecordBatch process(Schema schema, ArrowRecordBatch recordBatch, if (nativeHandler == 0) { return null; } - ArrowRecordBatchBuilder resRecordBatchBuilder; + BufferAllocator allocator = SparkMemoryUtils.contextAllocator(); + byte[] serializedRecordBatch; if (selectionVector != null) { int selectionVectorRecordCount = selectionVector.getRecordCount(); long selectionVectorAddr = selectionVector.getBuffer().memoryAddress(); long selectionVectorSize = selectionVector.getBuffer().capacity(); - resRecordBatchBuilder = nativeProcessWithSelection(nativeHandler, + serializedRecordBatch = nativeProcessWithSelection(nativeHandler, getSchemaBytesBuf(schema), num_rows, bufAddrs, bufSizes, selectionVectorRecordCount, selectionVectorAddr, selectionVectorSize); } else { - resRecordBatchBuilder = nativeProcess( + serializedRecordBatch = nativeProcess( nativeHandler, getSchemaBytesBuf(schema), num_rows, bufAddrs, bufSizes); } - if (resRecordBatchBuilder == null) { + if (serializedRecordBatch == null) { return null; } - ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl = - new ArrowRecordBatchBuilderImpl(resRecordBatchBuilder); - return resRecordBatchBuilderImpl.build(); + return UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, + serializedRecordBatch); } public void processAndCacheOne(Schema schema, ArrowRecordBatch recordBatch) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java index 1e3405c8a..e77ecf7d7 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java @@ -27,6 +27,7 @@ import org.apache.arrow.gandiva.expression.ExpressionTree; import org.apache.arrow.gandiva.ipc.GandivaTypes; import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.ipc.message.ArrowBuffer; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -155,16 +156,16 @@ public void evaluate(ColumnarNativeIterator batchItr) */ public ArrowRecordBatch[] evaluate2(ArrowRecordBatch recordBatch) throws RuntimeException, IOException { byte[] bytes = UnsafeRecordBatchSerializer.serializeUnsafe(recordBatch); - ArrowRecordBatchBuilder[] resRecordBatchBuilderList = jniWrapper.nativeEvaluate2(nativeHandler, bytes); - ArrowRecordBatch[] recordBatchList = new ArrowRecordBatch[resRecordBatchBuilderList.length]; - for (int i = 0; i < resRecordBatchBuilderList.length; i++) { - if (resRecordBatchBuilderList[i] == null) { + byte[][] serializedBatchArray = jniWrapper.nativeEvaluate2(nativeHandler, bytes); + BufferAllocator allocator = SparkMemoryUtils.contextAllocator(); + ArrowRecordBatch[] recordBatchList = new ArrowRecordBatch[serializedBatchArray.length]; + for (int i = 0; i < serializedBatchArray.length; i++) { + if (serializedBatchArray[i] == null) { recordBatchList[i] = null; break; } - ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl = new ArrowRecordBatchBuilderImpl( - resRecordBatchBuilderList[i]); - recordBatchList[i] = resRecordBatchBuilderImpl.build(); + recordBatchList[i] = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, + serializedBatchArray[i]); } return recordBatchList; } @@ -188,25 +189,26 @@ public ArrowRecordBatch[] evaluate(ArrowRecordBatch recordBatch, SelectionVector bufSizes[idx++] = bufLayout.getSize(); } - ArrowRecordBatchBuilder[] resRecordBatchBuilderList; + BufferAllocator allocator = SparkMemoryUtils.contextAllocator(); + + byte[][] serializedBatchArray; if (selectionVector != null) { int selectionVectorRecordCount = selectionVector.getRecordCount(); long selectionVectorAddr = selectionVector.getBuffer().memoryAddress(); long selectionVectorSize = selectionVector.getBuffer().capacity(); - resRecordBatchBuilderList = jniWrapper.nativeEvaluateWithSelection(nativeHandler, recordBatch.getLength(), + serializedBatchArray = jniWrapper.nativeEvaluateWithSelection(nativeHandler, recordBatch.getLength(), bufAddrs, bufSizes, selectionVectorRecordCount, selectionVectorAddr, selectionVectorSize); } else { - resRecordBatchBuilderList = jniWrapper.nativeEvaluate(nativeHandler, recordBatch.getLength(), bufAddrs, bufSizes); + serializedBatchArray = jniWrapper.nativeEvaluate(nativeHandler, recordBatch.getLength(), bufAddrs, bufSizes); } - ArrowRecordBatch[] recordBatchList = new ArrowRecordBatch[resRecordBatchBuilderList.length]; - for (int i = 0; i < resRecordBatchBuilderList.length; i++) { - if (resRecordBatchBuilderList[i] == null) { + ArrowRecordBatch[] recordBatchList = new ArrowRecordBatch[serializedBatchArray.length]; + for (int i = 0; i < serializedBatchArray.length; i++) { + if (serializedBatchArray[i] == null) { recordBatchList[i] = null; break; } - ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl = new ArrowRecordBatchBuilderImpl( - resRecordBatchBuilderList[i]); - recordBatchList[i] = resRecordBatchBuilderImpl.build(); + recordBatchList[i] = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, + serializedBatchArray[i]); } return recordBatchList; } @@ -233,16 +235,16 @@ public void SetMember(ArrowRecordBatch recordBatch) throws RuntimeException, IOE } public ArrowRecordBatch[] finish() throws RuntimeException, IOException { - ArrowRecordBatchBuilder[] resRecordBatchBuilderList = jniWrapper.nativeFinish(nativeHandler); - ArrowRecordBatch[] recordBatchList = new ArrowRecordBatch[resRecordBatchBuilderList.length]; - for (int i = 0; i < resRecordBatchBuilderList.length; i++) { - if (resRecordBatchBuilderList[i] == null) { + BufferAllocator allocator = SparkMemoryUtils.contextAllocator(); + byte[][] serializedBatchArray = jniWrapper.nativeFinish(nativeHandler); + ArrowRecordBatch[] recordBatchList = new ArrowRecordBatch[serializedBatchArray.length]; + for (int i = 0; i < serializedBatchArray.length; i++) { + if (serializedBatchArray[i] == null) { recordBatchList[i] = null; break; } - ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl = new ArrowRecordBatchBuilderImpl( - resRecordBatchBuilderList[i]); - recordBatchList[i] = resRecordBatchBuilderImpl.build(); + recordBatchList[i] = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, + serializedBatchArray[i]); } return recordBatchList; } diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluatorJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluatorJniWrapper.java index c9ddcad42..ef81c4532 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluatorJniWrapper.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluatorJniWrapper.java @@ -141,13 +141,13 @@ native long nativeBuildWithFinish(long memoryPool, byte[] schemaBuf, byte[] expr * for offset vectors later). * @param bufSizes An array of buffer sizes. For each memory address in * bufAddrs, the size of the buffer is present in bufSizes - * @return A list of ArrowRecordBatchBuilder which can be used to build a List + * @return A list of serialized record batch which can be used to build a List * of ArrowRecordBatch */ - native ArrowRecordBatchBuilder[] nativeEvaluate(long nativeHandler, int numRows, long[] bufAddrs, + native byte[][] nativeEvaluate(long nativeHandler, int numRows, long[] bufAddrs, long[] bufSizes) throws RuntimeException; - native ArrowRecordBatchBuilder[] nativeEvaluate2(long nativeHandler, byte[] bytes) throws RuntimeException; + native byte[][] nativeEvaluate2(long nativeHandler, byte[] bytes) throws RuntimeException; /** * Evaluate the expressions represented by the nativeHandler on a record batch @@ -182,10 +182,10 @@ native void nativeEvaluateWithIterator(long nativeHandler, * @param selectionVector valid selected item record count * @param selectionVector selectionVector memory address * @param selectionVectorSize selectionVector total size - * @return A list of ArrowRecordBatchBuilder which can be used to build a List + * @return A list of serialized record batch which can be used to build a List * of ArrowRecordBatch */ - native ArrowRecordBatchBuilder[] nativeEvaluateWithSelection(long nativeHandler, int numRows, long[] bufAddrs, + native byte[][] nativeEvaluateWithSelection(long nativeHandler, int numRows, long[] bufAddrs, long[] bufSizes, int selectionVectorRecordCount, long selectionVectorAddr, long selectionVectorSize) throws RuntimeException; @@ -197,10 +197,10 @@ native ArrowRecordBatchBuilder[] nativeEvaluateWithSelection(long nativeHandler, * * @param nativeHandler nativeHandler representing expressions. Created using a * call to buildNativeCode - * @return A list of ArrowRecordBatchBuilder which can be used to build a List + * @return A list of serialized record batch which can be used to build a List * of ArrowRecordBatch */ - native ArrowRecordBatchBuilder[] nativeFinish(long nativeHandler) throws RuntimeException; + native byte[][] nativeFinish(long nativeHandler) throws RuntimeException; /** * Call Finish to get result, result will be as a iterator. diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleDecompressionJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleDecompressionJniWrapper.java index 384a434fb..d0add0f71 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleDecompressionJniWrapper.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleDecompressionJniWrapper.java @@ -34,7 +34,7 @@ public ShuffleDecompressionJniWrapper() throws IOException { */ public native long make(byte[] schemaBuf) throws RuntimeException; - public native ArrowRecordBatchBuilder decompress( + public native byte[] decompress( long schemaHolderId, String compressionCodec, int numRows, diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala index 3265e5d47..c004af0dd 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala @@ -20,35 +20,30 @@ package com.intel.oap.vectorized import java.io._ import java.nio.ByteBuffer -import com.intel.oap.ColumnarPluginConfig +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + import com.intel.oap.expression.ConverterUtils -import org.apache.arrow.memory.BufferAllocator +import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer import org.apache.arrow.memory.ArrowBuf -import org.apache.arrow.vector.ipc.message.ArrowFieldNode +import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector.ipc.ArrowStreamReader -import org.apache.arrow.vector.{ - BaseFixedWidthVector, - BaseVariableWidthVector, - VectorLoader, - VectorSchemaRoot -} +import org.apache.arrow.vector.VectorLoader +import org.apache.arrow.vector.VectorSchemaRoot + import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging -import org.apache.spark.serializer.{ - DeserializationStream, - SerializationStream, - Serializer, - SerializerInstance -} -import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkVectorUtils} +import org.apache.spark.serializer.DeserializationStream +import org.apache.spark.serializer.SerializationStream +import org.apache.spark.serializer.Serializer +import org.apache.spark.serializer.SerializerInstance +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.util.ArrowUtils -import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} - -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.mutable.ListBuffer -import scala.reflect.ClassTag +import org.apache.spark.sql.vectorized.ColumnVector +import org.apache.spark.sql.vectorized.ColumnarBatch class ArrowColumnarBatchSerializer(readBatchNumRows: SQLMetric, numOutputRows: SQLMetric) extends Serializer @@ -223,15 +218,15 @@ private class ArrowColumnarBatchSerializerInstance( } } - val builder = jniWrapper.decompress( + val serializedBatch = jniWrapper.decompress( schemaHolderId, reader.asInstanceOf[ArrowCompressedStreamReader].GetCompressType(), root.getRowCount, bufAddrs.toArray, bufSizes.toArray, bufBS.toBitMask) - val builerImpl = new ArrowRecordBatchBuilderImpl(builder) - val decompressedRecordBatch = builerImpl.build + val decompressedRecordBatch = + UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedBatch); root.clear() if (decompressedRecordBatch != null) { diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 9370e82e7..3b66fa1bf 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -48,18 +48,11 @@ namespace types { class ExpressionList; } // namespace types -static jclass arrow_record_batch_builder_class; -static jmethodID arrow_record_batch_builder_constructor; - -static jclass arrow_field_node_builder_class; -static jmethodID arrow_field_node_builder_constructor; - -static jclass arrowbuf_builder_class; -static jmethodID arrowbuf_builder_constructor; - static jclass serializable_obj_builder_class; static jmethodID serializable_obj_builder_constructor; +static jclass byte_array_class; + static jclass split_result_class; static jmethodID split_result_constructor; @@ -74,7 +67,6 @@ static jclass arrow_columnar_to_row_info_class; static jmethodID arrow_columnar_to_row_info_constructor; using arrow::jni::ConcurrentMap; -static ConcurrentMap> buffer_holder_; static jint JNI_VERSION = JNI_VERSION_1_8; @@ -123,6 +115,13 @@ arrow::Result> FromBytes( return batch; } +arrow::Result ToBytes(JNIEnv* env, + std::shared_ptr batch) { + ARROW_ASSIGN_OR_RAISE(jbyteArray bytes, + arrow::jniutil::SerializeUnsafeFromNative(env, batch)) + return bytes; +} + class JavaRecordBatchIterator { public: explicit JavaRecordBatchIterator(JavaVM* vm, @@ -200,52 +199,6 @@ arrow::Result MakeJavaRecordBatchIterator( return itr; } -jobject MakeRecordBatchBuilder(JNIEnv* env, std::shared_ptr schema, - std::shared_ptr record_batch) { - std::vector> nodes; - std::vector> buffers; - for (int i = 0; i < schema->num_fields(); ++i) { - auto column = record_batch->column(i); - AppendNodes(column, &nodes); - AppendBuffers(column, &buffers); - } - - jobjectArray field_array = - env->NewObjectArray(nodes.size(), arrow_field_node_builder_class, nullptr); - - jobjectArray arrowbuf_builder_array = - env->NewObjectArray(buffers.size(), arrowbuf_builder_class, nullptr); - - int node_idx = 0; - for (auto node : nodes) { - jobject field = - env->NewObject(arrow_field_node_builder_class, - arrow_field_node_builder_constructor, node.first, node.second); - env->SetObjectArrayElement(field_array, node_idx++, field); - } - for (size_t j = 0; j < buffers.size(); ++j) { - auto buffer = buffers[j]; - uint8_t* data = nullptr; - int size = 0; - int64_t capacity = 0; - if (buffer != nullptr) { - data = (uint8_t*)buffer->data(); - size = (int)buffer->size(); - capacity = buffer->capacity(); - } - jobject arrowbuf_builder = - env->NewObject(arrowbuf_builder_class, arrowbuf_builder_constructor, - buffer_holder_.Insert(std::move(buffer)), data, size, capacity); - env->SetObjectArrayElement(arrowbuf_builder_array, j, arrowbuf_builder); - } - - // create RecordBatch - jobject arrow_record_batch_builder = env->NewObject( - arrow_record_batch_builder_class, arrow_record_batch_builder_constructor, - record_batch->num_rows(), field_array, arrowbuf_builder_array); - return arrow_record_batch_builder; -} - using FileSystem = arrow::fs::FileSystem; #ifdef __cplusplus @@ -266,28 +219,12 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { illegal_argument_exception_class = CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;"); - arrow_record_batch_builder_class = CreateGlobalClassReference( - env, "Lcom/intel/oap/vectorized/ArrowRecordBatchBuilder;"); - arrow_record_batch_builder_constructor = - GetMethodID(env, arrow_record_batch_builder_class, "", - "(I[Lcom/intel/oap/vectorized/ArrowFieldNodeBuilder;" - "[Lcom/intel/oap/vectorized/ArrowBufBuilder;)V"); - - arrow_field_node_builder_class = - CreateGlobalClassReference(env, "Lcom/intel/oap/vectorized/ArrowFieldNodeBuilder;"); - arrow_field_node_builder_constructor = - GetMethodID(env, arrow_field_node_builder_class, "", "(II)V"); - - arrowbuf_builder_class = - CreateGlobalClassReference(env, "Lcom/intel/oap/vectorized/ArrowBufBuilder;"); - arrowbuf_builder_constructor = - GetMethodID(env, arrowbuf_builder_class, "", "(JJIJ)V"); - serializable_obj_builder_class = CreateGlobalClassReference( env, "Lcom/intel/oap/vectorized/NativeSerializableObject;"); serializable_obj_builder_constructor = GetMethodID(env, serializable_obj_builder_class, "", "([J[I)V"); + byte_array_class = CreateGlobalClassReference(env, "[B"); split_result_class = CreateGlobalClassReference(env, "Lcom/intel/oap/vectorized/SplitResult;"); split_result_constructor = @@ -323,15 +260,13 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(illegal_access_exception_class); env->DeleteGlobalRef(illegal_argument_exception_class); - env->DeleteGlobalRef(arrow_field_node_builder_class); - env->DeleteGlobalRef(arrowbuf_builder_class); - env->DeleteGlobalRef(arrow_record_batch_builder_class); env->DeleteGlobalRef(serializable_obj_builder_class); env->DeleteGlobalRef(split_result_class); env->DeleteGlobalRef(serialized_record_batch_iterator_class); env->DeleteGlobalRef(arrow_columnar_to_row_info_class); - buffer_holder_.Clear(); + env->DeleteGlobalRef(byte_array_class); + handler_holder_.Clear(); batch_iterator_holder_.Clear(); shuffle_splitter_holder_.Clear(); @@ -423,10 +358,8 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeBuild( #ifdef DEBUG auto handler_holder_size = handler_holder_.Size(); auto batch_holder_size = batch_iterator_holder_.Size(); - auto buffer_holder_size = buffer_holder_.Size(); std::cout << "build native Evaluator " << handler->ToString() << "\nremain refCnt [buffer|Evaluator|batchIterator] is [" - << buffer_holder_size << "|" << handler_holder_size << "|" << batch_holder_size << "]" << std::endl; #endif @@ -516,10 +449,8 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeClose(JNIEnv* #ifdef DEBUG auto handler_holder_size = handler_holder_.Size(); auto batch_holder_size = batch_iterator_holder_.Size(); - auto buffer_holder_size = buffer_holder_.Size(); std::cout << "close native Evaluator " << handler->ToString() << "\nremain refCnt [buffer|Evaluator|batchIterator] is [" - << buffer_holder_size << "|" << handler_holder_size << "|" << batch_holder_size << "]" << std::endl; #endif handler_holder_.Erase(id); @@ -540,7 +471,7 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeSpill( return spilled_size; } -JNIEXPORT jobject JNICALL +JNIEXPORT jobjectArray JNICALL Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeEvaluate( JNIEnv* env, jobject obj, jlong id, jint num_rows, jlongArray buf_addrs, jlongArray buf_sizes) { @@ -574,18 +505,25 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeEvaluate( std::shared_ptr res_schema; status = handler->getResSchema(&res_schema); - jobjectArray record_batch_builder_array = - env->NewObjectArray(out.size(), arrow_record_batch_builder_class, nullptr); + jobjectArray serialized_record_batch_array = + env->NewObjectArray(out.size(), byte_array_class, nullptr); int i = 0; - for (auto record_batch : out) { - jobject record_batch_builder = MakeRecordBatchBuilder(env, res_schema, record_batch); - env->SetObjectArrayElement(record_batch_builder_array, i++, record_batch_builder); + for (const auto& record_batch : out) { + const arrow::Result& r = ToBytes(env, record_batch); + if (!r.ok()) { + std::string error_message = "Error deserializing message" + r.status().message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + return nullptr; + } + jbyteArray serialized_record_batch = r.ValueOrDie(); + env->SetObjectArrayElement(serialized_record_batch_array, i++, + serialized_record_batch); } env->ReleaseLongArrayElements(buf_addrs, in_buf_addrs, JNI_ABORT); env->ReleaseLongArrayElements(buf_sizes, in_buf_sizes, JNI_ABORT); - return record_batch_builder_array; + return serialized_record_batch_array; } JNIEXPORT void JNICALL @@ -630,7 +568,7 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeGetSignature( return env->NewStringUTF((handler->GetSignature()).c_str()); } -JNIEXPORT jobject JNICALL +JNIEXPORT jobjectArray JNICALL Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeEvaluateWithSelection( JNIEnv* env, jobject obj, jlong id, jint num_rows, jlongArray buf_addrs, jlongArray buf_sizes, jint selection_vector_count, jlong selection_vector_buf_addr, @@ -673,18 +611,25 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeEvaluateWithSe std::shared_ptr res_schema; status = handler->getResSchema(&res_schema); - jobjectArray record_batch_builder_array = - env->NewObjectArray(out.size(), arrow_record_batch_builder_class, nullptr); + jobjectArray serialized_record_batch_array = + env->NewObjectArray(out.size(), byte_array_class, nullptr); int i = 0; - for (auto record_batch : out) { - jobject record_batch_builder = MakeRecordBatchBuilder(env, res_schema, record_batch); - env->SetObjectArrayElement(record_batch_builder_array, i++, record_batch_builder); + for (const auto& record_batch : out) { + const arrow::Result& r = ToBytes(env, record_batch); + if (!r.ok()) { + std::string error_message = "Error deserializing message" + r.status().message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + return nullptr; + } + jbyteArray serialized_record_batch = r.ValueOrDie(); + env->SetObjectArrayElement(serialized_record_batch_array, i++, + serialized_record_batch); } env->ReleaseLongArrayElements(buf_addrs, in_buf_addrs, JNI_ABORT); env->ReleaseLongArrayElements(buf_sizes, in_buf_sizes, JNI_ABORT); - return record_batch_builder_array; + return serialized_record_batch_array; } JNIEXPORT void JNICALL @@ -722,7 +667,7 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeSetMember( env->ReleaseLongArrayElements(buf_sizes, in_buf_sizes, JNI_ABORT); } -JNIEXPORT jobject JNICALL +JNIEXPORT jobjectArray JNICALL Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeFinish(JNIEnv* env, jobject obj, jlong id) { @@ -740,15 +685,22 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeFinish(JNIEnv* std::shared_ptr schema; status = handler->getResSchema(&schema); - jobjectArray record_batch_builder_array = - env->NewObjectArray(out.size(), arrow_record_batch_builder_class, nullptr); + jobjectArray serialized_record_batch_array = + env->NewObjectArray(out.size(), byte_array_class, nullptr); int i = 0; - for (auto record_batch : out) { - jobject record_batch_builder = MakeRecordBatchBuilder(env, schema, record_batch); - env->SetObjectArrayElement(record_batch_builder_array, i++, record_batch_builder); + for (const auto& record_batch : out) { + const arrow::Result& r = ToBytes(env, record_batch); + if (!r.ok()) { + std::string error_message = "Error deserializing message" + r.status().message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + return nullptr; + } + jbyteArray serialized_record_batch = r.ValueOrDie(); + env->SetObjectArrayElement(serialized_record_batch_array, i++, + serialized_record_batch); } - return record_batch_builder_array; + return serialized_record_batch_array; } JNIEXPORT jlong JNICALL @@ -819,7 +771,15 @@ JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_BatchIterator_nativeNext env->ThrowNew(io_exception_class, error_message.c_str()); } - return MakeRecordBatchBuilder(env, out->schema(), out); + const arrow::Result& r = ToBytes(env, out); + if (!r.ok()) { + std::string error_message = "Error deserializing message" + r.status().message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + return nullptr; + } + jbyteArray serialized_record_batch = r.ValueOrDie(); + + return serialized_record_batch; } JNIEXPORT jobject JNICALL @@ -916,7 +876,15 @@ JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_BatchIterator_nativeProc env->ReleaseLongArrayElements(buf_addrs, in_buf_addrs, JNI_ABORT); env->ReleaseLongArrayElements(buf_sizes, in_buf_sizes, JNI_ABORT); - return MakeRecordBatchBuilder(env, out->schema(), out); + const arrow::Result& r = ToBytes(env, out); + if (!r.ok()) { + std::string error_message = "Error deserializing message" + r.status().message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + return nullptr; + } + jbyteArray serialized_record_batch = r.ValueOrDie(); + + return serialized_record_batch; } JNIEXPORT jobject JNICALL @@ -972,7 +940,15 @@ Java_com_intel_oap_vectorized_BatchIterator_nativeProcessWithSelection( env->ReleaseLongArrayElements(buf_addrs, in_buf_addrs, JNI_ABORT); env->ReleaseLongArrayElements(buf_sizes, in_buf_sizes, JNI_ABORT); - return MakeRecordBatchBuilder(env, out->schema(), out); + const arrow::Result& r = ToBytes(env, out); + if (!r.ok()) { + std::string error_message = "Error deserializing message" + r.status().message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + return nullptr; + } + jbyteArray serialized_record_batch = r.ValueOrDie(); + + return serialized_record_batch; } JNIEXPORT void JNICALL @@ -1109,19 +1085,6 @@ JNIEXPORT void JNICALL Java_com_intel_oap_vectorized_BatchIterator_nativeClose( batch_iterator_holder_.Erase(id); } -JNIEXPORT void JNICALL -Java_com_intel_oap_vectorized_AdaptorReferenceManager_nativeRelease(JNIEnv* env, - jobject this_obj, - jlong id) { -#ifdef DEBUG - auto it = buffer_holder_.Lookup(id); - if (it.use_count() > 2) { - std::cout << "buffer ptr use count is " << it.use_count() << std::endl; - } -#endif - buffer_holder_.Erase(id); -} - JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeSpill( JNIEnv* env, jobject obj, jlong splitter_id, jlong size, jboolean call_by_self) { @@ -1143,7 +1106,7 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeSpill( return spilled_size; } -JNIEXPORT jobject JNICALL +JNIEXPORT jobjectArray JNICALL Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeEvaluate2( JNIEnv* env, jobject obj, jlong id, jbyteArray bytes) { arrow::Status status; @@ -1165,15 +1128,22 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeEvaluate2( std::shared_ptr res_schema; status = handler->getResSchema(&res_schema); - jobjectArray record_batch_builder_array = - env->NewObjectArray(out.size(), arrow_record_batch_builder_class, nullptr); + jobjectArray serialized_record_batch_array = + env->NewObjectArray(out.size(), byte_array_class, nullptr); int i = 0; - for (auto record_batch : out) { - jobject record_batch_builder = MakeRecordBatchBuilder(env, res_schema, record_batch); - env->SetObjectArrayElement(record_batch_builder_array, i++, record_batch_builder); + for (const auto& record_batch : out) { + const arrow::Result& r = ToBytes(env, record_batch); + if (!r.ok()) { + std::string error_message = "Error deserializing message" + r.status().message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + return nullptr; + } + jbyteArray serialized_record_batch = r.ValueOrDie(); + env->SetObjectArrayElement(serialized_record_batch_array, i++, + serialized_record_batch); } - return record_batch_builder_array; + return serialized_record_batch_array; } JNIEXPORT jlong JNICALL @@ -1515,7 +1485,16 @@ Java_com_intel_oap_vectorized_ShuffleDecompressionJniWrapper_decompress( .c_str()); return nullptr; } - return MakeRecordBatchBuilder(env, schema, rb); + const arrow::Result& r = + ToBytes(env, arrow::RecordBatch::Make(schema, num_rows, std::move(arrays))); + if (!r.ok()) { + std::string error_message = "Error deserializing message" + r.status().message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + return nullptr; + } + jbyteArray serialized_record_batch = r.ValueOrDie(); + + return serialized_record_batch; } JNIEXPORT void JNICALL Java_com_intel_oap_vectorized_ShuffleDecompressionJniWrapper_close(