This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-514] Fix the core dump issue in Q93 with V2 test #515

Merged: 3 commits, merged on Sep 23, 2021.

ArrowColumnarToRowInfo.java:
@@ -17,18 +17,17 @@

 package com.intel.oap.vectorized;

-
-import java.nio.ByteBuffer;
-
 public class ArrowColumnarToRowInfo {
   public long instanceID;
   public long[] offsets;
   public long[] lengths;
+  public long memoryAddress;

   public ArrowColumnarToRowInfo(long instanceID,
-      long[] offsets, long[] lengths) {
+      long[] offsets, long[] lengths, long memoryAddress) {
     this.instanceID = instanceID;
     this.offsets = offsets;
     this.lengths = lengths;
+    this.memoryAddress = memoryAddress;
   }
 }
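
Note: the native side now owns the row buffer, so the info object returned across JNI carries the buffer's base address alongside the per-row offsets and lengths. A minimal C++ sketch (my own illustration, not code from this PR) of how those three fields address rows in one contiguous native buffer:

#include <cstdint>

struct RowView {
  const uint8_t* data;  // start of row `row_id` inside the native buffer
  int64_t length;       // that row's byte length
};

// Row i occupies [base + offsets[i], base + offsets[i] + lengths[i]).
RowView GetRow(const uint8_t* base, const int64_t* offsets,
               const int64_t* lengths, int64_t row_id) {
  return RowView{base + offsets[row_id], lengths[row_id]};
}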

ArrowColumnarToRowJniWrapper.java:
@@ -17,10 +17,7 @@

 package com.intel.oap.vectorized;

-import sun.nio.ch.DirectBuffer;
-
 import java.io.IOException;
-import java.nio.ByteBuffer;

 public class ArrowColumnarToRowJniWrapper {


@@ -30,6 +27,6 @@ public ArrowColumnarToRowJniWrapper() throws IOException {

   public native ArrowColumnarToRowInfo nativeConvertColumnarToRow(
       byte[] schema, int numRows, long[] bufAddrs,
-      long[] bufSizes, long memoryAddress, long memorySize, long fixedSizePerRow) throws RuntimeException;
+      long[] bufSizes, long memoryPoolID) throws RuntimeException;
   public native void nativeClose(long instanceID);
 }
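
Note: the native method now takes the instance id of a Spark-managed Arrow memory pool instead of a caller-allocated buffer address plus size estimates. A minimal sketch (illustration only; the helper names are mine) of the opaque-handle pattern this relies on, where a native pointer round-trips through Java as a plain long:

#include <arrow/memory_pool.h>
#include <cstdint>

// Handed to Java as a jlong "instance id".
int64_t ExportPoolId(arrow::MemoryPool* pool) {
  return reinterpret_cast<int64_t>(pool);
}

// Recovered on the way back into native code; callers should null-check it,
// as the updated jni_wrapper.cc below does.
arrow::MemoryPool* ImportPoolId(int64_t pool_id) {
  return reinterpret_cast<arrow::MemoryPool*>(pool_id);
}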

ArrowColumnarToRowExec.scala:
@@ -19,16 +19,15 @@ package com.intel.oap.execution

 import com.intel.oap.expression.ConverterUtils
 import com.intel.oap.vectorized.{ArrowColumnarToRowJniWrapper, ArrowWritableColumnVector}
-import org.apache.arrow.vector.BaseVariableWidthVector
-import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+import org.apache.arrow.vector.types.pojo.{Field, Schema}

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.array.ByteArrayMethods

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer

@@ -83,19 +82,6 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
       val schema = new Schema(fields.asJava)
       ConverterUtils.getSchemaBytesBuf(schema)
     }
-    // For decimal type if the precision > 18 will need 16 bytes variable size.
-    def containDecimalCol(field: StructField): Boolean = field.dataType match {
-      case d: DecimalType if d.precision > 18 => true
-      case _ => false
-    }
-
-    def estimateBufferSize(numCols: Int, numRows: Int): Int = {
-      val fields = child.schema.fields
-      val decimalCols = fields.filter(field => containDecimalCol(field)).length
-      val fixedLength = UnsafeRow.calculateBitSetWidthInBytes(numCols) + numCols * 8
-      val decimalColSize = 16 * decimalCols
-      (fixedLength + decimalColSize) * numRows
-    }

     batches.flatMap { batch =>
       numInputBatches += 1

@@ -117,33 +103,10 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
         val bufAddrs = new ListBuffer[Long]()
         val bufSizes = new ListBuffer[Long]()
         val fields = new ListBuffer[Field]()
-        var totalVariableSize = 0L
         (0 until batch.numCols).foreach { idx =>
           val column = batch.column(idx).asInstanceOf[ArrowWritableColumnVector]
           fields += column.getValueVector.getField
-          val valueVector = column.getValueVector
-          if (valueVector.isInstanceOf[BaseVariableWidthVector]) {
-            // Calculate the total aligned size of variable cols
-            val arrowType = column.getValueVector.getField.getFieldType.getType
-            for (rowId <- 0 until batch.numRows()) {
-              val variableColSize = arrowType match {
-                case ArrowType.Utf8.INSTANCE =>
-                  val value = column.getUTF8String(rowId)
-                  if (value == null) {
-                    0
-                  } else value.numBytes()
-                case ArrowType.Binary.INSTANCE =>
-                  val value = column.getBinary(rowId)
-                  if (value == null) {
-                    0
-                  } else value.length
-                case _ => 0
-              }
-              val alignedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(variableColSize)
-              totalVariableSize += alignedSize
-            }
-          }
-          valueVector.getBuffers(false)
+          column.getValueVector.getBuffers(false)
             .foreach { buffer =>
               bufAddrs += buffer.memoryAddress()
               bufSizes += buffer.readableBytes()

@@ -155,14 +118,10 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
         }

         val beforeConvert = System.nanoTime()
-        val totalFixedSize = estimateBufferSize(batch.numCols(), batch.numRows())
-        val size = totalFixedSize + totalVariableSize.toInt

-        val allocator = SparkMemoryUtils.contextAllocator()
-        val arrowBuf = allocator.buffer(size)
         val info = jniWrapper.nativeConvertColumnarToRow(
           arrowSchema, batch.numRows, bufAddrs.toArray, bufSizes.toArray,
-          arrowBuf.memoryAddress(), size, totalFixedSize / batch.numRows())
+          SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)

         convertTime += NANOSECONDS.toMillis(System.nanoTime() - beforeConvert)


@@ -173,7 +132,6 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
           override def hasNext: Boolean = {
             val result = rowId < batch.numRows()
             if (!result && !closed) {
-              arrowBuf.release()
               jniWrapper.nativeClose(info.instanceID)
               closed = true
             }

@@ -184,7 +142,7 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
             if (rowId >= batch.numRows()) throw new NoSuchElementException

             val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
-            row.pointTo(null, arrowBuf.memoryAddress() + offset, length.toInt)
+            row.pointTo(null, info.memoryAddress + offset, length.toInt)
             rowId += 1
             row
           }
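
Note: the JVM-side size estimate (estimateBufferSize plus the per-row variable-width scan) is gone entirely; the iterator now points each UnsafeRow at native-owned memory via info.memoryAddress, which nativeClose releases. An under-estimated pre-allocated buffer being overrun by the native writer is the likely source of the Q93 core dump this PR fixes. The per-row sizing rule both the removed Scala code and the new native code follow is: a word-aligned null bitset, one 8-byte slot per field, and word-aligned variable-width data. A standalone C++ sketch of that rule (my illustration, not this PR's code):

#include <cstdint>
#include <vector>

int64_t BitsetWidthInBytes(int64_t num_fields) {
  return ((num_fields + 63) / 64) * 8;  // 1 bit per field, padded to 8 bytes
}

int64_t RoundUpToWord(int64_t num_bytes) {
  return (num_bytes + 7) & ~int64_t(7);
}

int64_t UnsafeRowLength(int64_t num_fields,
                        const std::vector<int64_t>& variable_sizes) {
  int64_t length = BitsetWidthInBytes(num_fields) + 8 * num_fields;
  for (int64_t size : variable_sizes) length += RoundUpToWord(size);
  return length;  // e.g. 3 fields, one 10-byte string: 8 + 24 + 16 = 48
}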

BenchmarkColumnarToRow (C++ benchmark; file path not shown):
@@ -114,12 +114,9 @@ TEST_F(BenchmarkColumnarToRow, test) {
     TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));

     if (record_batch) {
-      std::shared_ptr<arrow::Buffer> buffer;
-      buffer = *arrow::AllocateBuffer(419430400);
-
-      uint8_t* address = buffer->mutable_data();
       std::shared_ptr<ColumnarToRowConverter> unsafe_row_writer_reader =
-          std::make_shared<ColumnarToRowConverter>(record_batch, address);
+          std::make_shared<ColumnarToRowConverter>(record_batch,
+                                                   arrow::default_memory_pool());

       TIME_NANO_OR_THROW(elapse_init, unsafe_row_writer_reader->Init());
       TIME_NANO_OR_THROW(elapse_write, unsafe_row_writer_reader->Write());
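
Note: the benchmark no longer guesses with a hard-coded 419430400-byte (400 MB) scratch buffer; the converter allocates exactly what Init() computes from whatever pool it is given. A small sketch (assuming only the standard Arrow MemoryPool API) for observing what a pool actually handed out:

#include <arrow/memory_pool.h>
#include <iostream>

void ReportPoolUsage(arrow::MemoryPool* pool) {
  // bytes_allocated(): currently live allocations; max_memory(): high-water mark.
  std::cout << "allocated: " << pool->bytes_allocated()
            << " bytes, peak: " << pool->max_memory() << " bytes" << std::endl;
}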

native-sql-engine/cpp/src/jni/jni_wrapper.cc (12 additions & 11 deletions):
@@ -245,7 +245,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   arrow_columnar_to_row_info_class = CreateGlobalClassReference(
       env, "Lcom/intel/oap/vectorized/ArrowColumnarToRowInfo;");
   arrow_columnar_to_row_info_constructor =
-      GetMethodID(env, arrow_columnar_to_row_info_class, "<init>", "(J[J[J)V");
+      GetMethodID(env, arrow_columnar_to_row_info_class, "<init>", "(J[J[JJ)V");

   return JNI_VERSION;
 }

@@ -1504,8 +1504,7 @@ JNIEXPORT void JNICALL Java_com_intel_oap_vectorized_ShuffleDecompressionJniWrap
 JNIEXPORT jobject JNICALL
 Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnarToRow(
     JNIEnv* env, jobject, jbyteArray schema_arr, jint num_rows, jlongArray buf_addrs,
-    jlongArray buf_sizes, jlong memory_address, jlong memory_size,
-    jlong fixed_size_per_row) {
+    jlongArray buf_sizes, jlong memory_pool_id) {
   if (schema_arr == NULL) {
     env->ThrowNew(
         illegal_argument_exception_class,

@@ -1558,13 +1557,17 @@ Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnar
     return NULL;
   }

-  uint8_t* address = reinterpret_cast<uint8_t*>(memory_address);
-
   // convert the record batch to spark unsafe row.
   try {
+    auto* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
+    if (pool == nullptr) {
+      env->ThrowNew(illegal_argument_exception_class,
+                    "Memory pool does not exist or has been closed");
+      return NULL;
+    }
+
     std::shared_ptr<ColumnarToRowConverter> columnar_to_row_converter =
-        std::make_shared<ColumnarToRowConverter>(rb, address, memory_size,
-                                                 fixed_size_per_row);
+        std::make_shared<ColumnarToRowConverter>(rb, pool);
     auto status = columnar_to_row_converter->Init();
     if (!status.ok()) {
       env->ThrowNew(illegal_argument_exception_class,

@@ -1574,9 +1577,7 @@ Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnar
                         .c_str());
       return NULL;
     }
-
     status = columnar_to_row_converter->Write();
-
     if (!status.ok()) {
       env->ThrowNew(
           illegal_argument_exception_class,

@@ -1598,12 +1599,12 @@ Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnar
     auto lengths_arr = env->NewLongArray(num_rows);
     auto lengths_src = reinterpret_cast<const jlong*>(lengths.data());
     env->SetLongArrayRegion(lengths_arr, 0, num_rows, lengths_src);
+    long address = reinterpret_cast<long>(columnar_to_row_converter->GetBufferAddress());

     jobject arrow_columnar_to_row_info = env->NewObject(
         arrow_columnar_to_row_info_class, arrow_columnar_to_row_info_constructor,
-        instanceID, offsets_arr, lengths_arr);
+        instanceID, offsets_arr, lengths_arr, address);
     return arrow_columnar_to_row_info;
-
   } catch (const std::runtime_error& error) {
     env->ThrowNew(unsupportedoperation_exception_class, error.what());
   } catch (const std::exception& error) {
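
Note: the constructor signature string has to track the Java class: with the new long field it becomes "(J[J[JJ)V" (standard JNI type codes: J = long, [J = long[], V = void). A minimal lookup sketch (assumes a valid JNIEnv* and the global class reference created in JNI_OnLoad above):

#include <jni.h>

jmethodID LookupInfoConstructor(JNIEnv* env, jclass info_class) {
  // Matches ArrowColumnarToRowInfo(long, long[], long[], long).
  return env->GetMethodID(info_class, "<init>", "(J[J[JJ)V");
}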

native-sql-engine/cpp/src/operators/columnar_to_row_converter.cc (64 additions & 36 deletions):
@@ -17,19 +17,82 @@

 #include "operators/columnar_to_row_converter.h"

+#include <iostream>
+
 namespace sparkcolumnarplugin {
 namespace columnartorow {

 int64_t CalculateBitSetWidthInBytes(int32_t numFields) {
   return ((numFields + 63) / 64) * 8;
 }

+int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) {
+  int64_t remainder = numBytes & 0x07;  // This is equivalent to `numBytes % 8`
+  if (remainder == 0) {
+    return numBytes;
+  } else {
+    return numBytes + (8 - remainder);
+  }
+}
+
+int64_t CalculateFixedSizePerRow(std::shared_ptr<arrow::Schema> schema,
+                                 int64_t num_cols) {
+  std::vector<std::shared_ptr<arrow::Field>> fields = schema->fields();
+  // Count the decimal columns whose precision is greater than 18.
+  int32_t count = 0;
+  for (auto i = 0; i < num_cols; i++) {
+    auto type = fields[i]->type();
+    if (type->id() == arrow::Decimal128Type::type_id) {
+      auto dtype = dynamic_cast<arrow::Decimal128Type*>(type.get());
+      int32_t precision = dtype->precision();
+      if (precision > 18) count++;
+    }
+  }
+
+  int64_t fixed_size = CalculateBitSetWidthInBytes(num_cols) + num_cols * 8;
+  int64_t decimal_cols_size = count * 16;
+  return fixed_size + decimal_cols_size;
+}
+
 arrow::Status ColumnarToRowConverter::Init() {
   num_rows_ = rb_->num_rows();
   num_cols_ = rb_->num_columns();
   // Calculate the initial size
   nullBitsetWidthInBytes_ = CalculateBitSetWidthInBytes(num_cols_);
-  memset(buffer_address_, 0, sizeof(int8_t) * memory_size_);
+
+  int64_t fixed_size_per_row = CalculateFixedSizePerRow(rb_->schema(), num_cols_);
+
+  // Initialize the offsets_, lengths_, buffer_cursor_
+  for (auto i = 0; i < num_rows_; i++) {
+    lengths_.push_back(fixed_size_per_row);
+    offsets_.push_back(0);
+    buffer_cursor_.push_back(nullBitsetWidthInBytes_ + 8 * num_cols_);
+  }
+  // Calculate the lengths_
+  for (auto i = 0; i < num_cols_; i++) {
+    auto array = rb_->column(i);
+    if (arrow::is_binary_like(array->type_id())) {
+      auto binary_array = std::static_pointer_cast<arrow::BinaryArray>(array);
+      using offset_type = typename arrow::BinaryType::offset_type;
+      offset_type length;
+      for (auto j = 0; j < num_rows_; j++) {
+        auto value = binary_array->GetValue(j, &length);
+        lengths_[j] += RoundNumberOfBytesToNearestWord(length);
+      }
+    }
+  }
+  // Calculate the offsets_ and the total memory size based on lengths_
+  int64_t total_memory_size = lengths_[0];
+  for (auto i = 1; i < num_rows_; i++) {
+    offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
+    total_memory_size += lengths_[i];
+  }
+
+  ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(total_memory_size, memory_pool_));
+
+  memset(buffer_->mutable_data(), 0, sizeof(int8_t) * total_memory_size);
+
+  buffer_address_ = buffer_->mutable_data();
   return arrow::Status::OK();
 }

@@ -429,42 +492,7 @@ arrow::Status WriteValue(uint8_t* buffer_address, int64_t field_offset,
   return arrow::Status::OK();
 }

-int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) {
-  int64_t remainder = numBytes & 0x07;  // This is equivalent to `numBytes % 8`
-  if (remainder == 0) {
-    return numBytes;
-  } else {
-    return numBytes + (8 - remainder);
-  }
-}
-
 arrow::Status ColumnarToRowConverter::Write() {
-  // Initialize the offsets_ , lengths_, buffer_cursor_
-  for (auto i = 0; i < num_rows_; i++) {
-    lengths_.push_back(fixed_size_per_row_);
-    offsets_.push_back(0);
-    buffer_cursor_.push_back(nullBitsetWidthInBytes_ + 8 * num_cols_);
-  }
-
-  // Calculated the lengths_
-  for (auto i = 0; i < num_cols_; i++) {
-    auto array = rb_->column(i);
-    if (arrow::is_binary_like(array->type_id())) {
-      auto binary_array = std::static_pointer_cast<arrow::BinaryArray>(array);
-      using offset_type = typename arrow::BinaryType::offset_type;
-      offset_type length;
-      for (auto j = 0; j < num_rows_; j++) {
-        auto value = binary_array->GetValue(j, &length);
-        lengths_[j] += RoundNumberOfBytesToNearestWord(length);
-      }
-    }
-  }
-
-  // Calculated the offsets_ based on lengths_
-  for (auto i = 1; i < num_rows_; i++) {
-    offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
-  }
-
   for (auto i = 0; i < num_cols_; i++) {
     auto array = rb_->column(i);
     int64_t field_offset = GetFieldOffset(nullBitsetWidthInBytes_, i);
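
Note: all sizing now happens in Init() before a single exact allocation: each row starts at the fixed size (null bitset + 8 bytes per field + 16 bytes per precision>18 decimal), binary-like columns add word-aligned value lengths, and offsets_ is the exclusive prefix sum of lengths_. A standalone sketch (my illustration) of that bookkeeping, equivalent to the loop added above but also well-defined for zero rows:

#include <cstdint>
#include <vector>

// Fills `offsets` with the exclusive prefix sum of `lengths` and returns the
// total buffer size to request from the memory pool.
int64_t ComputeOffsets(const std::vector<int64_t>& lengths,
                       std::vector<int64_t>* offsets) {
  offsets->assign(lengths.size(), 0);
  int64_t total = 0;
  for (size_t i = 0; i < lengths.size(); i++) {
    (*offsets)[i] = total;
    total += lengths[i];
  }
  return total;
}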

native-sql-engine/cpp/src/operators/columnar_to_row_converter.h (6 additions & 9 deletions):
@@ -32,29 +32,26 @@ namespace columnartorow {

 class ColumnarToRowConverter {
  public:
-  ColumnarToRowConverter(std::shared_ptr<arrow::RecordBatch> rb, uint8_t* buffer_address,
-                         int64_t memory_size = 0, int64_t fixed_size_per_row = 0)
-      : rb_(rb),
-        buffer_address_(buffer_address),
-        memory_size_(memory_size),
-        fixed_size_per_row_(fixed_size_per_row) {}
+  ColumnarToRowConverter(std::shared_ptr<arrow::RecordBatch> rb,
+                         arrow::MemoryPool* memory_pool)
+      : rb_(rb), memory_pool_(memory_pool) {}

   arrow::Status Init();
   arrow::Status Write();

-  uint8_t* GetBufferAddress() { return buffer_address_; }  // for test
+  uint8_t* GetBufferAddress() { return buffer_address_; }
   const std::vector<int64_t>& GetOffsets() { return offsets_; }
   const std::vector<int64_t>& GetLengths() { return lengths_; }

  protected:
   std::vector<int64_t> buffer_cursor_;
   std::shared_ptr<arrow::RecordBatch> rb_;
+  std::shared_ptr<arrow::Buffer> buffer_;
+  arrow::MemoryPool* memory_pool_ = arrow::default_memory_pool();
   int64_t nullBitsetWidthInBytes_;
   int64_t num_cols_;
   int64_t num_rows_;
   uint8_t* buffer_address_;
-  int64_t memory_size_;
-  int64_t fixed_size_per_row_;
   std::vector<int64_t> offsets_;
   std::vector<int64_t> lengths_;
 };
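
Note: the converter now owns its output buffer (buffer_) and keeps a non-owning pool pointer. A usage sketch under the new interface (assumes Arrow headers plus the project header path shown above; error handling reduced to status propagation):

#include <memory>

#include <arrow/memory_pool.h>
#include <arrow/record_batch.h>
#include <arrow/status.h>

#include "operators/columnar_to_row_converter.h"

using sparkcolumnarplugin::columnartorow::ColumnarToRowConverter;

arrow::Status ConvertBatch(std::shared_ptr<arrow::RecordBatch> batch) {
  auto converter = std::make_shared<ColumnarToRowConverter>(
      batch, arrow::default_memory_pool());
  ARROW_RETURN_NOT_OK(converter->Init());   // sizes rows, allocates the buffer
  ARROW_RETURN_NOT_OK(converter->Write());  // writes UnsafeRow bytes into it
  const uint8_t* base = converter->GetBufferAddress();
  const std::vector<int64_t>& offsets = converter->GetOffsets();  // one per row
  (void)base;
  (void)offsets;
  return arrow::Status::OK();
}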