Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-461]Fix issue when doing rowToArrowColumnar conversion for ArrayType #492

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,15 @@ public Decimal getDecimal(int rowId, int precision, int scale) {
public UTF8String getUTF8String(int rowId) {
if (isNullAt(rowId))
return null;
return accessor.getUTF8String(rowId);
if (dataType() instanceof ArrayType) {
UTF8String ret_0 = accessor.getUTF8String(rowId);
for (int i = 0; i < ((ArrayAccessor) accessor).getArrayLength(rowId); i++) {
ret_0 = UTF8String.concat(ret_0, getArray(rowId).getUTF8String(i));
}
return ret_0;
} else {
return accessor.getUTF8String(rowId);
}
}

@Override
Expand Down Expand Up @@ -1165,13 +1173,10 @@ final long getLong(int rowId) {

private static class ArrayAccessor extends ArrowVectorAccessor {
private final ListVector accessor;
ArrowWritableColumnVector arrayData;

ArrayAccessor(ListVector vector) {
super(vector);
this.accessor = vector;
arrayData =
new ArrowWritableColumnVector(vector.getDataVector(), 0, vector.size(), false);
}

@Override
Expand All @@ -1197,6 +1202,12 @@ public int getArrayOffset(int rowId) {
int index = rowId * ListVector.OFFSET_WIDTH;
return accessor.getOffsetBuffer().getInt(index);
}

@Override
final UTF8String getUTF8String(int rowId) {
return UTF8String.fromString(
"Array[" + getArrayOffset(rowId) + "-" + getArrayLength(rowId) + "]");
}
}

/**
Expand Down Expand Up @@ -1849,12 +1860,24 @@ final void setNulls(int rowId, int count) {

private static class ArrayWriter extends ArrowVectorWriter {
private final ListVector writer;
// private final ArrowWritableColumnVector arrayData;

ArrayWriter(ListVector vector, ArrowVectorWriter elementVector) {
super(vector);
this.writer = vector;
}

@Override
void setArray(int rowId, int offset, int length) {
int index = rowId * ListVector.OFFSET_WIDTH;
writer.getOffsetBuffer().setInt(index, offset);
writer.getOffsetBuffer().setInt(index + ListVector.OFFSET_WIDTH, offset + length);
writer.setNotNull(rowId);
}

@Override
final void setNull(int rowId) {
writer.setNull(rowId);
}
}

private static class StructWriter extends ArrowVectorWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,27 @@ import scala.collection.mutable.ListBuffer

class ColumnarBoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
extends BoundReference(ordinal, dataType, nullable)
with ColumnarExpression with Logging {
with ColumnarExpression
with Logging {

buildCheck()

def buildCheck(): Unit = {
try {
ConverterUtils.checkIfTypeSupported(dataType)
dataType match {
case at: ArrayType =>
case _ =>
ConverterUtils.checkIfTypeSupported(dataType)
}
} catch {
case e : UnsupportedOperationException =>
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${dataType} is not supported in ColumnarBoundReference.")
}
}
override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val resultType = CodeGeneration.getResultType(dataType)
val field = Field.nullable(s"c_$ordinal", resultType)
val field = ConverterUtils.createArrowField(s"c_$ordinal", dataType)
val fieldTypes = args.asInstanceOf[java.util.List[Field]]
fieldTypes.add(field)
(TreeBuilder.makeField(field), resultType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,42 @@ import scala.collection.mutable.ListBuffer
import scala.collection.immutable.List
import scala.collection.mutable.ArrayBuffer

class ColumnarProjection (
originalInputAttributes: Seq[Attribute],
exprs: Seq[Expression],
skipLiteral: Boolean = false,
renameResult: Boolean = false) extends AutoCloseable with Logging {
class ColumnarProjection(
originalInputAttributes: Seq[Attribute],
exprs: Seq[Expression],
skipLiteral: Boolean = false,
renameResult: Boolean = false)
extends AutoCloseable
with Logging {
// build gandiva projection here.
//System.out.println(s"originalInputAttributes is ${originalInputAttributes}, exprs is ${exprs.toList}")
//////////////// Project original input to aggregate input //////////////////
var projector : Projector = null
var inputList : java.util.List[Field] = Lists.newArrayList()
var projector: Projector = null
var inputList: java.util.List[Field] = Lists.newArrayList()
val expressionList = if (skipLiteral) {
exprs.filter(expr => !expr.isInstanceOf[Literal])
} else {
exprs
}
val resultAttributes = expressionList.toList.zipWithIndex.map{case (expr, i) =>
if (renameResult) {
ConverterUtils.getResultAttrFromExpr(expr, s"res_$i")
} else {
ConverterUtils.getResultAttrFromExpr(expr)
}
val resultAttributes = expressionList.toList.zipWithIndex.map {
case (expr, i) =>
if (renameResult) {
ConverterUtils.getResultAttrFromExpr(expr, s"res_$i")
} else {
ConverterUtils.getResultAttrFromExpr(expr)
}
}
var check_if_no_calculation = true
val projPrepareList : Seq[ExpressionTree] = expressionList.zipWithIndex.map {
val projPrepareList: Seq[ExpressionTree] = expressionList.zipWithIndex.map {
case (expr, i) => {
ColumnarExpressionConverter.reset()
var columnarExpr: Expression =
ColumnarExpressionConverter.replaceWithColumnarExpression(expr, originalInputAttributes, i)
ColumnarExpressionConverter.replaceWithColumnarExpression(
expr,
originalInputAttributes,
i)
if (ColumnarExpressionConverter.ifNoCalculation == false) {
check_if_no_calculation = false
check_if_no_calculation = false
}
logInfo(s"columnarExpr is ${columnarExpr}")
val (node, resultType) =
Expand All @@ -85,47 +91,56 @@ class ColumnarProjection (
}
}

val (ordinalList, arrowSchema) = if (projPrepareList.size > 0 &&
(s"${projPrepareList.map(_.toProtobuf)}".contains("fnNode") || projPrepareList.size != inputList.size)) {
val inputFieldList = inputList.asScala.toList.distinct
val schema = new Schema(inputFieldList.asJava)
projector = Projector.make(schema, projPrepareList.toList.asJava)
(inputFieldList.map(field => {
field.getName.replace("c_", "").toInt
}),
schema)
} else {
val inputFieldList = inputList.asScala.toList
(inputFieldList.map(field => {
field.getName.replace("c_", "").toInt
}),
new Schema(inputFieldList.asJava))
val (ordinalList, arrowSchema) = {
var protoBufTest: String = null
try {
protoBufTest = s"${projPrepareList.map(_.toProtobuf)}"
} catch {
case _ => protoBufTest = null
}
if (protoBufTest != null && projPrepareList.size > 0 &&
(protoBufTest.contains("fnNode") || projPrepareList.size != inputList.size)) {
val inputFieldList = inputList.asScala.toList.distinct
val schema = new Schema(inputFieldList.asJava)
projector = Projector.make(schema, projPrepareList.toList.asJava)
(inputFieldList.map(field => {
field.getName.replace("c_", "").toInt
}), schema)
} else {
val inputFieldList = inputList.asScala.toList
(inputFieldList.map(field => {
field.getName.replace("c_", "").toInt
}), new Schema(inputFieldList.asJava))
}
}
//System.out.println(s"Project input ordinal is ${ordinalList}, Schema is ${arrowSchema}")
val outputArrowSchema = new Schema(resultAttributes.map(attr => {
Field.nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType))
}).asJava)
val outputArrowSchema = new Schema(
resultAttributes.map(attr => ConverterUtils.createArrowField(attr)).asJava)
val outputSchema = ArrowUtils.fromArrowSchema(outputArrowSchema)

def output(): List[AttributeReference] = {
resultAttributes
}

def getOrdinalList(): List[Int] = {
ordinalList
ordinalList
}

def needEvaluate : Boolean = { projector != null }
def evaluate(numRows: Int, inputColumnVector: List[ValueVector]): List[ArrowWritableColumnVector] = {
def needEvaluate: Boolean = { projector != null }
def evaluate(
numRows: Int,
inputColumnVector: List[ValueVector]): List[ArrowWritableColumnVector] = {
if (projector != null) {
val inputRecordBatch: ArrowRecordBatch = ConverterUtils.createArrowRecordBatch(numRows, inputColumnVector)
val inputRecordBatch: ArrowRecordBatch =
ConverterUtils.createArrowRecordBatch(numRows, inputColumnVector)
val outputVectors = ArrowWritableColumnVector.allocateColumns(numRows, outputSchema)
val valueVectors = outputVectors.map(columnVector => columnVector.getValueVector()).toList
projector.evaluate(inputRecordBatch, valueVectors.asJava)
ConverterUtils.releaseArrowRecordBatch(inputRecordBatch)
outputVectors.toList
} else {
val inputRecordBatch: ArrowRecordBatch = ConverterUtils.createArrowRecordBatch(numRows, inputColumnVector)
val inputRecordBatch: ArrowRecordBatch =
ConverterUtils.createArrowRecordBatch(numRows, inputColumnVector)
ArrowWritableColumnVector.loadColumns(numRows, outputArrowSchema, inputRecordBatch).toList
}
}
Expand All @@ -139,28 +154,31 @@ class ColumnarProjection (
}

object ColumnarProjection extends Logging {
def buildCheck(originalInputAttributes: Seq[Attribute],
exprs: Seq[Expression]): Unit = {
def buildCheck(originalInputAttributes: Seq[Attribute], exprs: Seq[Expression]): Unit = {
for (expr <- exprs) {
ColumnarExpressionConverter
.replaceWithColumnarExpression(expr, originalInputAttributes)
}
}
def binding(originalInputAttributes: Seq[Attribute],
exprs: Seq[Expression],
expIdx: Int,
skipLiteral: Boolean = false): List[Int] = {
val expressionList = if (skipLiteral) {
exprs.filter(expr => !expr.isInstanceOf[Literal])
def binding(
originalInputAttributes: Seq[Attribute],
exprs: Seq[Expression],
expIdx: Int,
skipLiteral: Boolean = false): List[Int] = {
val expressionList = if (skipLiteral) {
exprs.filter(expr => !expr.isInstanceOf[Literal])
} else {
exprs
}
var inputList : java.util.List[Field] = Lists.newArrayList()
expressionList.map {
expr => {
var inputList: java.util.List[Field] = Lists.newArrayList()
expressionList.map { expr =>
{
ColumnarExpressionConverter.reset()
var columnarExpr: Expression =
ColumnarExpressionConverter.replaceWithColumnarExpression(expr, originalInputAttributes, expIdx)
ColumnarExpressionConverter.replaceWithColumnarExpression(
expr,
originalInputAttributes,
expIdx)
columnarExpr.asInstanceOf[ColumnarExpression].doColumnarCodeGen(inputList)
}
}
Expand All @@ -169,11 +187,10 @@ object ColumnarProjection extends Logging {
})
}
def create(
originalInputAttributes: Seq[Attribute],
exprs: Seq[Expression],
skipLiteral: Boolean = false,
renameResult: Boolean = false)
: ColumnarProjection = {
originalInputAttributes: Seq[Attribute],
exprs: Seq[Expression],
skipLiteral: Boolean = false,
renameResult: Boolean = false): ColumnarProjection = {
new ColumnarProjection(originalInputAttributes, exprs, skipLiteral, renameResult)
}
}
Loading