diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowRowToColumnarJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowRowToColumnarJniWrapper.java
new file mode 100644
index 000000000..b53a33629
--- /dev/null
+++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ArrowRowToColumnarJniWrapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.vectorized;
+
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+public class ArrowRowToColumnarJniWrapper {
+  public ArrowRowToColumnarJniWrapper() throws Exception {
+    JniUtils.getInstance();
+  }
+
+  public native byte[] nativeConvertRowToColumnar(
+      byte[] schema, long[] rowLength,
+      long bufferAddress, long memoryPoolID) throws RuntimeException;
+
+}
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala
index 53b2f836f..bc7c93f38 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala
@@ -80,6 +80,9 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
   val enableArrowColumnarToRow: Boolean =
     conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu
 
+  val enableArrowRowToColumnar: Boolean =
+    conf.getConfString("spark.oap.sql.columnar.rowtocolumnar", "true").toBoolean && enableCpu
+
   val forceShuffledHashJoin: Boolean =
     conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "false").toBoolean && enableCpu
 
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
new file mode 100644
index 000000000..090031127
--- /dev/null
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.execution
+
+import java.util.concurrent.TimeUnit._
+
+import scala.collection.mutable.ListBuffer
+import com.intel.oap.expression.ConverterUtils
+import com.intel.oap.sql.execution.RowToColumnConverter
+import com.intel.oap.vectorized.{ArrowRowToColumnarJniWrapper, ArrowWritableColumnVector, CloseableColumnBatchIterator}
+import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer
+import org.apache.arrow.memory.ArrowBuf
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
+import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.{RowToColumnarExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils}
+import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils.UnsafeItr
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import org.apache.spark.TaskContext
+import org.apache.spark.unsafe.Platform
+
+
+class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child = child) {
+  override def nodeName: String = "ArrowRowToColumnarExec"
+
+  buildCheck()
+
+  def buildCheck(): Unit = {
+    val schema = child.schema
+    for (field <- schema.fields) {
+      field.dataType match {
+        case d: BooleanType =>
+        case d: ByteType =>
+        case d: ShortType =>
+        case d: IntegerType =>
+        case d: LongType =>
+        case d: FloatType =>
+        case d: DoubleType =>
+        case d: StringType =>
+        case d: DateType =>
+        case d: DecimalType =>
+        case d: TimestampType =>
+        case d: BinaryType =>
+        case _ =>
+          throw new UnsupportedOperationException(s"${field.dataType} " +
+            s"is not supported in ArrowRowToColumnarExec.")
+      }
+    }
+  }
+
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
+    "processTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")
+  )
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numInputRows = longMetric("numInputRows")
+    val numOutputBatches = longMetric("numOutputBatches")
+    val processTime = longMetric("processTime")
+    // Instead of creating a new config we are reusing columnBatchSize. In the future, if we
+    // combine this with some of the Arrow conversion tools, we will need to unify the configs.
+    val numRows = conf.columnBatchSize
+    // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
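+    // The fast path below packs each UnsafeRow back-to-back into a single off-heap
+    // ArrowBuf and records the per-row byte lengths:
+    //   arrowBuf: | row0 bytes | row1 bytes | ... |    rowLength: [len0, len1, ...]
+    // The native side rebuilds Arrow arrays from (schemaBytes, rowLength, bufferAddress)
+    // and hands back a serialized ArrowRecordBatch, which is deserialized into a
+    // ColumnarBatch; non-UnsafeRow input falls back to a row-by-row converter.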
+    val localSchema = this.schema
+    child.execute().mapPartitions { rowIterator =>
+
+      val jniWrapper = new ArrowRowToColumnarJniWrapper()
+      val timeZoneId = SparkSchemaUtils.getLocalTimezoneID()
+      val arrowSchema = ArrowUtils.toArrowSchema(localSchema, timeZoneId)
+      var schemaBytes: Array[Byte] = null
+
+      if (rowIterator.hasNext) {
+        val res = new Iterator[ColumnarBatch] {
+          private val converters = new RowToColumnConverter(localSchema)
+          private var last_cb: ColumnarBatch = null
+          private var elapse: Long = 0
+          // Allocate a large buffer to hold up to numRows rows.
+          val bufferSize = 134217728 // 128M; ideally this would be estimated from the data types
+          val allocator = SparkMemoryUtils.contextAllocator()
+          val arrowBuf: ArrowBuf = allocator.buffer(bufferSize)
+          override def hasNext: Boolean = {
+            rowIterator.hasNext
+          }
+          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+            arrowBuf.close()
+          }
+          override def next(): ColumnarBatch = {
+            var isUnsafeRow = true
+            var firstRow = InternalRow.apply()
+            var hasNextRow = false
+            if (rowIterator.hasNext) {
+              firstRow = rowIterator.next()
+              hasNextRow = true
+            }
+            if (!firstRow.isInstanceOf[UnsafeRow]) {
+              isUnsafeRow = false
+            }
+
+            if (arrowBuf != null && isUnsafeRow) {
+              val rowLength = new ListBuffer[Long]()
+              var rowCount = 0
+              var offset = 0
+              val start = System.nanoTime()
+
+              assert(firstRow.isInstanceOf[UnsafeRow])
+              val unsafeRow = firstRow.asInstanceOf[UnsafeRow]
+              val sizeInBytes = unsafeRow.getSizeInBytes
+              Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
+                null, arrowBuf.memoryAddress() + offset, sizeInBytes)
+              offset += sizeInBytes
+              rowLength += sizeInBytes.toLong
+              rowCount += 1
+
+              while (rowCount < numRows && rowIterator.hasNext) {
+                val row = rowIterator.next() // UnsafeRow
+                assert(row.isInstanceOf[UnsafeRow])
+                val unsafeRow = row.asInstanceOf[UnsafeRow]
+                val sizeInBytes = unsafeRow.getSizeInBytes
+                Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
+                  null, arrowBuf.memoryAddress() + offset, sizeInBytes)
+                offset += sizeInBytes
+                rowLength += sizeInBytes.toLong
+                rowCount += 1
+              }
+              if (schemaBytes == null) {
+                schemaBytes = ConverterUtils.getSchemaBytesBuf(arrowSchema)
+              }
+              val serializedRecordBatch = jniWrapper.nativeConvertRowToColumnar(schemaBytes, rowLength.toArray,
+                arrowBuf.memoryAddress(), SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
+              numInputRows += rowCount
+              numOutputBatches += 1
+              val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedRecordBatch)
+              val output = ConverterUtils.fromArrowRecordBatch(arrowSchema, rb)
+              val outputNumRows = rb.getLength
+              ConverterUtils.releaseArrowRecordBatch(rb)
+              last_cb = new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
+              elapse = System.nanoTime() - start
+              processTime.set(NANOSECONDS.toMillis(elapse))
+              last_cb
+            } else {
+              logInfo("Buffer allocation failed or the input rows are not UnsafeRow; " +
+                "falling back to the non-Arrow-optimized row-by-row conversion")
+              val vectors: Seq[WritableColumnVector] =
+                ArrowWritableColumnVector.allocateColumns(numRows, localSchema)
+              var rowCount = 0
+
+              val start = System.nanoTime()
+              converters.convert(firstRow, vectors.toArray)
+              elapse += System.nanoTime() - start
+              rowCount += 1
+
+              while (rowCount < numRows && rowIterator.hasNext) {
+                val row = rowIterator.next()
+                val start = System.nanoTime()
+                converters.convert(row, vectors.toArray)
+                elapse += System.nanoTime() - start
+                rowCount += 1
+              }
+              vectors.foreach(v => v.asInstanceOf[ArrowWritableColumnVector].setValueCount(rowCount))
+
processTime.set(NANOSECONDS.toMillis(elapse)) + numInputRows += rowCount + numOutputBatches += 1 + last_cb = new ColumnarBatch(vectors.toArray, rowCount) + last_cb + } + } + } + new CloseableColumnBatchIterator(res) + } else { + Iterator.empty + } + } + } + + override def canEqual(other: Any): Boolean = other.isInstanceOf[ArrowRowToColumnarExec] + + override def equals(other: Any): Boolean = other match { + case that: ArrowRowToColumnarExec => + (that canEqual this) && super.equals(that) + case _ => false + } + +} diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala index 897df4e03..1ad6b4bed 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala @@ -95,6 +95,22 @@ class ColumnarGetJsonObject(left: Expression, right: Expression, original: GetJs } } +class ColumnarStringInstr(left: Expression, right: Expression, original: StringInstr) + extends StringInstr(original.str, original.substr) with ColumnarExpression with Logging { + + override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = { + val (left_node, _): (TreeNode, ArrowType) = + left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val (right_node, _): (TreeNode, ArrowType) = + right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val resultType = CodeGeneration.getResultType(dataType) + // Be careful about the argument order. + val funcNode = TreeBuilder.makeFunction("locate", + Lists.newArrayList(right_node, left_node, + TreeBuilder.makeLiteral(1.asInstanceOf[java.lang.Integer])), resultType) + (funcNode, resultType) + } +} object ColumnarBinaryExpression { @@ -116,6 +132,8 @@ object ColumnarBinaryExpression { new ColumnarDateSub(left, right) case g: GetJsonObject => new ColumnarGetJsonObject(left, right, g) + case instr: StringInstr => + new ColumnarStringInstr(left, right, instr) case other => throw new UnsupportedOperationException(s"not currently supported: $other.") } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala index bf9d010c3..b752e437b 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala @@ -275,6 +275,17 @@ object ColumnarExpressionConverter extends Logging { ss.len, convertBoundRefToAttrRef = convertBoundRefToAttrRef), expr) + case st: StringTranslate => + logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.") + ColumnarTernaryOperator.create( + replaceWithColumnarExpression(st.srcExpr, attributeSeq, + convertBoundRefToAttrRef = convertBoundRefToAttrRef), + replaceWithColumnarExpression(st.matchingExpr, attributeSeq, + convertBoundRefToAttrRef = convertBoundRefToAttrRef), + replaceWithColumnarExpression(st.replaceExpr, attributeSeq, + convertBoundRefToAttrRef = convertBoundRefToAttrRef), + expr + ) case u: UnaryExpression => logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.") if (!u.isInstanceOf[CheckOverflow] || !u.child.isInstanceOf[Divide]) { @@ -384,8 +395,11 @@ object ColumnarExpressionConverter extends 
Logging { containsSubquery(b.left) || containsSubquery(b.right) case s: String2TrimExpression => s.children.map(containsSubquery).exists(_ == true) + case st: StringTranslate => + st.children.map(containsSubquery).exists(_ == true) case regexp: RegExpReplace => - containsSubquery(regexp.subject) || containsSubquery(regexp.regexp) || containsSubquery(regexp.rep) || containsSubquery(regexp.pos) + containsSubquery(regexp.subject) || containsSubquery( + regexp.regexp) || containsSubquery(regexp.rep) || containsSubquery(regexp.pos) case expr => throw new UnsupportedOperationException( s" --> ${expr.getClass} | ${expr} is not currently supported.") diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarTernaryOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarTernaryOperator.scala index 49ca6b07c..b1bf3da05 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarTernaryOperator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarTernaryOperator.scala @@ -110,15 +110,43 @@ class ColumnarStringSplit(child: Expression, regex: Expression, } } +class ColumnarStringTranslate(src: Expression, matchingExpr: Expression, + replaceExpr: Expression, original: Expression) + extends StringTranslate(src, matchingExpr, replaceExpr) with ColumnarExpression{ + buildCheck + + def buildCheck: Unit = { + val supportedTypes = List(StringType) + if (supportedTypes.indexOf(src.dataType) == -1) { + throw new UnsupportedOperationException(s"${src.dataType}" + + s" is not supported in ColumnarStringTranslate!") + } + } + + override def doColumnarCodeGen(args: java.lang.Object) : (TreeNode, ArrowType) = { + val (str_node, _): (TreeNode, ArrowType) = + src.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val (matchingExpr_node, _): (TreeNode, ArrowType) = + matchingExpr.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val (replaceExpr_node, _): (TreeNode, ArrowType) = + replaceExpr.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val resultType = new ArrowType.Utf8() + (TreeBuilder.makeFunction("translate", + Lists.newArrayList(str_node, matchingExpr_node, replaceExpr_node), resultType), resultType) + } +} + object ColumnarTernaryOperator { - def create(str: Expression, pos: Expression, len: Expression, + def create(src: Expression, arg1: Expression, arg2: Expression, original: Expression): Expression = original match { case ss: Substring => - new ColumnarSubString(str, pos, len, ss) + new ColumnarSubString(src, arg1, arg2, ss) // Currently not supported. 
     // case a: StringSplit =>
     //   new ColumnarStringSplit(str, a.regex, a.limit, a)
+    case st: StringTranslate =>
+      new ColumnarStringTranslate(src, arg1, arg2, st)
     case other =>
       throw new UnsupportedOperationException(s"not currently supported: $other.")
   }
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala
index 1dd97858a..ef417d63e 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala
@@ -293,6 +293,62 @@ class ColumnarAbs(child: Expression, original: Expression)
   }
 }
 
+class ColumnarFloor(child: Expression, original: Expression)
+    extends Floor(child: Expression)
+    with ColumnarExpression
+    with Logging {
+
+  buildCheck()
+
+  def buildCheck(): Unit = {
+    // DecimalType is also accepted here, in addition to the listed types.
+    val supportedTypes = List(DoubleType, LongType)
+    if (supportedTypes.indexOf(child.dataType) == -1 &&
+      !child.dataType.isInstanceOf[DecimalType]) {
+      throw new UnsupportedOperationException(
+        s"${child.dataType} is not supported in ColumnarFloor")
+    }
+  }
+
+  override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
+    val (child_node, _): (TreeNode, ArrowType) =
+      child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
+
+    val resultType = CodeGeneration.getResultType(dataType)
+    val funcNode =
+      TreeBuilder.makeFunction("floor", Lists.newArrayList(child_node), resultType)
+    (funcNode, resultType)
+  }
+}
+
+class ColumnarCeil(child: Expression, original: Expression)
+    extends Ceil(child: Expression)
+    with ColumnarExpression
+    with Logging {
+
+  buildCheck()
+
+  def buildCheck(): Unit = {
+    // DecimalType is also accepted here, in addition to the listed types.
+ val supportedTypes = List(DoubleType, LongType) + if (supportedTypes.indexOf(child.dataType) == -1 && + !child.dataType.isInstanceOf[DecimalType]) { + throw new UnsupportedOperationException( + s"${child.dataType} is not supported in ColumnarCeil") + } + } + + override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = { + val (child_node, _): (TreeNode, ArrowType) = + child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + + val resultType = CodeGeneration.getResultType(dataType) + val funcNode = + TreeBuilder.makeFunction("ceil", Lists.newArrayList(child_node), resultType) + (funcNode, resultType) + } +} + class ColumnarUpper(child: Expression, original: Expression) extends Upper(child: Expression) with ColumnarExpression @@ -822,6 +878,10 @@ object ColumnarUnaryOperator { new ColumnarNot(child, n) case a: Abs => new ColumnarAbs(child, a) + case f: Floor => + new ColumnarFloor(child, f) + case c: Ceil => + new ColumnarCeil(child, c) case u: Upper => new ColumnarUpper(child, u) case c: Cast => diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index 553c98abd..8e3a2afb5 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -338,9 +338,15 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] { def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match { case plan: RowToColumnarExec => - val child = replaceWithColumnarPlan(plan.child) - logDebug(s"ColumnarPostOverrides RowToArrowColumnarExec(${child.getClass})") - RowToArrowColumnarExec(child) + if (columnarConf.enableArrowRowToColumnar) { + val child = replaceWithColumnarPlan(plan.child) + logDebug(s"ColumnarPostOverrides ArrowRowToColumnarExec(${child.getClass})") + new ArrowRowToColumnarExec(child) + } else { + val child = replaceWithColumnarPlan(plan.child) + logDebug(s"ColumnarPostOverrides RowToArrowColumnarExec(${child.getClass})") + RowToArrowColumnarExec(child) + } case ColumnarToRowExec(child: ColumnarShuffleExchangeAdaptor) => replaceWithColumnarPlan(child) case ColumnarToRowExec(child: ColumnarBroadcastExchangeAdaptor) => diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/execution/ArrowRowToColumnarExecSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/execution/ArrowRowToColumnarExecSuite.scala new file mode 100644 index 000000000..0fd898c50 --- /dev/null +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/execution/ArrowRowToColumnarExecSuite.scala @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.intel.oap.execution
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import com.intel.oap.expression.ConverterUtils
+import com.intel.oap.vectorized.ArrowRowToColumnarJniWrapper
+import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer
+import org.apache.arrow.memory.ArrowBuf
+import org.apache.arrow.vector.types.pojo.{Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils}
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.types.UTF8String
+
+class ArrowRowToColumnarExecSuite extends SharedSparkSession {
+
+  test("ArrowRowToColumnarExec: Boolean type with array list") {
+    val converter: UnsafeProjection =
+      UnsafeProjection.create(Array[DataType](ArrayType(BooleanType)))
+    val schema = StructType(Seq(StructField("boolean type with array", ArrayType(BooleanType))))
+    val rowIterator = (0 until 2).map { i =>
+      converter.apply(InternalRow(new GenericArrayData(Seq(true, false))))
+    }.toIterator
+
+    val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator)
+    val convert_rowIterator = cb.rowIterator
+
+    var rowId = 0
+    while (rowId < cb.numRows()) {
+      val row = convert_rowIterator.next()
+      rowId += 1
+      val array = row.getArray(0)
+      assert(true == array.getBoolean(0))
+      assert(false == array.getBoolean(1))
+    }
+  }
+
+  test("ArrowRowToColumnarExec: Byte type with array list") {
+    val converter: UnsafeProjection =
+      UnsafeProjection.create(Array[DataType](ArrayType(ByteType)))
+    val schema = StructType(Seq(StructField("byte type with array", ArrayType(ByteType))))
+    val rowIterator = (0 until 2).map { i =>
+      converter.apply(InternalRow(new GenericArrayData(Seq(1.toByte, 2.toByte))))
+    }.toIterator
+
+    val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator)
+    val convert_rowIterator = cb.rowIterator
+
+    var rowId = 0
+    while (rowId < cb.numRows()) {
+      val row = convert_rowIterator.next()
+      rowId += 1
+      val array = row.getArray(0)
+      assert(1.toByte == array.getByte(0))
+      assert(2.toByte == array.getByte(1))
+    }
+  }
+
+  test("ArrowRowToColumnarExec: Short type with array list") {
+    val converter: UnsafeProjection =
+      UnsafeProjection.create(Array[DataType](ArrayType(ShortType)))
+    val schema = StructType(Seq(StructField("short type with array", ArrayType(ShortType))))
+    val rowIterator = (0 until 2).map { i =>
+      converter.apply(InternalRow(new GenericArrayData(Seq(1.toShort, 2.toShort))))
+    }.toIterator
+
+    val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator)
+    val convert_rowIterator = cb.rowIterator
+
+    var rowId = 0
+    while (rowId < cb.numRows()) {
+      val row = convert_rowIterator.next()
+      rowId += 1
+      val array = row.getArray(0)
+      assert(1.toShort == array.getShort(0))
+      assert(2.toShort == array.getShort(1))
+    }
+  }
+
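+  // Each test in this suite follows the same pattern: build UnsafeRows with an
+  // UnsafeProjection, hand them to the nativeOp helper defined at the bottom of this
+  // file (which packs the rows into an off-heap buffer and round-trips them through
+  // the native row-to-columnar converter), then read the values back from the
+  // resulting ColumnarBatch, e.g.:
+  //   val proj = UnsafeProjection.create(Array[DataType](ArrayType(IntegerType)))
+  //   val rows = (0 until 2).map(_ => proj(InternalRow(new GenericArrayData(Seq(1, 2))))).toIterator
+  //   val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rows)
+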
test("ArrowRowToColumnarExec: Int type with array list") { + val converter: UnsafeProjection = + UnsafeProjection.create(Array[DataType](ArrayType(IntegerType))) + val schema = StructType(Seq(StructField("Int type with array", ArrayType(IntegerType)))) + val rowIterator = (0 until 2).map { i => + converter.apply(InternalRow(new GenericArrayData(Seq(-10, -20)))) + }.toIterator + + val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) + val convert_rowIterator = cb.rowIterator + + var rowId = 0 + while (rowId < cb.numRows()) { + val row = convert_rowIterator.next() + rowId += 1 + val array = row.getArray(0) + assert(-10 == array.getInt(0)) + assert(-20 == array.getInt(1)) + } + } + + test("ArrowRowToColumnarExec: Long type with array list") { + val converter: UnsafeProjection = + UnsafeProjection.create(Array[DataType](ArrayType(LongType))) + val schema = StructType(Seq(StructField("Long type with array", ArrayType(LongType)))) + val rowIterator = (0 until 2).map { i => + converter.apply(InternalRow(new GenericArrayData(Seq(1.toLong, 2.toLong)))) + }.toIterator + + val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) + val convert_rowIterator = cb.rowIterator + + var rowId = 0 + while (rowId < cb.numRows()) { + val row = convert_rowIterator.next() + rowId += 1 + val array = row.getArray(0) + assert(1.toLong == array.getLong(0)) + assert(2.toLong == array.getLong(1)) + } + } + + test("ArrowRowToColumnarExec: Float type with array list") { + val converter: UnsafeProjection = + UnsafeProjection.create(Array[DataType](ArrayType(FloatType))) + val schema = StructType(Seq(StructField("Float type with array", ArrayType(FloatType)))) + val rowIterator = (0 until 2).map { i => + converter.apply(InternalRow(new GenericArrayData(Seq(1.toFloat, 2.toFloat)))) + }.toIterator + + val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) + val convert_rowIterator = cb.rowIterator + + var rowId = 0 + while (rowId < cb.numRows()) { + val row = convert_rowIterator.next() + rowId += 1 + val array = row.getArray(0) + assert(1.toFloat == array.getFloat(0)) + assert(2.toFloat == array.getFloat(1)) + } + } + + test("ArrowRowToColumnarExec: Double type with array list") { + val converter: UnsafeProjection = + UnsafeProjection.create(Array[DataType](ArrayType(DoubleType))) + val schema = StructType(Seq(StructField("Double type with array", ArrayType(DoubleType)))) + val rowIterator = (0 until 2).map { i => + converter.apply(InternalRow(new GenericArrayData(Seq(1.toDouble, 2.toDouble)))) + }.toIterator + + val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) + val convert_rowIterator = cb.rowIterator + + var rowId = 0 + while (rowId < cb.numRows()) { + val row = convert_rowIterator.next() + rowId += 1 + val array = row.getArray(0) + assert(1.toDouble == array.getDouble(0)) + assert(2.toDouble == array.getDouble(1)) + } + } + + test("ArrowRowToColumnarExec: String type with array list") { + val converter: UnsafeProjection = + UnsafeProjection.create(Array[DataType](ArrayType(StringType))) + val schema = StructType(Seq(StructField("String type with array", ArrayType(StringType)))) + val rowIterator = (0 until 2).map { i => + converter.apply(InternalRow(new GenericArrayData( + Seq(UTF8String.fromString("abc"), UTF8String.fromString("def"))))) + }.toIterator + + val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) + val convert_rowIterator = cb.rowIterator + + var rowId = 0 + while (rowId < cb.numRows()) { + val row = convert_rowIterator.next() + rowId += 1 + 
val array = row.getArray(0) + assert(UTF8String.fromString("abc") == array.getUTF8String(0)) + assert(UTF8String.fromString("def") == array.getUTF8String(1)) + } + } + + test("ArrowRowToColumnarExec: Decimal type with array list precision <= 18") { + val converter: UnsafeProjection = + UnsafeProjection.create(Array[DataType](ArrayType(DecimalType(10, 4)))) + val schema = StructType( + Seq(StructField("Decimal type with array", ArrayType(DecimalType(10, 4))))) + val rowIterator = (0 until 2).map { i => + converter.apply(InternalRow(new GenericArrayData( + Seq(new Decimal().set(BigDecimal("-1.5645")), new Decimal().set(BigDecimal("-1.8645")))))) + }.toIterator + + val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) + val convert_rowIterator = cb.rowIterator + + var rowId = 0 + while (rowId < cb.numRows()) { + val row = convert_rowIterator.next() + rowId += 1 + val array = row.getArray(0) + assert(new Decimal().set(BigDecimal("-1.5645")) == array.getDecimal(0, 10, 4)) + assert(new Decimal().set(BigDecimal("-1.8645")) == array.getDecimal(1, 10, 4)) + } + } + + test("ArrowRowToColumnarExec: Decimal type with array list precision > 18") { + val converter: UnsafeProjection = + UnsafeProjection.create(Array[DataType](ArrayType(DecimalType(19, 4)))) + val schema = StructType( + Seq(StructField("Decimal type with array", ArrayType(DecimalType(19, 4))))) + val rowIterator = (0 until 2).map { i => + converter.apply(InternalRow(new GenericArrayData( + Seq(new Decimal().set(BigDecimal("1.2457")), new Decimal().set(BigDecimal("1.2457")))))) + }.toIterator + + val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) + val convert_rowIterator = cb.rowIterator + + var rowId = 0 + while (rowId < cb.numRows()) { + val row = convert_rowIterator.next() + rowId += 1 + val array = row.getArray(0) + assert(new Decimal().set(BigDecimal("1.2457")) == array.getDecimal(0, 19, 4)) + assert(new Decimal().set(BigDecimal("1.2457")) == array.getDecimal(1, 19, 4)) + } + } + + test("ArrowRowToColumnarExec: Timestamp type with array list ") { + val converter: UnsafeProjection = + UnsafeProjection.create(Array[DataType](ArrayType(TimestampType))) + val defaultZoneId = ZoneId.systemDefault() + val schema = StructType( + Seq(StructField("Timestamp type with array", ArrayType(TimestampType)))) + val rowIterator: Iterator[InternalRow] = (0 until 2).map { i => + converter.apply(InternalRow(new GenericArrayData( + Seq(DateTimeUtils.stringToTimestamp( + UTF8String.fromString("1970-1-1 00:00:00"), defaultZoneId).get, + DateTimeUtils.stringToTimestamp( + UTF8String.fromString("1970-1-1 00:00:00"), defaultZoneId).get)))) + }.toIterator + + val cb = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator) + val convert_rowIterator = cb.rowIterator + + var rowId = 0 + while (rowId < cb.numRows()) { + val row = convert_rowIterator.next() + rowId += 1 + val array = row.getArray(0) + assert(DateTimeUtils.stringToTimestamp( + UTF8String.fromString("1970-1-1 00:00:00"), defaultZoneId).get == + array.get(0, TimestampType).asInstanceOf[Long]) + assert(DateTimeUtils.stringToTimestamp( + UTF8String.fromString("1970-1-1 00:00:00"), defaultZoneId).get == + array.get(1, TimestampType).asInstanceOf[Long]) + } + } + + test("ArrowRowToColumnarExec: Date32 type with array list ") { + val converter: UnsafeProjection = + UnsafeProjection.create(Array[DataType](ArrayType(DateType))) + val defaultZoneId = ZoneId.systemDefault() + val schema: StructType = StructType( + Seq(StructField("Date type with array", ArrayType(DateType)))) + 
val rowIterator: Iterator[InternalRow] = (0 until 2).map { i =>
+      converter.apply(InternalRow(new GenericArrayData(
+        Seq(DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), defaultZoneId).get,
+          DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), defaultZoneId).get))))
+    }.toIterator
+
+    val cb: ColumnarBatch = ArrowRowToColumnarExecSuite.nativeOp(schema, rowIterator)
+    val convert_rowIterator = cb.rowIterator
+
+    var rowId = 0
+    while (rowId < cb.numRows()) {
+      val row = convert_rowIterator.next()
+      rowId += 1
+      val array = row.getArray(0)
+      assert(DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), defaultZoneId).get ==
+        array.get(0, DateType).asInstanceOf[Int])
+      assert(DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), defaultZoneId).get ==
+        array.get(1, DateType).asInstanceOf[Int])
+    }
+  }
+}
+
+object ArrowRowToColumnarExecSuite {
+
+  def serializeSchema(fields: Seq[Field]): Array[Byte] = {
+    val schema = new Schema(fields.asJava)
+    ConverterUtils.getSchemaBytesBuf(schema)
+  }
+
+  def nativeOp(schema: StructType, rowIterator: Iterator[InternalRow]): ColumnarBatch = {
+    val bufferSize = 1024 // large enough for these tests; ideally estimated from the data types
+    val allocator = SparkMemoryUtils.contextAllocator()
+    val arrowBuf = allocator.buffer(bufferSize)
+
+    val rowLength = new ListBuffer[Long]()
+    var rowCount = 0
+    var offset = 0
+    while (rowIterator.hasNext) {
+      val row = rowIterator.next() // UnsafeRow
+      assert(row.isInstanceOf[UnsafeRow])
+      val unsafeRow = row.asInstanceOf[UnsafeRow]
+      val sizeInBytes = unsafeRow.getSizeInBytes
+      Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
+        null, arrowBuf.memoryAddress() + offset, sizeInBytes)
+      offset += sizeInBytes
+      rowLength += sizeInBytes.toLong
+      rowCount += 1
+    }
+    val timeZoneId = SparkSchemaUtils.getLocalTimezoneID()
+    val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+    val schemaBytes: Array[Byte] = ConverterUtils.getSchemaBytesBuf(arrowSchema)
+    val jniWrapper = new ArrowRowToColumnarJniWrapper()
+    val serializedRecordBatch = jniWrapper.nativeConvertRowToColumnar(
+      schemaBytes, rowLength.toArray, arrowBuf.memoryAddress(),
+      SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
+    val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedRecordBatch)
+    val output = ConverterUtils.fromArrowRecordBatch(arrowSchema, rb)
+    val outputNumRows = rb.getLength
+    ConverterUtils.releaseArrowRecordBatch(rb)
+    new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
+  }
+}
+
diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt
index 7681ea973..703a5c1f7 100644
--- a/native-sql-engine/cpp/src/CMakeLists.txt
+++ b/native-sql-engine/cpp/src/CMakeLists.txt
@@ -513,6 +513,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
         codegen/arrow_compute/ext/kernels_ext.cc
         shuffle/splitter.cc
         operators/columnar_to_row_converter.cc
+        operators/row_to_columnar_converter.cc
         precompile/hash_map.cc
         precompile/sparse_hash_map.cc
         precompile/builder.cc
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc
index b4a807289..b52a522ec 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc
@@ -282,6 +282,27 @@ arrow::Status ExpressionCodegenVisitor::Visit(const
gandiva::FunctionNode& node) prepare_ss << "}" << std::endl; prepare_str_ += prepare_ss.str(); check_str_ = validity; + } else if (func_name.compare("translate") == 0) { + codes_str_ = func_name + "_" + std::to_string(cur_func_id); + auto validity = codes_str_ + "_validity"; + real_codes_str_ = codes_str_; + real_validity_str_ = validity; + std::stringstream prepare_ss; + prepare_ss << GetCTypeString(node.return_type()) << " " << codes_str_ << ";" + << std::endl; + prepare_ss << "bool " << validity << " = " << child_visitor_list[0]->GetPreCheck() + << ";" << std::endl; + prepare_ss << "if (" << validity << ") {" << std::endl; + prepare_ss << codes_str_ << " = translate" + << "(" << child_visitor_list[0]->GetResult() << ", " + << child_visitor_list[1]->GetResult() << ", " + << child_visitor_list[2]->GetResult() << ");" << std::endl; + prepare_ss << "}" << std::endl; + for (int i = 0; i < 1; i++) { + prepare_str_ += child_visitor_list[i]->GetPrepare(); + } + prepare_str_ += prepare_ss.str(); + check_str_ = validity; } else if (func_name.compare("substr") == 0) { ss << child_visitor_list[0]->GetResult() << ".substr(" << "((" << child_visitor_list[1]->GetResult() << " - 1) < 0 ? 0 : (" @@ -303,6 +324,31 @@ arrow::Status ExpressionCodegenVisitor::Visit(const gandiva::FunctionNode& node) prepare_ss << "if (" << check_str_ << ")" << std::endl; prepare_ss << codes_str_ << " = " << ss.str() << ";" << std::endl; prepare_str_ += prepare_ss.str(); + } else if (func_name.compare("instr") == 0) { + codes_str_ = func_name + "_" + std::to_string(cur_func_id); + auto validity = codes_str_ + "_validity"; + real_codes_str_ = codes_str_; + real_validity_str_ = validity; + std::stringstream prepare_ss; + prepare_ss << GetCTypeString(node.return_type()) << " " << codes_str_ << ";" + << std::endl; + prepare_ss << "bool " << validity << " = " << child_visitor_list[0]->GetPreCheck() + << ";" << std::endl; + prepare_ss << "if (" << validity << ") {" << std::endl; + prepare_ss << "auto ind = " << child_visitor_list[0]->GetResult() << ".find(" + << child_visitor_list[1]->GetResult() << ");" << std::endl; + prepare_ss << "if (ind == std::string::npos) {" << std::endl; + prepare_ss << codes_str_ << " = 0;" << std::endl; + prepare_ss << "}" << std::endl; + prepare_ss << "else {" << std::endl; + prepare_ss << codes_str_ << " = ind + 1;" << std::endl; + prepare_ss << "}" << std::endl; + prepare_ss << "}" << std::endl; + for (int i = 0; i < 1; i++) { + prepare_str_ += child_visitor_list[i]->GetPrepare(); + } + prepare_str_ += prepare_ss.str(); + check_str_ = validity; } else if (func_name.compare("btrim") == 0) { codes_str_ = func_name + "_" + std::to_string(cur_func_id); auto validity = codes_str_ + "_validity"; @@ -334,6 +380,58 @@ arrow::Status ExpressionCodegenVisitor::Visit(const gandiva::FunctionNode& node) prepare_str_ += prepare_ss.str(); check_str_ = validity; + } else if (func_name.compare("ltrim") == 0) { + codes_str_ = func_name + "_" + std::to_string(cur_func_id); + auto validity = codes_str_ + "_validity"; + real_codes_str_ = codes_str_; + real_validity_str_ = validity; + std::stringstream prepare_ss; + prepare_ss << GetCTypeString(node.return_type()) << " " << codes_str_ << ";" + << std::endl; + prepare_ss << "bool " << validity << " = " << child_visitor_list[0]->GetPreCheck() + << ";" << std::endl; + prepare_ss << "if (" << validity << ") {" << std::endl; + prepare_ss << "std::string arg = " << child_visitor_list[0]->GetResult() << ";" + << std::endl; + prepare_ss << "int start_index = 0, end_index = 
arg.length() - 1;" << std::endl;
+    prepare_ss << "while (start_index <= end_index && arg[start_index] == ' ') {"
+               << std::endl;
+    prepare_ss << "start_index++;" << std::endl;
+    prepare_ss << "}" << std::endl;
+    prepare_ss << codes_str_ << " = arg.substr(start_index, end_index - start_index + 1);"
+               << std::endl;
+    prepare_ss << "}" << std::endl;
+    for (int i = 0; i < 1; i++) {
+      prepare_str_ += child_visitor_list[i]->GetPrepare();
+    }
+    prepare_str_ += prepare_ss.str();
+    check_str_ = validity;
+  } else if (func_name.compare("rtrim") == 0) {
+    codes_str_ = func_name + "_" + std::to_string(cur_func_id);
+    auto validity = codes_str_ + "_validity";
+    real_codes_str_ = codes_str_;
+    real_validity_str_ = validity;
+    std::stringstream prepare_ss;
+    prepare_ss << GetCTypeString(node.return_type()) << " " << codes_str_ << ";"
+               << std::endl;
+    prepare_ss << "bool " << validity << " = " << child_visitor_list[0]->GetPreCheck()
+               << ";" << std::endl;
+    prepare_ss << "if (" << validity << ") {" << std::endl;
+    prepare_ss << "std::string arg = " << child_visitor_list[0]->GetResult() << ";"
+               << std::endl;
+    prepare_ss << "int start_index = 0, end_index = arg.length() - 1;" << std::endl;
+    prepare_ss << "while (end_index >= start_index && arg[end_index] == ' ') {"
+               << std::endl;
+    prepare_ss << "end_index--;" << std::endl;
+    prepare_ss << "}" << std::endl;
+    prepare_ss << codes_str_ << " = arg.substr(start_index, end_index - start_index + 1);"
+               << std::endl;
+    prepare_ss << "}" << std::endl;
+    for (int i = 0; i < 1; i++) {
+      prepare_str_ += child_visitor_list[i]->GetPrepare();
+    }
+    prepare_str_ += prepare_ss.str();
+    check_str_ = validity;
   } else if (func_name.compare("upper") == 0) {
     std::stringstream prepare_ss;
     auto child_name = child_visitor_list[0]->GetResult();
diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc
index f82068c37..5e9ddad39 100644
--- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc
+++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc
@@ -41,6 +41,7 @@
 #include "jni/concurrent_map.h"
 #include "jni/jni_common.h"
 #include "operators/columnar_to_row_converter.h"
+#include "operators/row_to_columnar_converter.h"
 #include "proto/protobuf_utils.h"
 #include "shuffle/splitter.h"
 #include "utils/exception.h"
@@ -122,6 +123,7 @@ static jint JNI_VERSION = JNI_VERSION_1_8;
 using CodeGenerator = sparkcolumnarplugin::codegen::CodeGenerator;
 using ColumnarToRowConverter = sparkcolumnarplugin::columnartorow::ColumnarToRowConverter;
+using RowToColumnarConverter = sparkcolumnarplugin::rowtocolumnar::RowToColumnarConverter;
 
 static arrow::jni::ConcurrentMap<std::shared_ptr<CodeGenerator>> handler_holder_;
 static arrow::jni::ConcurrentMap<std::shared_ptr<ResultIteratorBase>> batch_iterator_holder_;
@@ -1051,6 +1053,7 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake(
   auto splitOptions = SplitOptions::Defaults();
   splitOptions.write_schema = write_schema;
   splitOptions.prefer_spill = prefer_spill;
+  splitOptions.buffered_write = true;
   if (buffer_size > 0) {
     splitOptions.buffer_size = buffer_size;
   }
@@ -1393,6 +1396,44 @@ Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeClose(
   JNI_METHOD_END()
 }
 
+JNIEXPORT jbyteArray JNICALL
+Java_com_intel_oap_vectorized_ArrowRowToColumnarJniWrapper_nativeConvertRowToColumnar(
+    JNIEnv* env, jobject, jbyteArray schema_arr, jlongArray row_length,
+    jlong memory_address, jlong memory_pool_id) {
+  if (schema_arr == NULL) {
+    env->ThrowNew(
+        illegal_argument_exception_class,
+        std::string("Native convert row to columnar: schema can't be null").c_str());
+    return NULL;
+  }
+  if (row_length == NULL) {
+    env->ThrowNew(
+        illegal_argument_exception_class,
+        std::string("Native convert row to columnar: buf_addrs can't be null").c_str());
+    return NULL;
+  }
+
+  std::shared_ptr<arrow::Schema> schema;
+  // ValueOrDie in MakeSchema
+  MakeSchema(env, schema_arr, &schema);
+  jlong* in_row_length = env->GetLongArrayElements(row_length, nullptr);
+  uint8_t* address = reinterpret_cast<uint8_t*>(memory_address);
+  auto* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
+  int num_rows = env->GetArrayLength(row_length);
+  int num_columnars = schema->num_fields();
+  std::shared_ptr<arrow::RecordBatch> rb;
+  std::shared_ptr<RowToColumnarConverter> row_to_columnar_converter =
+      std::make_shared<RowToColumnarConverter>(schema, num_columnars, num_rows,
+                                               in_row_length, address, pool);
+  JniAssertOkOrThrow(row_to_columnar_converter->Init(&rb),
+                     "Native convert Row to Columnar Init "
+                     "RowToColumnarConverter failed");
+
+  jbyteArray serialized_record_batch =
+      JniGetOrThrow(ToBytes(env, rb), "Error serializing the converted record batch");
+  return serialized_record_batch;
+}
+
 JNIEXPORT void JNICALL Java_com_intel_oap_tpc_MallocUtils_mallocTrim(JNIEnv* env,
                                                                      jobject obj) {
   JNI_METHOD_START
diff --git a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
new file mode 100644
index 000000000..18928a481
--- /dev/null
+++ b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
@@ -0,0 +1,999 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include "operators/row_to_columnar_converter.h" + +#include +#include + +namespace sparkcolumnarplugin { +namespace rowtocolumnar { + +int64_t CalculateBitSetWidthInBytes(int32_t numFields) { + return ((numFields + 63) / 64) * 8; +} + +int64_t GetFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) { + return nullBitsetWidthInBytes + 8L * index; +} + +bool IsNull(uint8_t* buffer_address, int32_t index) { + int64_t mask = 1L << (index & 0x3f); // mod 64 and shift + int64_t wordOffset = (index >> 6) * 8; + int64_t word; + memcpy(&word, buffer_address + wordOffset, sizeof(int64_t)); + int64_t value = (word & mask); + int64_t thebit = value >> (index & 0x3f); + if (thebit == 1) { + return true; + } else { + return false; + } +} + +int32_t CalculateHeaderPortionInBytes(int32_t num_elements) { + return 8 + ((num_elements + 63) / 64) * 8; +} + +int32_t WordOffset(uint8_t* buffer_address, int32_t index) { + int64_t mask = 1L << (index & 0x3f); // mod 64 and shift + int64_t wordOffset = (index >> 6) * 8; + int64_t word; + memcpy(&word, buffer_address + wordOffset, sizeof(int64_t)); + int64_t value = (word & mask); + int64_t thebit = value >> (index & 0x3f); +} + +arrow::Status CreateArrayData(std::shared_ptr schema, int64_t num_rows, + int32_t columnar_id, int64_t fieldOffset, + std::vector& offsets, uint8_t* memory_address_, + std::shared_ptr* array, + arrow::MemoryPool* pool) { + auto field = schema->field(columnar_id); + auto type = field->type(); + + switch (type->id()) { + case arrow::BooleanType::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows; + out_data.buffers.resize(2); + out_data.type = arrow::TypeTraits::type_singleton(); + + ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], AllocateBitmap(num_rows, pool)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); + auto array_data = out_data.buffers[1]->mutable_data(); + int64_t position = 0; + int64_t null_count = 0; + + auto out_is_valid = out_data.buffers[0]->mutable_data(); + while (position < num_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + } else { + bool value = *(bool*)(memory_address_ + offsets[position] + fieldOffset); + arrow::BitUtil::SetBitTo(array_data, position, value); + arrow::BitUtil::SetBitTo(out_is_valid, position, true); + } + position++; + } + out_data.null_count = null_count; + *array = MakeArray(std::make_shared(std::move(out_data))); + return arrow::Status::OK(); + break; + } + case arrow::Int8Type::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows; + out_data.buffers.resize(2); + out_data.type = arrow::TypeTraits::type_singleton(); + ARROW_ASSIGN_OR_RAISE( + out_data.buffers[1], + AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, + pool)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); + // auto array_data = out_data.buffers[1]->mutable_data(); + auto array_data = + out_data.GetMutableValues::CType>(1); + int64_t position = 0; + int64_t null_count = 0; + auto out_is_valid = out_data.buffers[0]->mutable_data(); + while (position < num_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + array_data[position] = arrow::TypeTraits::CType{}; + } else { + auto value = *(int8_t*)(memory_address_ + offsets[position] + fieldOffset); + array_data[position] = 
value; + arrow::BitUtil::SetBitTo(out_is_valid, position, true); + } + position++; + } + out_data.null_count = null_count; + *array = MakeArray(std::make_shared(std::move(out_data))); + return arrow::Status::OK(); + break; + } + case arrow::Int16Type::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows; + out_data.buffers.resize(2); + out_data.type = arrow::TypeTraits::type_singleton(); + ARROW_ASSIGN_OR_RAISE( + out_data.buffers[1], + AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, + pool)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); + // auto array_data = out_data.buffers[1]->mutable_data(); + auto array_data = + out_data.GetMutableValues::CType>(1); + int64_t position = 0; + int64_t null_count = 0; + auto out_is_valid = out_data.buffers[0]->mutable_data(); + while (position < num_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + array_data[position] = arrow::TypeTraits::CType{}; + } else { + auto value = *(int16_t*)(memory_address_ + offsets[position] + fieldOffset); + array_data[position] = value; + arrow::BitUtil::SetBitTo(out_is_valid, position, true); + } + position++; + } + out_data.null_count = null_count; + *array = MakeArray(std::make_shared(std::move(out_data))); + return arrow::Status::OK(); + break; + } + case arrow::Int32Type::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows; + out_data.buffers.resize(2); + out_data.type = arrow::TypeTraits::type_singleton(); + ARROW_ASSIGN_OR_RAISE( + out_data.buffers[1], + AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, + pool)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); + // auto array_data = out_data.buffers[1]->mutable_data(); + auto array_data = + out_data.GetMutableValues::CType>(1); + int64_t position = 0; + int64_t null_count = 0; + auto out_is_valid = out_data.buffers[0]->mutable_data(); + while (position < num_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + array_data[position] = arrow::TypeTraits::CType{}; + } else { + auto value = *(int32_t*)(memory_address_ + offsets[position] + fieldOffset); + array_data[position] = value; + arrow::BitUtil::SetBitTo(out_is_valid, position, true); + } + position++; + } + out_data.null_count = null_count; + *array = MakeArray(std::make_shared(std::move(out_data))); + break; + } + case arrow::Int64Type::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows; + out_data.buffers.resize(2); + out_data.type = arrow::TypeTraits::type_singleton(); + ARROW_ASSIGN_OR_RAISE( + out_data.buffers[1], + AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, + pool)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); + // auto array_data = out_data.buffers[1]->mutable_data(); + auto array_data = + out_data.GetMutableValues::CType>(1); + int64_t position = 0; + int64_t null_count = 0; + auto out_is_valid = out_data.buffers[0]->mutable_data(); + while (position < num_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + array_data[position] = arrow::TypeTraits::CType{}; + } else { + auto value = *(int64_t*)(memory_address_ + offsets[position] + fieldOffset); + 
array_data[position] = value; + arrow::BitUtil::SetBitTo(out_is_valid, position, true); + } + position++; + } + out_data.null_count = null_count; + *array = MakeArray(std::make_shared(std::move(out_data))); + break; + } + case arrow::FloatType::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows; + out_data.buffers.resize(2); + out_data.type = arrow::TypeTraits::type_singleton(); + ARROW_ASSIGN_OR_RAISE( + out_data.buffers[1], + AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, + pool)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); + // auto array_data = out_data.buffers[1]->mutable_data(); + auto array_data = + out_data.GetMutableValues::CType>(1); + int64_t position = 0; + int64_t null_count = 0; + auto out_is_valid = out_data.buffers[0]->mutable_data(); + while (position < num_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + array_data[position] = arrow::TypeTraits::CType{}; + } else { + auto value = *(float*)(memory_address_ + offsets[position] + fieldOffset); + array_data[position] = value; + arrow::BitUtil::SetBitTo(out_is_valid, position, true); + } + position++; + } + out_data.null_count = null_count; + *array = MakeArray(std::make_shared(std::move(out_data))); + break; + } + case arrow::DoubleType::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows; + out_data.buffers.resize(2); + out_data.type = arrow::TypeTraits::type_singleton(); + ARROW_ASSIGN_OR_RAISE( + out_data.buffers[1], + AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, + pool)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); + // auto array_data = out_data.buffers[1]->mutable_data(); + auto array_data = + out_data.GetMutableValues::CType>(1); + int64_t position = 0; + int64_t null_count = 0; + auto out_is_valid = out_data.buffers[0]->mutable_data(); + while (position < num_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + array_data[position] = arrow::TypeTraits::CType{}; + } else { + auto value = *(double*)(memory_address_ + offsets[position] + fieldOffset); + array_data[position] = value; + arrow::BitUtil::SetBitTo(out_is_valid, position, true); + } + position++; + } + out_data.null_count = null_count; + *array = MakeArray(std::make_shared(std::move(out_data))); + break; + } + case arrow::BinaryType::type_id: { + std::unique_ptr::BuilderType> builder_; + std::unique_ptr array_builder; + arrow::MakeBuilder(pool, arrow::TypeTraits::type_singleton(), + &array_builder); + builder_.reset(arrow::internal::checked_cast< + arrow::TypeTraits::BuilderType*>( + array_builder.release())); + + using offset_type = typename arrow::BinaryType::offset_type; + for (int64_t position = 0; position < num_rows; position++) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + RETURN_NOT_OK(builder_->AppendNull()); + } else { + int64_t offsetAndSize; + memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, + sizeof(int64_t)); + offset_type length = int32_t(offsetAndSize); + int32_t wordoffset = int32_t(offsetAndSize >> 32); + RETURN_NOT_OK( + builder_->Append(memory_address_ + offsets[position] + wordoffset, length)); + } + } + auto status = builder_->Finish(array); + break; + } + case arrow::StringType::type_id: 
{ + std::unique_ptr::BuilderType> builder_; + std::unique_ptr array_builder; + arrow::MakeBuilder(pool, arrow::TypeTraits::type_singleton(), + &array_builder); + builder_.reset(arrow::internal::checked_cast< + arrow::TypeTraits::BuilderType*>( + array_builder.release())); + + using offset_type = typename arrow::StringType::offset_type; + for (int64_t position = 0; position < num_rows; position++) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + RETURN_NOT_OK(builder_->AppendNull()); + } else { + int64_t offsetAndSize; + memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, + sizeof(int64_t)); + offset_type length = int32_t(offsetAndSize); + int32_t wordoffset = int32_t(offsetAndSize >> 32); + RETURN_NOT_OK( + builder_->Append(memory_address_ + offsets[position] + wordoffset, length)); + } + } + auto status = builder_->Finish(array); + break; + } + case arrow::Decimal128Type::type_id: { + auto dtype = std::dynamic_pointer_cast(type); + int32_t precision = dtype->precision(); + int32_t scale = dtype->scale(); + + arrow::ArrayData out_data; + out_data.length = num_rows; + out_data.buffers.resize(2); + out_data.type = arrow::decimal128(precision, scale); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], AllocateBuffer(16 * num_rows, pool)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); + auto array_data = out_data.GetMutableValues(1); + + int64_t position = 0; + int64_t null_count = 0; + auto out_is_valid = out_data.buffers[0]->mutable_data(); + while (position < num_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) { + null_count++; + arrow::BitUtil::SetBitTo(out_is_valid, position, false); + array_data[position] = arrow::Decimal128{}; + } else { + arrow::BitUtil::SetBitTo(out_is_valid, position, true); + if (precision <= 18) { + int64_t low_value; + memcpy(&low_value, memory_address_ + offsets[position] + fieldOffset, 8); + arrow::Decimal128 value = + arrow::Decimal128(arrow::BasicDecimal128(low_value)); + array_data[position] = value; + } else { + int64_t offsetAndSize; + memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset, + sizeof(int64_t)); + int32_t length = int32_t(offsetAndSize); + int32_t wordoffset = int32_t(offsetAndSize >> 32); + uint8_t bytesValue[length]; + memcpy(bytesValue, memory_address_ + offsets[position] + wordoffset, length); + uint8_t bytesValue2[16]{}; + for (int k = length - 1; k >= 0; k--) { + bytesValue2[length - 1 - k] = bytesValue[k]; + } + arrow::Decimal128 value = + arrow::Decimal128(arrow::BasicDecimal128(bytesValue2)); + array_data[position] = value; + } + } + position++; + } + out_data.null_count = null_count; + *array = MakeArray(std::make_shared(std::move(out_data))); + break; + } + case arrow::Date32Type::type_id: { + arrow::ArrayData out_data; + out_data.length = num_rows; + out_data.buffers.resize(2); + out_data.type = arrow::TypeTraits::type_singleton(); + ARROW_ASSIGN_OR_RAISE( + out_data.buffers[1], + AllocateBuffer(sizeof(arrow::TypeTraits::CType) * num_rows, + pool)); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool)); + // auto array_data = out_data.buffers[1]->mutable_data(); + auto array_data = + out_data.GetMutableValues::CType>(1); + int64_t position = 0; + int64_t null_count = 0; + auto out_is_valid = out_data.buffers[0]->mutable_data(); + while (position < num_rows) { + bool is_null = IsNull(memory_address_ + offsets[position], columnar_id); + if (is_null) 
+    case arrow::Decimal128Type::type_id: {
+      auto dtype = std::dynamic_pointer_cast<arrow::Decimal128Type>(type);
+      int32_t precision = dtype->precision();
+      int32_t scale = dtype->scale();
+
+      arrow::ArrayData out_data;
+      out_data.length = num_rows;
+      out_data.buffers.resize(2);
+      out_data.type = arrow::decimal128(precision, scale);
+      ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], AllocateBuffer(16 * num_rows, pool));
+      ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool));
+      auto array_data = out_data.GetMutableValues<arrow::Decimal128>(1);
+
+      int64_t position = 0;
+      int64_t null_count = 0;
+      auto out_is_valid = out_data.buffers[0]->mutable_data();
+      while (position < num_rows) {
+        bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+        if (is_null) {
+          null_count++;
+          arrow::BitUtil::SetBitTo(out_is_valid, position, false);
+          array_data[position] = arrow::Decimal128{};
+        } else {
+          arrow::BitUtil::SetBitTo(out_is_valid, position, true);
+          if (precision <= 18) {
+            int64_t low_value;
+            memcpy(&low_value, memory_address_ + offsets[position] + fieldOffset, 8);
+            arrow::Decimal128 value =
+                arrow::Decimal128(arrow::BasicDecimal128(low_value));
+            array_data[position] = value;
+          } else {
+            int64_t offsetAndSize;
+            memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                   sizeof(int64_t));
+            int32_t length = int32_t(offsetAndSize);
+            int32_t wordoffset = int32_t(offsetAndSize >> 32);
+            uint8_t bytesValue[length];
+            memcpy(bytesValue, memory_address_ + offsets[position] + wordoffset, length);
+            uint8_t bytesValue2[16]{};
+            // Reverse from big-endian (Spark) to little-endian (Arrow).
+            for (int k = length - 1; k >= 0; k--) {
+              bytesValue2[length - 1 - k] = bytesValue[k];
+            }
+            arrow::Decimal128 value =
+                arrow::Decimal128(arrow::BasicDecimal128(bytesValue2));
+            array_data[position] = value;
+          }
+        }
+        position++;
+      }
+      out_data.null_count = null_count;
+      *array = MakeArray(std::make_shared<arrow::ArrayData>(std::move(out_data)));
+      break;
+    }
+    case arrow::Date32Type::type_id: {
+      arrow::ArrayData out_data;
+      out_data.length = num_rows;
+      out_data.buffers.resize(2);
+      out_data.type = arrow::TypeTraits<arrow::Date32Type>::type_singleton();
+      ARROW_ASSIGN_OR_RAISE(
+          out_data.buffers[1],
+          AllocateBuffer(sizeof(arrow::TypeTraits<arrow::Date32Type>::CType) * num_rows,
+                         pool));
+      ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool));
+      auto array_data =
+          out_data.GetMutableValues<arrow::TypeTraits<arrow::Date32Type>::CType>(1);
+      int64_t position = 0;
+      int64_t null_count = 0;
+      auto out_is_valid = out_data.buffers[0]->mutable_data();
+      while (position < num_rows) {
+        bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+        if (is_null) {
+          null_count++;
+          arrow::BitUtil::SetBitTo(out_is_valid, position, false);
+          array_data[position] = arrow::TypeTraits<arrow::Date32Type>::CType{};
+        } else {
+          auto value = *(int32_t*)(memory_address_ + offsets[position] + fieldOffset);
+          array_data[position] = value;
+          arrow::BitUtil::SetBitTo(out_is_valid, position, true);
+        }
+        position++;
+      }
+      out_data.null_count = null_count;
+      *array = MakeArray(std::make_shared<arrow::ArrayData>(std::move(out_data)));
+      break;
+    }
+    case arrow::TimestampType::type_id: {
+      arrow::ArrayData out_data;
+      out_data.length = num_rows;
+      out_data.buffers.resize(2);
+      out_data.type = arrow::int64();
+      ARROW_ASSIGN_OR_RAISE(
+          out_data.buffers[1],
+          AllocateBuffer(
+              sizeof(arrow::TypeTraits<arrow::TimestampType>::CType) * num_rows, pool));
+      ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool));
+      auto array_data =
+          out_data.GetMutableValues<arrow::TypeTraits<arrow::TimestampType>::CType>(1);
+      int64_t position = 0;
+      int64_t null_count = 0;
+      auto out_is_valid = out_data.buffers[0]->mutable_data();
+      while (position < num_rows) {
+        bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+        if (is_null) {
+          null_count++;
+          arrow::BitUtil::SetBitTo(out_is_valid, position, false);
+          array_data[position] = arrow::TypeTraits<arrow::TimestampType>::CType{};
+        } else {
+          auto value = *(int64_t*)(memory_address_ + offsets[position] + fieldOffset);
+          array_data[position] = value;
+          arrow::BitUtil::SetBitTo(out_is_valid, position, true);
+        }
+        position++;
+      }
+      out_data.null_count = null_count;
+      *array = MakeArray(std::make_shared<arrow::ArrayData>(std::move(out_data)));
+      break;
+    }
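+    // Array columns use Spark's UnsafeArrayData layout:
+    //   [numElements: 8 bytes][element null bits, padded to 8-byte words][values]
+    // CalculateHeaderPortionInBytes(n) returns the size of the first two
+    // regions, so the per-element null checks below read the bitset that
+    // starts 8 bytes past the array base, and values start header_in_bytes
+    // past it.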
+    case arrow::ListType::type_id: {
+      auto list_type = std::dynamic_pointer_cast<arrow::ListType>(type);
+      auto child_type = list_type->value_type();
+      switch (child_type->id()) {
+        case arrow::BooleanType::type_id: {
+          arrow::ListBuilder parent_builder(
+              pool, std::make_shared<arrow::BooleanBuilder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::BooleanBuilder& child_builder =
+              *(static_cast<arrow::BooleanBuilder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  bool value = *(bool*)(memory_address_ + offsets[position] +
+                                        wordoffset + header_in_bytes + j * sizeof(bool));
+                  RETURN_NOT_OK(child_builder.Append(value));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::Int8Type::type_id: {
+          arrow::ListBuilder parent_builder(pool,
+                                            std::make_shared<arrow::Int8Builder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::Int8Builder& child_builder =
+              *(static_cast<arrow::Int8Builder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  auto value = *(int8_t*)(memory_address_ + offsets[position] +
+                                          wordoffset + header_in_bytes +
+                                          j * sizeof(int8_t));
+                  RETURN_NOT_OK(child_builder.Append(value));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::Int16Type::type_id: {
+          arrow::ListBuilder parent_builder(pool,
+                                            std::make_shared<arrow::Int16Builder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::Int16Builder& child_builder =
+              *(static_cast<arrow::Int16Builder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  auto value = *(int16_t*)(memory_address_ + offsets[position] +
+                                           wordoffset + header_in_bytes +
+                                           j * sizeof(int16_t));
+                  RETURN_NOT_OK(child_builder.Append(value));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::Int32Type::type_id: {
+          arrow::ListBuilder parent_builder(pool,
+                                            std::make_shared<arrow::Int32Builder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::Int32Builder& child_builder =
+              *(static_cast<arrow::Int32Builder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  auto value = *(int32_t*)(memory_address_ + offsets[position] +
+                                           wordoffset + header_in_bytes +
+                                           j * sizeof(int32_t));
+                  RETURN_NOT_OK(child_builder.Append(value));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::Int64Type::type_id: {
+          arrow::ListBuilder parent_builder(pool,
+                                            std::make_shared<arrow::Int64Builder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::Int64Builder& child_builder =
+              *(static_cast<arrow::Int64Builder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  auto value = *(int64_t*)(memory_address_ + offsets[position] +
+                                           wordoffset + header_in_bytes +
+                                           j * sizeof(int64_t));
+                  RETURN_NOT_OK(child_builder.Append(value));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::FloatType::type_id: {
+          arrow::ListBuilder parent_builder(pool,
+                                            std::make_shared<arrow::FloatBuilder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::FloatBuilder& child_builder =
+              *(static_cast<arrow::FloatBuilder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  auto value = *(float*)(memory_address_ + offsets[position] +
+                                         wordoffset + header_in_bytes +
+                                         j * sizeof(float));
+                  RETURN_NOT_OK(child_builder.Append(value));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::DoubleType::type_id: {
+          arrow::ListBuilder parent_builder(
+              pool, std::make_shared<arrow::DoubleBuilder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::DoubleBuilder& child_builder =
+              *(static_cast<arrow::DoubleBuilder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  auto value = *(double*)(memory_address_ + offsets[position] +
+                                          wordoffset + header_in_bytes +
+                                          j * sizeof(double));
+                  RETURN_NOT_OK(child_builder.Append(value));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::Date32Type::type_id: {
+          arrow::ListBuilder parent_builder(
+              pool, std::make_shared<arrow::Date32Builder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::Date32Builder& child_builder =
+              *(static_cast<arrow::Date32Builder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  auto value = *(int32_t*)(memory_address_ + offsets[position] +
+                                           wordoffset + header_in_bytes +
+                                           j * sizeof(int32_t));
+                  RETURN_NOT_OK(child_builder.Append(value));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::TimestampType::type_id: {
+          arrow::ListBuilder parent_builder(
+              pool, std::make_shared<arrow::TimestampBuilder>(arrow::int64(), pool));
+          // The following builder is owned by parent_builder.
+          arrow::TimestampBuilder& child_builder =
+              *(static_cast<arrow::TimestampBuilder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  auto value = *(int64_t*)(memory_address_ + offsets[position] +
+                                           wordoffset + header_in_bytes +
+                                           j * sizeof(int64_t));
+                  RETURN_NOT_OK(child_builder.Append(value));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
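+        // String and binary elements are themselves variable-length: each
+        // 8-byte slot in the values region is an offset/length word, with the
+        // element offset (upper 32 bits) taken relative to the array base.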
+        case arrow::BinaryType::type_id: {
+          arrow::ListBuilder parent_builder(
+              pool, std::make_shared<arrow::BinaryBuilder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::BinaryBuilder& child_builder =
+              *(static_cast<arrow::BinaryBuilder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              using offset_type = typename arrow::BinaryType::offset_type;
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  int64_t elementOffsetAndSize;
+                  memcpy(&elementOffsetAndSize,
+                         memory_address_ + offsets[position] + wordoffset +
+                             header_in_bytes + 8 * j,
+                         sizeof(int64_t));
+                  offset_type elementLength = int32_t(elementOffsetAndSize);
+                  int32_t elementOffset = int32_t(elementOffsetAndSize >> 32);
+                  RETURN_NOT_OK(child_builder.Append(
+                      memory_address_ + offsets[position] + wordoffset + elementOffset,
+                      elementLength));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::StringType::type_id: {
+          arrow::ListBuilder parent_builder(
+              pool, std::make_shared<arrow::StringBuilder>(pool));
+          // The following builder is owned by parent_builder.
+          arrow::StringBuilder& child_builder =
+              *(static_cast<arrow::StringBuilder*>(parent_builder.value_builder()));
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              using offset_type = typename arrow::StringType::offset_type;
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  int64_t elementOffsetAndSize;
+                  memcpy(&elementOffsetAndSize,
+                         memory_address_ + offsets[position] + wordoffset +
+                             header_in_bytes + 8 * j,
+                         sizeof(int64_t));
+                  offset_type elementLength = int32_t(elementOffsetAndSize);
+                  int32_t elementOffset = int32_t(elementOffsetAndSize >> 32);
+                  RETURN_NOT_OK(child_builder.Append(
+                      memory_address_ + offsets[position] + wordoffset + elementOffset,
+                      elementLength));
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        case arrow::Decimal128Type::type_id: {
+          std::shared_ptr<arrow::Decimal128Type> dtype =
+              std::dynamic_pointer_cast<arrow::Decimal128Type>(child_type);
+          int32_t precision = dtype->precision();
+          int32_t scale = dtype->scale();
+          arrow::ListBuilder parent_builder(
+              pool, std::make_shared<arrow::Decimal128Builder>(dtype, pool));
+          // The following builder is owned by parent_builder.
+          arrow::Decimal128Builder& child_builder =
+              *(static_cast<arrow::Decimal128Builder*>(parent_builder.value_builder()));
+
+          for (int64_t position = 0; position < num_rows; position++) {
+            bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
+            if (is_null) {
+              RETURN_NOT_OK(parent_builder.AppendNull());
+            } else {
+              RETURN_NOT_OK(parent_builder.Append());
+              int64_t offsetAndSize;
+              memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
+                     sizeof(int64_t));
+              int32_t length = int32_t(offsetAndSize);
+              int32_t wordoffset = int32_t(offsetAndSize >> 32);
+              int64_t num_elements =
+                  *(int64_t*)(memory_address_ + offsets[position] + wordoffset);
+              int64_t header_in_bytes = CalculateHeaderPortionInBytes(num_elements);
+              for (auto j = 0; j < num_elements; j++) {
+                bool is_null =
+                    IsNull(memory_address_ + offsets[position] + wordoffset + 8, j);
+                if (is_null) {
+                  RETURN_NOT_OK(child_builder.AppendNull());
+                } else {
+                  if (precision <= 18) {
+                    int64_t low_value;
+                    memcpy(&low_value,
+                           memory_address_ + offsets[position] + wordoffset +
+                               header_in_bytes + 8 * j,
+                           sizeof(int64_t));
+                    auto value = arrow::Decimal128(arrow::BasicDecimal128(low_value));
+                    RETURN_NOT_OK(child_builder.Append(value));
+                  } else {
+                    int64_t elementOffsetAndSize;
+                    memcpy(&elementOffsetAndSize,
+                           memory_address_ + offsets[position] + wordoffset +
+                               header_in_bytes + 8 * j,
+                           sizeof(int64_t));
+                    int32_t elementLength = int32_t(elementOffsetAndSize);
+                    int32_t elementOffset = int32_t(elementOffsetAndSize >> 32);
+                    uint8_t bytesValue[elementLength];
+                    memcpy(bytesValue,
+                           memory_address_ + offsets[position] + wordoffset +
+                               elementOffset,
+                           elementLength);
+                    uint8_t bytesValue2[16]{};
+                    // Reverse from big-endian (Spark) to little-endian (Arrow).
+                    for (int k = elementLength - 1; k >= 0; k--) {
+                      bytesValue2[elementLength - 1 - k] = bytesValue[k];
+                    }
+                    arrow::Decimal128 value =
+                        arrow::Decimal128(arrow::BasicDecimal128(bytesValue2));
+                    RETURN_NOT_OK(child_builder.Append(value));
+                  }
+                }
+              }
+            }
+          }
+          ARROW_RETURN_NOT_OK(parent_builder.Finish(array));
+          break;
+        }
+        default:
+          return arrow::Status::Invalid("Unsupported data type: " +
+                                        child_type->ToString());
+      }
+      break;
+    }
+    default:
+      return arrow::Status::Invalid("Unsupported data type: " + type->ToString());
+  }
+  return arrow::Status::OK();
+}
+
+arrow::Status RowToColumnarConverter::Init(std::shared_ptr<arrow::RecordBatch>* batch) {
+  int64_t nullBitsetWidthInBytes = CalculateBitSetWidthInBytes(num_cols_);
+  for (auto i = 0; i < num_rows_; i++) {
+    offsets_.push_back(0);
+  }
+  for (auto i = 1; i < num_rows_; i++) {
+    offsets_[i] = offsets_[i - 1] + row_length_[i - 1];
+  }
+  std::vector<std::shared_ptr<arrow::Array>> arrays;
+  auto num_fields = schema_->num_fields();
+
+  for (auto i = 0; i < num_fields; i++) {
+    auto field = schema_->field(i);
+    std::shared_ptr<arrow::Array> array_data;
+    int64_t field_offset = GetFieldOffset(nullBitsetWidthInBytes, i);
+    RETURN_NOT_OK(CreateArrayData(schema_, num_rows_, i, field_offset, offsets_,
+                                  memory_address_, &array_data, m_pool_));
+    arrays.push_back(array_data);
+  }
+  *batch = arrow::RecordBatch::Make(schema_, num_rows_, arrays);
+  return arrow::Status::OK();
+}
+
+}  // namespace rowtocolumnar
+}  // namespace sparkcolumnarplugin
\ No newline at end of file
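For reference, a minimal sketch (not part of this patch) of how the converter can be driven from the JNI layer. `ConvertRows`, `lengths`, and `row_buffer` are illustrative names; the constructor and Init() signatures are those declared in row_to_columnar_converter.h below.

#include <memory>
#include <vector>
#include <arrow/record_batch.h>
#include "operators/row_to_columnar_converter.h"

arrow::Status ConvertRows(std::shared_ptr<arrow::Schema> schema,
                          std::vector<int64_t>& lengths, uint8_t* row_buffer,
                          arrow::MemoryPool* pool,
                          std::shared_ptr<arrow::RecordBatch>* out) {
  // lengths[i] is the byte length of the i-th UnsafeRow; the rows are laid
  // out back to back starting at row_buffer.
  sparkcolumnarplugin::rowtocolumnar::RowToColumnarConverter converter(
      schema, schema->num_fields(), lengths.size(), lengths.data(), row_buffer, pool);
  // Init() turns the row lengths into per-row offsets, builds one Arrow array
  // per field, and assembles the arrays into a RecordBatch.
  return converter.Init(out);
}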
diff --git a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.h b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.h
new file mode 100644
index 000000000..76ea7e3b7
--- /dev/null
+++ b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <arrow/array.h>
+#include <arrow/buffer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/record_batch.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <arrow/type_traits.h>
+
+#include "gandiva/decimal_type_util.h"
+
+namespace sparkcolumnarplugin {
+namespace rowtocolumnar {
+
+class RowToColumnarConverter {
+ public:
+  RowToColumnarConverter(std::shared_ptr<arrow::Schema> schema, int64_t num_cols,
+                         int64_t num_rows, int64_t* row_length, uint8_t* memory_address,
+                         arrow::MemoryPool* memory_pool)
+      : schema_(schema),
+        num_cols_(num_cols),
+        num_rows_(num_rows),
+        row_length_(row_length),
+        memory_address_(memory_address),
+        m_pool_(memory_pool) {}
+
+  arrow::Status Init(std::shared_ptr<arrow::RecordBatch>* batch);
+
+ protected:
+  std::shared_ptr<arrow::Schema> schema_;
+  int64_t num_cols_;
+  int64_t num_rows_;
+  int64_t* row_length_;
+  uint8_t* memory_address_;
+  std::vector<int64_t> offsets_;
+  arrow::MemoryPool* m_pool_ = arrow::default_memory_pool();
+};
+
+}  // namespace rowtocolumnar
+}  // namespace sparkcolumnarplugin
diff --git a/native-sql-engine/cpp/src/precompile/gandiva.h b/native-sql-engine/cpp/src/precompile/gandiva.h
index d722f799d..a3d5e53a5 100644
--- a/native-sql-engine/cpp/src/precompile/gandiva.h
+++ b/native-sql-engine/cpp/src/precompile/gandiva.h
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include <unordered_map>
 
 #include "third_party/gandiva/decimal_ops.h"
 #include "third_party/gandiva/types.h"
@@ -314,4 +315,30 @@ bool like(const std::string& data, const std::string& pattern) {
   std::string pcre_pattern = SqlLikePatternToPcre(pattern, 0);
   RE2 regex(pcre_pattern);
   return RE2::FullMatch(data, regex);
-}
\ No newline at end of file
+}
+
+const std::string translate(const std::string text, const std::string matching_str,
+                            const std::string replace_str) {
+  char res[text.length()];
+  std::unordered_map<char, char> replace_map;
+  for (size_t i = 0; i < matching_str.length(); i++) {
+    if (i >= replace_str.length()) {
+      // Characters in matching_str without a counterpart in replace_str are
+      // marked for deletion.
+      replace_map[matching_str[i]] = '\0';
+    } else {
+      replace_map[matching_str[i]] = replace_str[i];
+    }
+  }
+  int j = 0;
+  for (size_t i = 0; i < text.length(); i++) {
+    if (replace_map.find(text[i]) == replace_map.end()) {
+      res[j++] = text[i];
+      continue;
+    }
+    char replace_char = replace_map[text[i]];
+    if (replace_char != '\0') {
+      res[j++] = replace_char;
+    }
+  }
+  int out_len = j;
+  return std::string(res, out_len);
+}
\ No newline at end of file
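To make the replacement rule concrete, two illustrative calls. The expected values follow directly from the implementation above; translate_examples is a made-up helper, not part of the patch.

#include <cassert>
// assumes precompile/gandiva.h, which defines translate(), is in scope

void translate_examples() {
  // One-to-one mapping: 'a' -> '1', 'b' -> '2', 'c' -> '3'.
  assert(translate("AaBbCc", "abc", "123") == "A1B2C3");
  // matching_str longer than replace_str: 'b' and 'c' have no counterpart,
  // map to '\0', and are deleted from the output.
  assert(translate("AaBbCc", "abc", "1") == "A1BC");
}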
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc
index d58f0605d..fa99532ac 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.cc
+++ b/native-sql-engine/cpp/src/shuffle/splitter.cc
@@ -433,8 +433,15 @@ arrow::Status Splitter::Split(const arrow::RecordBatch& rb) {
 arrow::Status Splitter::Stop() {
   EVAL_START("write", options_.thread_id)
   // open data file output stream
-  ARROW_ASSIGN_OR_RAISE(data_file_os_,
+  std::shared_ptr<arrow::io::FileOutputStream> fout;
+  ARROW_ASSIGN_OR_RAISE(fout,
                         arrow::io::FileOutputStream::Open(options_.data_file, true));
+  if (options_.buffered_write) {
+    ARROW_ASSIGN_OR_RAISE(data_file_os_,
+                          arrow::io::BufferedOutputStream::Create(
+                              16384, options_.memory_pool, fout));
+  } else {
+    data_file_os_ = fout;
+  }
 
   // stop PartitionWriter and collect metrics
   for (auto pid = 0; pid < num_partitions_; ++pid) {
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h
index dbf07aa87..501aa6f10 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.h
+++ b/native-sql-engine/cpp/src/shuffle/splitter.h
@@ -230,7 +230,7 @@ class Splitter {
   std::vector<int32_t> sub_dir_selection_;
   std::vector<std::string> configured_dirs_;
 
-  std::shared_ptr<arrow::io::FileOutputStream> data_file_os_;
+  std::shared_ptr<arrow::io::OutputStream> data_file_os_;  // shared by all partition writers
 
   std::shared_ptr<arrow::ipc::IpcPayload> schema_payload_;
diff --git a/native-sql-engine/cpp/src/shuffle/type.h b/native-sql-engine/cpp/src/shuffle/type.h
index e73974243..77d254670 100644
--- a/native-sql-engine/cpp/src/shuffle/type.h
+++ b/native-sql-engine/cpp/src/shuffle/type.h
@@ -44,6 +44,7 @@ struct SplitOptions {
   arrow::Compression::type compression_type = arrow::Compression::UNCOMPRESSED;
   bool prefer_spill = true;
   bool write_schema = true;
+  bool buffered_write = false;
 
   std::string data_file;
diff --git a/pom.xml b/pom.xml
index 23f351ce5..c869a42cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,17 +53,17 @@
     <profile>
       <id>hadoop-2.7.4</id>
-      <activation>
-        <property>
-          <name>!hadoop.version</name>
-        </property>
-      </activation>
       <properties>
         <hadoop.version>2.7.4</hadoop.version>
       </properties>
     </profile>
     <profile>
      <id>hadoop-3.2</id>
+      <activation>
+        <property>
+          <name>!hadoop.version</name>
+        </property>
+      </activation>
       <properties>
         <hadoop.version>3.2.0</hadoop.version>
       </properties>
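The buffered_write path added in splitter.cc can be read in isolation as the sketch below; OpenDataFile is an illustrative name, while FileOutputStream::Open and BufferedOutputStream::Create are the same Arrow C++ APIs the patch uses. Wrapping the file in a 16 KB BufferedOutputStream lets the many small per-partition writes issued in Splitter::Stop() coalesce into fewer write syscalls, and it is also why data_file_os_ in splitter.h is widened to the base arrow::io::OutputStream type, so it can hold either stream.

#include <arrow/io/buffered.h>
#include <arrow/io/file.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>

// Open the shuffle data file, optionally wrapping it in a 16 KB buffered
// stream; 16384 matches the capacity hard-coded in the patch.
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenDataFile(
    const std::string& path, bool buffered_write, arrow::MemoryPool* pool) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::FileOutputStream> fout,
                        arrow::io::FileOutputStream::Open(path, /*append=*/true));
  if (!buffered_write) {
    return std::shared_ptr<arrow::io::OutputStream>(fout);
  }
  // BufferedOutputStream accumulates small writes in its buffer before
  // flushing to the underlying file.
  ARROW_ASSIGN_OR_RAISE(auto buffered,
                        arrow::io::BufferedOutputStream::Create(16384, pool, fout));
  return std::shared_ptr<arrow::io::OutputStream>(buffered);
}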