Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-955] implement lpad/rpad #964

Merged
merged 6 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,15 @@ case class ColumnarSortMergeJoinExec(
// build check for condition
val conditionExpr: Expression = condition.orNull
// Validate the optional join condition. The conversion itself acts as a build
// check, so it is performed exactly once and its result is reused for the
// codegen-support test below.
if (conditionExpr != null) {
  val columnarConditionExpr =
    ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr)
  val supportCodegen =
    columnarConditionExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
  // Columnar SMJ only has codegen version of implementation.
  if (!supportCodegen) {
    throw new UnsupportedOperationException(
      "Condition expression is not fully supporting codegen!")
  }
}
// build check types
for (attr <- left.output) {
Expand All @@ -372,12 +380,24 @@ case class ColumnarSortMergeJoinExec(
// build check for expr
// Validate the join keys of both sides, left keys first and then right keys.
// A side whose key list is null is skipped. Every key is converted exactly
// once; the converted expression must report codegen support, otherwise the
// check fails with a message that names the join key (not the join condition)
// as the offending part.
for {
  keys <- Seq(leftKeys, rightKeys) if keys != null
  expr <- keys
} {
  val columnarExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
  val supportCodegen =
    columnarExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
  if (!supportCodegen) {
    throw new UnsupportedOperationException(
      "Join key expression is not fully supporting codegen!")
  }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,26 @@ object ColumnarExpressionConverter extends Logging {
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
expr
)
// StringLPad: convert the str, len and pad children recursively (forwarding
// attributeSeq and convertBoundRefToAttrRef), then hand the three converted
// children plus the original expression to ColumnarTernaryOperator.create.
case slpad: StringLPad =>
ColumnarTernaryOperator.create(
replaceWithColumnarExpression(slpad.str, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
replaceWithColumnarExpression(slpad.len, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
replaceWithColumnarExpression(slpad.pad, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
expr
)
// StringRPad: same treatment as StringLPad above, applied to the right-pad
// expression's str, len and pad children.
case srpad: StringRPad =>
ColumnarTernaryOperator.create(
replaceWithColumnarExpression(srpad.str, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
replaceWithColumnarExpression(srpad.len, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
replaceWithColumnarExpression(srpad.pad, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
expr
)
case sr: StringReplace =>
check_if_no_calculation = false
logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.")
Expand Down Expand Up @@ -531,6 +551,10 @@ object ColumnarExpressionConverter extends Logging {
containsSubquery(sr.replaceExpr)
case conv: Conv =>
conv.children.map(containsSubquery).exists(_ == true)
// StringLPad: containsSubquery is applied to every child first (map is not
// short-circuited); the case yields true if any of those results is true.
case lpad: StringLPad =>
lpad.children.map(containsSubquery).exists(_ == true)
// StringRPad: same rule as StringLPad, applied to the right-pad children.
case rpad: StringRPad =>
rpad.children.map(containsSubquery).exists(_ == true)
case expr: ScalaUDF if (expr.udfName match {
case Some(name) =>
ColumnarUDF.isSupportedUDF(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class ColumnarIn(value: Expression, list: Seq[Expression], original: Expression)
throw new UnsupportedOperationException(
s"${value.dataType} is not supported in ColumnarIn.")
}
// Every element of the IN list has to be a Literal; an empty list passes.
if (!list.forall(_.isInstanceOf[Literal])) {
  throw new UnsupportedOperationException("Only Literal Type is supported for the input list!")
}
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,76 @@ class ColumnarRegExpExtract(subject: Expression, regexp: Expression, idx: Expres
}
}

/**
 * Columnar variant of [[StringLPad]] that emits a function node named "lpad".
 *
 * Construction runs [[buildCheck]] immediately, so an unsupported input is
 * rejected as soon as the expression is created.
 *
 * @param str      expression producing the string to pad; must be of StringType
 * @param len      expression producing the target length
 * @param pad      expression producing the padding; must be a Literal
 * @param original the expression this columnar node was created from
 */
class ColumnarStringLPad(str: Expression, len: Expression, pad: Expression,
    original: Expression) extends StringLPad(str, len, pad) with ColumnarExpression {

  buildCheck

  /**
   * Validates the inputs of this expression.
   *
   * @throws RuntimeException if `str` is not of StringType
   * @throws UnsupportedOperationException if `pad` is not a Literal
   */
  def buildCheck: Unit = {
    val supportedType = List(StringType)
    if (!supportedType.contains(str.dataType)) {
      throw new RuntimeException("Only string type is expected!")
    }

    if (!pad.isInstanceOf[Literal]) {
      // The message names this class and the offending argument.
      throw new UnsupportedOperationException(
        "Only literal pad is supported in ColumnarStringLPad by now!")
    }
  }

  /** Always false: this expression reports no columnar codegen support. */
  override def supportColumnarCodegen(args: java.lang.Object): Boolean = {
    false
  }

  /**
   * Generates the nodes of the three children and wraps them in an "lpad"
   * function node.
   *
   * @param args passed through unchanged to each child's doColumnarCodeGen
   * @return the function node together with its Utf8 result type
   */
  override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
    // Only the child nodes are needed; the children's result types are ignored.
    val (str_node, _): (TreeNode, ArrowType) =
      str.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (len_node, _): (TreeNode, ArrowType) =
      len.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (pad_node, _): (TreeNode, ArrowType) =
      pad.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val resultType = new ArrowType.Utf8()
    (TreeBuilder.makeFunction("lpad",
      Lists.newArrayList(str_node, len_node, pad_node), resultType), resultType)
  }
}

/**
 * Columnar variant of [[StringRPad]] that emits a function node named "rpad".
 *
 * Construction runs [[buildCheck]] immediately, so an unsupported input is
 * rejected as soon as the expression is created.
 *
 * @param str      expression producing the string to pad; must be of StringType
 * @param len      expression producing the target length
 * @param pad      expression producing the padding; must be a Literal
 * @param original the expression this columnar node was created from
 */
class ColumnarStringRPad(str: Expression, len: Expression, pad: Expression,
    original: Expression) extends StringRPad(str, len, pad) with ColumnarExpression {

  buildCheck

  /**
   * Validates the inputs of this expression.
   *
   * @throws RuntimeException if `str` is not of StringType
   * @throws UnsupportedOperationException if `pad` is not a Literal
   */
  def buildCheck: Unit = {
    val supportedType = List(StringType)
    if (!supportedType.contains(str.dataType)) {
      throw new RuntimeException("Only string type is expected!")
    }

    if (!pad.isInstanceOf[Literal]) {
      // The message names this class and the offending argument.
      throw new UnsupportedOperationException(
        "Only literal pad is supported in ColumnarStringRPad by now!")
    }
  }

  /** Always false: this expression reports no columnar codegen support. */
  override def supportColumnarCodegen(args: java.lang.Object): Boolean = {
    false
  }

  /**
   * Generates the nodes of the three children and wraps them in an "rpad"
   * function node.
   *
   * @param args passed through unchanged to each child's doColumnarCodeGen
   * @return the function node together with its Utf8 result type
   */
  override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
    // Only the child nodes are needed; the children's result types are ignored.
    val (str_node, _): (TreeNode, ArrowType) =
      str.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (len_node, _): (TreeNode, ArrowType) =
      len.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (pad_node, _): (TreeNode, ArrowType) =
      pad.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val resultType = new ArrowType.Utf8()
    (TreeBuilder.makeFunction("rpad",
      Lists.newArrayList(str_node, len_node, pad_node), resultType), resultType)
  }
}

class ColumnarSubstringIndex(strExpr: Expression, delimExpr: Expression,
countExpr: Expression, original: Expression)
extends SubstringIndex(strExpr, delimExpr, countExpr) with ColumnarExpression {
Expand Down Expand Up @@ -310,6 +380,10 @@ object ColumnarTernaryOperator {
new ColumnarStringLocate(src, arg1, arg2, sl)
case re: RegExpExtract =>
new ColumnarRegExpExtract(src, arg1, arg2, re)
// Left pad: wrap the three converted children and keep the original expression.
case lpad: StringLPad =>
  new ColumnarStringLPad(src, arg1, arg2, lpad)
// Right pad: same construction with the right-pad columnar class.
case rpad: StringRPad =>
  new ColumnarStringRPad(src, arg1, arg2, rpad)
case substrIndex: SubstringIndex =>
new ColumnarSubstringIndex(src, arg1, arg2, substrIndex)
case _: StringReplace =>
Expand Down