Skip to content

Commit

Permalink
Move levenshtein, uuid, overlay to datafusion-functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Omega359 committed Mar 24, 2024
1 parent ee3ff9f commit c0758b3
Show file tree
Hide file tree
Showing 20 changed files with 458 additions and 334 deletions.
3 changes: 1 addition & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.11"
url = { workspace = true }
uuid = "1.2"
uuid = "1.7"
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ tempfile = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7.4", features = ["io"], optional = true }
url = { workspace = true }
uuid = { version = "1.0", features = ["v4"] }
uuid = { version = "1.7", features = ["v4"] }
xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

Expand Down
35 changes: 0 additions & 35 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,6 @@ pub enum BuiltinScalarFunction {
Substr,
/// translate
Translate,
/// uuid
Uuid,
/// overlay
OverLay,
/// levenshtein
Levenshtein,
/// substr_index
SubstrIndex,
/// find_in_set
Expand Down Expand Up @@ -253,14 +247,11 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Strpos => Volatility::Immutable,
BuiltinScalarFunction::Substr => Volatility::Immutable,
BuiltinScalarFunction::Translate => Volatility::Immutable,
BuiltinScalarFunction::OverLay => Volatility::Immutable,
BuiltinScalarFunction::Levenshtein => Volatility::Immutable,
BuiltinScalarFunction::SubstrIndex => Volatility::Immutable,
BuiltinScalarFunction::FindInSet => Volatility::Immutable,

// Volatile builtin functions
BuiltinScalarFunction::Random => Volatility::Volatile,
BuiltinScalarFunction::Uuid => Volatility::Volatile,
}
}

Expand Down Expand Up @@ -302,7 +293,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Lpad => utf8_to_str_type(&input_expr_types[0], "lpad"),
BuiltinScalarFunction::Pi => Ok(Float64),
BuiltinScalarFunction::Random => Ok(Float64),
BuiltinScalarFunction::Uuid => Ok(Utf8),
BuiltinScalarFunction::Repeat => {
utf8_to_str_type(&input_expr_types[0], "repeat")
}
Expand Down Expand Up @@ -362,14 +352,6 @@ impl BuiltinScalarFunction {

BuiltinScalarFunction::Iszero => Ok(Boolean),

BuiltinScalarFunction::OverLay => {
utf8_to_str_type(&input_expr_types[0], "overlay")
}

BuiltinScalarFunction::Levenshtein => {
utf8_to_int_type(&input_expr_types[0], "levenshtein")
}

BuiltinScalarFunction::Atan
| BuiltinScalarFunction::Acosh
| BuiltinScalarFunction::Asinh
Expand Down Expand Up @@ -490,7 +472,6 @@ impl BuiltinScalarFunction {
}
BuiltinScalarFunction::Pi => Signature::exact(vec![], self.volatility()),
BuiltinScalarFunction::Random => Signature::exact(vec![], self.volatility()),
BuiltinScalarFunction::Uuid => Signature::exact(vec![], self.volatility()),
BuiltinScalarFunction::Power => Signature::one_of(
vec![Exact(vec![Int64, Int64]), Exact(vec![Float64, Float64])],
self.volatility(),
Expand Down Expand Up @@ -536,19 +517,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Gcd | BuiltinScalarFunction::Lcm => {
Signature::uniform(2, vec![Int64], self.volatility())
}
BuiltinScalarFunction::OverLay => Signature::one_of(
vec![
Exact(vec![Utf8, Utf8, Int64, Int64]),
Exact(vec![LargeUtf8, LargeUtf8, Int64, Int64]),
Exact(vec![Utf8, Utf8, Int64]),
Exact(vec![LargeUtf8, LargeUtf8, Int64]),
],
self.volatility(),
),
BuiltinScalarFunction::Levenshtein => Signature::one_of(
vec![Exact(vec![Utf8, Utf8]), Exact(vec![LargeUtf8, LargeUtf8])],
self.volatility(),
),
BuiltinScalarFunction::Atan
| BuiltinScalarFunction::Acosh
| BuiltinScalarFunction::Asinh
Expand Down Expand Up @@ -678,11 +646,8 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Strpos => &["strpos", "instr", "position"],
BuiltinScalarFunction::Substr => &["substr"],
BuiltinScalarFunction::Translate => &["translate"],
BuiltinScalarFunction::Uuid => &["uuid"],
BuiltinScalarFunction::Levenshtein => &["levenshtein"],
BuiltinScalarFunction::SubstrIndex => &["substr_index", "substring_index"],
BuiltinScalarFunction::FindInSet => &["find_in_set"],
BuiltinScalarFunction::OverLay => &["overlay"],
}
}
}
Expand Down
26 changes: 0 additions & 26 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,6 @@ scalar_expr!(Log10, log10, num, "base 10 logarithm of number");
scalar_expr!(Ln, ln, num, "natural logarithm (base e) of number");
scalar_expr!(Power, power, base exponent, "`base` raised to the power of `exponent`");
scalar_expr!(Atan2, atan2, y x, "inverse tangent of a division given in the argument");
scalar_expr!(Uuid, uuid, , "returns uuid v4 as a string value");
scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");

// string functions
Expand Down Expand Up @@ -628,12 +627,6 @@ nary_scalar_expr!(
"concatenates several strings, placing a seperator between each one"
);
nary_scalar_expr!(Concat, concat_expr, "concatenates several strings");
nary_scalar_expr!(
OverLay,
overlay,
"replace the substring of string that starts at the start'th character and extends for count characters with new substring"
);

