Skip to content

Commit

Permalink
move strpos, substr functions to datafusion_functions (#9849)
Browse files Browse the repository at this point in the history
* Fix to_timestamp benchmark

* Remove reference to simd and nightly build as simd is no longer an available feature in DataFusion and building with nightly may not be a good recommendation when getting started.

* Fixed missing trim() function.

* Create unicode module in datafusion/functions/src/unicode and unicode_expressions feature flag, move char_length function

* move Left, Lpad, Reverse, Right, Rpad functions to datafusion_functions

* move strpos, substr functions to datafusion_functions

* Cleanup tests
  • Loading branch information
Omega359 authored Mar 29, 2024
1 parent 3eeb108 commit c2879f5
Show file tree
Hide file tree
Showing 18 changed files with 598 additions and 452 deletions.
36 changes: 5 additions & 31 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ pub enum BuiltinScalarFunction {
InitCap,
/// random
Random,
/// strpos
Strpos,
/// substr
Substr,
/// translate
Translate,
/// substr_index
Expand Down Expand Up @@ -211,8 +207,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::EndsWith => Volatility::Immutable,
BuiltinScalarFunction::InitCap => Volatility::Immutable,
BuiltinScalarFunction::Radians => Volatility::Immutable,
BuiltinScalarFunction::Strpos => Volatility::Immutable,
BuiltinScalarFunction::Substr => Volatility::Immutable,
BuiltinScalarFunction::Translate => Volatility::Immutable,
BuiltinScalarFunction::SubstrIndex => Volatility::Immutable,
BuiltinScalarFunction::FindInSet => Volatility::Immutable,
Expand Down Expand Up @@ -252,12 +246,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Pi => Ok(Float64),
BuiltinScalarFunction::Random => Ok(Float64),
BuiltinScalarFunction::EndsWith => Ok(Boolean),
BuiltinScalarFunction::Strpos => {
utf8_to_int_type(&input_expr_types[0], "strpos/instr/position")
}
BuiltinScalarFunction::Substr => {
utf8_to_str_type(&input_expr_types[0], "substr")
}
BuiltinScalarFunction::SubstrIndex => {
utf8_to_str_type(&input_expr_types[0], "substr_index")
}
Expand Down Expand Up @@ -341,24 +329,12 @@ impl BuiltinScalarFunction {
Signature::uniform(1, vec![Utf8, LargeUtf8], self.volatility())
}

BuiltinScalarFunction::EndsWith | BuiltinScalarFunction::Strpos => {
Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Exact(vec![Utf8, LargeUtf8]),
Exact(vec![LargeUtf8, Utf8]),
Exact(vec![LargeUtf8, LargeUtf8]),
],
self.volatility(),
)
}

