From 8601dc8627aaba4f827565c7f56d46b1993f0875 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Thu, 9 Jun 2022 17:04:41 +0800 Subject: [PATCH 1/4] implement concat_ws Signed-off-by: Yuan Zhou --- .../expression/ColumnarConcatOperator.scala | 67 +++++++++++++++++++ .../ColumnarExpressionConverter.scala | 10 +++ 2 files changed, 77 insertions(+) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConcatOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConcatOperator.scala index 25f0155df..1282492a1 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConcatOperator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConcatOperator.scala @@ -29,6 +29,67 @@ import org.apache.spark.sql.types._ import scala.collection.mutable.ListBuffer +class ColumnarConcatWs(exps: Seq[Expression], original: Expression) + extends ConcatWs(exps: Seq[Expression]) + with ColumnarExpression + with Logging { + + buildCheck() + + def buildCheck(): Unit = { + exps.foreach(expr => + if (expr.dataType != StringType) { + throw new UnsupportedOperationException( + s"${expr.dataType} is not supported in ColumnarConcatWS") + }) + } + + override def supportColumnarCodegen(args: java.lang.Object): Boolean = { + false + } + + override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = { + val iter: Iterator[Expression] = exps.iterator + val exp = iter.next() + val iterFaster: Iterator[Expression] = exps.iterator + iterFaster.next() + iterFaster.next() + + val (exp_node, expType): (TreeNode, ArrowType) = + exp.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + + val resultType = new ArrowType.Utf8() + //concat_ws is null senstive + val funcNode = TreeBuilder.makeFunction("concat", + Lists.newArrayList(exp_node, rightNode(args, exp, exps, iter, iterFaster)), resultType) + (funcNode, expType) + } + + def rightNode(args: java.lang.Object, head: Expression, exps: Seq[Expression], + iter: Iterator[Expression], iterFaster: Iterator[Expression]): TreeNode = { + if (!iterFaster.hasNext) { + // When iter reaches the last but one expression + val (exp_node, expType): (TreeNode, ArrowType) = + exps.last.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val (head_node, headType): (TreeNode, ArrowType) = + head.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val resultType = new ArrowType.Utf8() + val funcNode = TreeBuilder.makeFunction("concat", + Lists.newArrayList(head_node, exp_node), resultType) + funcNode + } else { + val exp = iter.next() + iterFaster.next() + val (exp_node, expType): (TreeNode, ArrowType) = + exp.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val resultType = new ArrowType.Utf8() + val funcNode = TreeBuilder.makeFunction("concat", + Lists.newArrayList(exp_node, rightNode(args, head, exps, iter, iterFaster)), resultType) + funcNode + } + } +} + class ColumnarConcat(exps: Seq[Expression], original: Expression) extends Concat(exps: Seq[Expression]) with ColumnarExpression @@ -44,6 +105,10 @@ class ColumnarConcat(exps: Seq[Expression], original: Expression) }) } + override def supportColumnarCodegen(args: java.lang.Object): Boolean = { + false + } + override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = { val iter: Iterator[Expression] = exps.iterator val exp = iter.next() @@ -85,6 +150,8 @@ object ColumnarConcatOperator { def create(exps: Seq[Expression], original: Expression): Expression = original match { case c: Concat => new ColumnarConcat(exps, original) + case cws: ConcatWs => + new ColumnarConcatWs(exps, original) case other => throw new UnsupportedOperationException(s"not currently supported: $other.") } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala index a9832f2c1..9f3385ddb 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala @@ -375,6 +375,16 @@ object ColumnarExpressionConverter extends Logging { convertBoundRefToAttrRef = convertBoundRefToAttrRef) } ColumnarConcatOperator.create(exps, expr) + case cws: ConcatWs => + check_if_no_calculation = false + logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.") + val exps = cws.children.map { expr => + replaceWithColumnarExpression( + expr, + attributeSeq, + convertBoundRefToAttrRef = convertBoundRefToAttrRef) + } + ColumnarConcatOperator.create(exps, expr) case r: Round => check_if_no_calculation = false logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.") From 761db9fb555674b78bc3763af69759f05814603b Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 13 Jun 2022 18:07:57 +0800 Subject: [PATCH 2/4] fix logic Signed-off-by: Yuan Zhou --- .../expression/ColumnarConcatOperator.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConcatOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConcatOperator.scala index 1282492a1..74119065b 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConcatOperator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConcatOperator.scala @@ -50,33 +50,31 @@ class ColumnarConcatWs(exps: Seq[Expression], original: Expression) override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = { val iter: Iterator[Expression] = exps.iterator - val exp = iter.next() + val exp = iter.next() // spliter + val exp1 = iter.next() val iterFaster: Iterator[Expression] = exps.iterator iterFaster.next() iterFaster.next() + iterFaster.next() - val (exp_node, expType): (TreeNode, ArrowType) = + val (split_node, expType): (TreeNode, ArrowType) = exp.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val (exp1_node, exp1Type): (TreeNode, ArrowType) = + exp1.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) val resultType = new ArrowType.Utf8() - //concat_ws is null senstive val funcNode = TreeBuilder.makeFunction("concat", - Lists.newArrayList(exp_node, rightNode(args, exp, exps, iter, iterFaster)), resultType) + Lists.newArrayList(exp1_node, split_node, rightNode(args, exps, split_node, iter, iterFaster)), resultType) (funcNode, expType) } - def rightNode(args: java.lang.Object, head: Expression, exps: Seq[Expression], + def rightNode(args: java.lang.Object, exps: Seq[Expression], split_node: TreeNode, iter: Iterator[Expression], iterFaster: Iterator[Expression]): TreeNode = { if (!iterFaster.hasNext) { // When iter reaches the last but one expression val (exp_node, expType): (TreeNode, ArrowType) = exps.last.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) - val (head_node, headType): (TreeNode, ArrowType) = - head.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) - val resultType = new ArrowType.Utf8() - val funcNode = TreeBuilder.makeFunction("concat", - Lists.newArrayList(head_node, exp_node), resultType) - funcNode + exp_node } else { val exp = iter.next() iterFaster.next() @@ -84,7 +82,7 @@ class ColumnarConcatWs(exps: Seq[Expression], original: Expression) exp.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) val resultType = new ArrowType.Utf8() val funcNode = TreeBuilder.makeFunction("concat", - Lists.newArrayList(exp_node, rightNode(args, head, exps, iter, iterFaster)), resultType) + Lists.newArrayList(exp_node, split_node, rightNode(args, exps, split_node, iter, iterFaster)), resultType) funcNode } } From 8ad8949e5583d2a043733011095f335892efadf1 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 13 Jun 2022 18:08:31 +0800 Subject: [PATCH 3/4] check arrow Signed-off-by: Yuan Zhou --- arrow-data-source/script/build_arrow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-data-source/script/build_arrow.sh b/arrow-data-source/script/build_arrow.sh index d8ec40128..68e78837e 100755 --- a/arrow-data-source/script/build_arrow.sh +++ b/arrow-data-source/script/build_arrow.sh @@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}" echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}" mkdir -p $ARROW_SOURCE_DIR mkdir -p $ARROW_INSTALL_DIR -git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR +git clone https://github.com/zhouyuan/arrow.git --branch wip_lpad $ARROW_SOURCE_DIR pushd $ARROW_SOURCE_DIR cmake ./cpp \ From b79d9ffa053213103be403188ea60a10230628b6 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Tue, 14 Jun 2022 23:35:44 +0800 Subject: [PATCH 4/4] Revert "check arrow" This reverts commit 8ad8949e5583d2a043733011095f335892efadf1. --- arrow-data-source/script/build_arrow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-data-source/script/build_arrow.sh b/arrow-data-source/script/build_arrow.sh index 68e78837e..d8ec40128 100755 --- a/arrow-data-source/script/build_arrow.sh +++ b/arrow-data-source/script/build_arrow.sh @@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}" echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}" mkdir -p $ARROW_SOURCE_DIR mkdir -p $ARROW_INSTALL_DIR -git clone https://github.com/zhouyuan/arrow.git --branch wip_lpad $ARROW_SOURCE_DIR +git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR pushd $ARROW_SOURCE_DIR cmake ./cpp \