scalar_expr!(Nanvl, nanvl, x y, "returns x if x is not NaN otherwise returns y");
scalar_expr!(
Iszero,
Expand All @@ -642,7 +635,6 @@ scalar_expr!(
"returns true if a given number is +0.0 or -0.0 otherwise returns false"
);

scalar_expr!(Levenshtein, levenshtein, string1 string2, "Returns the Levenshtein distance between the two given strings");
scalar_expr!(SubstrIndex, substr_index, string delimiter count, "Returns the substring from str before count occurrences of the delimiter");
scalar_expr!(FindInSet, find_in_set, str strlist, "Returns a value in the range of 1 to N if the string str is in the string list strlist consisting of N substrings");

Expand Down Expand Up @@ -1076,25 +1068,7 @@ mod test {
test_scalar_expr!(Substr, substr, string, position);
test_scalar_expr!(Substr, substring, string, position, count);
test_scalar_expr!(Translate, translate, string, from, to);
test_nary_scalar_expr!(OverLay, overlay, string, characters, position, len);
test_nary_scalar_expr!(OverLay, overlay, string, characters, position);
test_scalar_expr!(Levenshtein, levenshtein, string1, string2);
test_scalar_expr!(SubstrIndex, substr_index, string, delimiter, count);
test_scalar_expr!(FindInSet, find_in_set, string, stringlist);
}

#[test]
fn uuid_function_definitions() {
if let Expr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::BuiltIn(fun),
args,
}) = uuid()
{
let name = BuiltinScalarFunction::Uuid;
assert_eq!(name, fun);
assert_eq!(0, args.len());
} else {
unreachable!();
}
}
}
2 changes: 2 additions & 0 deletions datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ log = { workspace = true }
md-5 = { version = "^0.10.0", optional = true }
regex = { version = "1.8", optional = true }
sha2 = { version = "^0.10.1", optional = true }
uuid = { version = "1.7", features = ["v4"] }

[dev-dependencies]
criterion = "0.5"
rand = { workspace = true }
Expand Down
146 changes: 146 additions & 0 deletions datafusion/functions/src/string/levenshtein.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, Int64Array, OffsetSizeTrait};
use arrow::datatypes::DataType;

use datafusion_common::cast::as_generic_string_array;
use datafusion_common::utils::datafusion_strsim;
use datafusion_common::{exec_err, Result};
use datafusion_expr::ColumnarValue;
use datafusion_expr::TypeSignature::*;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};

use crate::string::common::{make_scalar_function, utf8_to_int_type};

#[derive(Debug)]
pub(super) struct LevenshteinFunc {
signature: Signature,
}

impl LevenshteinFunc {
pub fn new() -> Self {
use DataType::*;
Self {
signature: Signature::one_of(
vec![Exact(vec![Utf8, Utf8]), Exact(vec![LargeUtf8, LargeUtf8])],
Volatility::Immutable,
),
}
}
}

impl ScalarUDFImpl for LevenshteinFunc {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"levenshtein"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
utf8_to_int_type(&arg_types[0], "levenshtein")
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
match args[0].data_type() {
DataType::Utf8 => make_scalar_function(levenshtein::<i32>, vec![])(args),
DataType::LargeUtf8 => make_scalar_function(levenshtein::<i64>, vec![])(args),
other => {
exec_err!("Unsupported data type {other:?} for function levenshtein")
}
}
}
}

///Returns the Levenshtein distance between the two given strings.
/// LEVENSHTEIN('kitten', 'sitting') = 3
pub fn levenshtein<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!(
"levenshtein function requires two arguments, got {}",
args.len()
);
}
let str1_array = as_generic_string_array::<T>(&args[0])?;
let str2_array = as_generic_string_array::<T>(&args[1])?;
match args[0].data_type() {
DataType::Utf8 => {
let result = str1_array
.iter()
.zip(str2_array.iter())
.map(|(string1, string2)| match (string1, string2) {
(Some(string1), Some(string2)) => {
Some(datafusion_strsim::levenshtein(string1, string2) as i32)
}
_ => None,
})
.collect::<Int32Array>();
Ok(Arc::new(result) as ArrayRef)
}
DataType::LargeUtf8 => {
let result = str1_array
.iter()
.zip(str2_array.iter())
.map(|(string1, string2)| match (string1, string2) {
(Some(string1), Some(string2)) => {
Some(datafusion_strsim::levenshtein(string1, string2) as i64)
}
_ => None,
})
.collect::<Int64Array>();
Ok(Arc::new(result) as ArrayRef)
}
other => {
exec_err!(
"levenshtein was called with {other} datatype arguments. It requires Utf8 or LargeUtf8."
)
}
}
}

#[cfg(test)]
mod tests {
use arrow::array::{Int32Array, StringArray};

use datafusion_common::cast::as_int32_array;

use super::*;

#[test]
fn to_levenshtein() -> Result<()> {
let string1_array =
Arc::new(StringArray::from(vec!["123", "abc", "xyz", "kitten"]));
let string2_array =
Arc::new(StringArray::from(vec!["321", "def", "zyx", "sitting"]));
let res = levenshtein::<i32>(&[string1_array, string2_array]).unwrap();
let result =
as_int32_array(&res).expect("failed to initialized function levenshtein");
let expected = Int32Array::from(vec![2, 3, 2, 3]);
assert_eq!(&expected, result);

Ok(())
}
}
Loading

0 comments on commit c0758b3

Please sign in to comment.