BuiltinScalarFunction::Substr => Signature::one_of(
BuiltinScalarFunction::EndsWith => Signature::one_of(
vec![
Exact(vec![Utf8, Int64]),
Exact(vec![LargeUtf8, Int64]),
Exact(vec![Utf8, Int64, Int64]),
Exact(vec![LargeUtf8, Int64, Int64]),
Exact(vec![Utf8, Utf8]),
Exact(vec![Utf8, LargeUtf8]),
Exact(vec![LargeUtf8, Utf8]),
Exact(vec![LargeUtf8, LargeUtf8]),
],
self.volatility(),
),
Expand Down Expand Up @@ -537,8 +513,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ConcatWithSeparator => &["concat_ws"],
BuiltinScalarFunction::EndsWith => &["ends_with"],
BuiltinScalarFunction::InitCap => &["initcap"],
BuiltinScalarFunction::Strpos => &["strpos", "instr", "position"],
BuiltinScalarFunction::Substr => &["substr"],
BuiltinScalarFunction::Translate => &["translate"],
BuiltinScalarFunction::SubstrIndex => &["substr_index", "substring_index"],
BuiltinScalarFunction::FindInSet => &["find_in_set"],
Expand Down
6 changes: 0 additions & 6 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,9 +579,6 @@ scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");

scalar_expr!(InitCap, initcap, string, "converts the first letter of each word in `string` in uppercase and the remaining characters in lowercase");
scalar_expr!(EndsWith, ends_with, string suffix, "whether the `string` ends with the `suffix`");
scalar_expr!(Strpos, strpos, string substring, "finds the position from where the `substring` matches the `string`");
scalar_expr!(Substr, substr, string position, "substring from the `position` to the end");
scalar_expr!(Substr, substring, string position length, "substring from the `position` with `length` characters");
scalar_expr!(Translate, translate, string from to, "replaces the characters in `from` with the counterpart in `to`");
nary_scalar_expr!(Coalesce, coalesce, "returns `coalesce(args...)`, which evaluates to the value of the first [Expr] which is not NULL");
// A function named `concat_ws` already exists, so `concat_ws_expr` is used as the name here.
Expand Down Expand Up @@ -1015,9 +1012,6 @@ mod test {
test_scalar_expr!(Lcm, lcm, arg_1, arg_2);
test_scalar_expr!(InitCap, initcap, string);
test_scalar_expr!(EndsWith, ends_with, string, characters);
test_scalar_expr!(Strpos, strpos, string, substring);
test_scalar_expr!(Substr, substr, string, position);
test_scalar_expr!(Substr, substring, string, position, count);
test_scalar_expr!(Translate, translate, string, from, to);
test_scalar_expr!(SubstrIndex, substr_index, string, delimiter, count);
test_scalar_expr!(FindInSet, find_in_set, string, stringlist);
Expand Down
31 changes: 31 additions & 0 deletions datafusion/functions/src/unicode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ mod lpad;
mod reverse;
mod right;
mod rpad;
mod strpos;
mod substr;

// create UDFs
make_udf_function!(
Expand All @@ -39,6 +41,8 @@ make_udf_function!(lpad::LPadFunc, LPAD, lpad);
make_udf_function!(right::RightFunc, RIGHT, right);
make_udf_function!(reverse::ReverseFunc, REVERSE, reverse);
make_udf_function!(rpad::RPadFunc, RPAD, rpad);
make_udf_function!(strpos::StrposFunc, STRPOS, strpos);
make_udf_function!(substr::SubstrFunc, SUBSTR, substr);

pub mod expr_fn {
use datafusion_expr::Expr;
Expand All @@ -53,6 +57,11 @@ pub mod expr_fn {
super::character_length().call(vec![string])
}

#[doc = "finds the position from where the `substring` matches the `string`"]
pub fn instr(string: Expr, substring: Expr) -> Expr {
strpos(string, substring)
}

#[doc = "the number of characters in the `string`"]
pub fn length(string: Expr) -> Expr {
character_length(string)
Expand All @@ -68,6 +77,11 @@ pub mod expr_fn {
super::lpad().call(args)
}

#[doc = "finds the position from where the `substring` matches the `string`"]
pub fn position(string: Expr, substring: Expr) -> Expr {
strpos(string, substring)
}

#[doc = "reverses the `string`"]
pub fn reverse(string: Expr) -> Expr {
super::reverse().call(vec![string])
Expand All @@ -82,6 +96,21 @@ pub mod expr_fn {
pub fn rpad(args: Vec<Expr>) -> Expr {
super::rpad().call(args)
}

#[doc = "finds the position at which the `substring` matches within the `string`"]
pub fn strpos(string: Expr, substring: Expr) -> Expr {
    let args = vec![string, substring];
    super::strpos().call(args)
}

#[doc = "substring of `string` from the `position` to the end"]
pub fn substr(string: Expr, position: Expr) -> Expr {
    // Two-argument form: no explicit length is passed to the UDF.
    let args = vec![string, position];
    super::substr().call(args)
}

#[doc = "substring of `string` from the `position` with `length` characters"]
pub fn substring(string: Expr, position: Expr, length: Expr) -> Expr {
    // Three-argument form of the same `substr` UDF.
    let args = vec![string, position, length];
    super::substr().call(args)
}
}

/// Return a list of all functions in this package
Expand All @@ -93,5 +122,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
reverse(),
right(),
rpad(),
strpos(),
substr(),
]
}
121 changes: 121 additions & 0 deletions datafusion/functions/src/unicode/strpos.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

use arrow::array::{
ArrayRef, ArrowPrimitiveType, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
};
use arrow::datatypes::{ArrowNativeType, DataType, Int32Type, Int64Type};

use datafusion_common::cast::as_generic_string_array;
use datafusion_common::{exec_err, Result};
use datafusion_expr::TypeSignature::Exact;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};

use crate::utils::{make_scalar_function, utf8_to_int_type};

/// Scalar UDF implementation registered under the name `strpos`.
#[derive(Debug)]
pub(super) struct StrposFunc {
/// Accepted argument type combinations together with the function's volatility.
signature: Signature,
/// Alternative names for the function, returned by `aliases()`.
aliases: Vec<String>,
}

impl StrposFunc {
pub fn new() -> Self {
use DataType::*;
Self {
signature: Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Exact(vec![Utf8, LargeUtf8]),
Exact(vec![LargeUtf8, Utf8]),
Exact(vec![LargeUtf8, LargeUtf8]),
],
Volatility::Immutable,
),
aliases: vec![String::from("instr"), String::from("position")],
}
}
}

impl ScalarUDFImpl for StrposFunc {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Primary name of the function.
    fn name(&self) -> &str {
        "strpos"
    }

    /// Additional names stored on the struct at construction time.
    fn aliases(&self) -> &[String] {
        self.aliases.as_slice()
    }

    /// Accepted argument types and volatility.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result type is derived from the type of the first argument only.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        let string_type = &arg_types[0];
        utf8_to_int_type(string_type, "strpos/instr/position")
    }

    /// Picks the kernel instantiation from the type of the first argument:
    /// `Int32Type` for `Utf8`, `Int64Type` for `LargeUtf8`; any other type is
    /// reported as an execution error.
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        let string_type = args[0].data_type();
        match string_type {
            DataType::LargeUtf8 => {
                make_scalar_function(strpos::<Int64Type>, vec![])(args)
            }
            DataType::Utf8 => make_scalar_function(strpos::<Int32Type>, vec![])(args),
            other => exec_err!("Unsupported data type {other:?} for function strpos"),
        }
    }
}

/// Returns starting index of specified substring within string, or zero if it's not present. (Same as position(substring in string), but note the reversed argument order.)
/// strpos('high', 'ig') = 2
/// The implementation uses UTF-8 code points as characters
fn strpos<T: ArrowPrimitiveType>(args: &[ArrayRef]) -> Result<ArrayRef>
where
T::Native: OffsetSizeTrait,
{
let string_array: &GenericStringArray<T::Native> =
as_generic_string_array::<T::Native>(&args[0])?;

let substring_array: &GenericStringArray<T::Native> =
as_generic_string_array::<T::Native>(&args[1])?;

let result = string_array
.iter()
.zip(substring_array.iter())
.map(|(string, substring)| match (string, substring) {
(Some(string), Some(substring)) => {
// the find method returns the byte index of the substring
// Next, we count the number of the chars until that byte
T::Native::from_usize(
string
.find(substring)
.map(|x| string[..x].chars().count() + 1)
.unwrap_or(0),
)
}
_ => None,
})
.collect::<PrimitiveArray<T>>();

Ok(Arc::new(result) as ArrayRef)
}
Loading

0 comments on commit c2879f5

Please sign in to comment.