From 81b0a011705b17a09f494f550a5382b0c3414597 Mon Sep 17 00:00:00 2001
From: Jay Zhan <jayzhan211@gmail.com>
Date: Fri, 15 Mar 2024 22:02:26 +0800
Subject: [PATCH] Port ArrayElem/Slice/PopFront/Back into `functions-array`
 (#9615)

* backup

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* array-elem

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* pop front and pop back

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* add roundtrip test

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

---------

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
---
 datafusion/expr/src/built_in_function.rs      |  45 --
 datafusion/expr/src/expr_fn.rs                |  28 -
 datafusion/functions-array/src/extract.rs     | 659 ++++++++++++++++++
 datafusion/functions-array/src/lib.rs         |   9 +
 datafusion/functions-array/src/rewrite.rs     |  25 +-
 .../physical-expr/src/array_expressions.rs    | 403 +----------
 datafusion/physical-expr/src/functions.rs     |  12 -
 datafusion/proto/proto/datafusion.proto       |   8 +-
 datafusion/proto/src/generated/pbjson.rs      |  12 -
 datafusion/proto/src/generated/prost.rs       |  16 +-
 .../proto/src/logical_plan/from_proto.rs      |  30 +-
 datafusion/proto/src/logical_plan/to_proto.rs |   4 -
 .../tests/cases/roundtrip_logical_plan.rs     |   9 +
 13 files changed, 697 insertions(+), 563 deletions(-)
 create mode 100644 datafusion/functions-array/src/extract.rs

diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs
index ae2b46378c26..bc1bcd9f6861 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -103,12 +103,6 @@ pub enum BuiltinScalarFunction {
     Cot,
 
     // array functions
-    /// array_pop_front
-    ArrayPopFront,
-    /// array_pop_back
-    ArrayPopBack,
-    /// array_element
-    ArrayElement,
     /// array_position
     ArrayPosition,
     /// array_positions
@@ -127,8 +121,6 @@ pub enum BuiltinScalarFunction {
     ArrayReplaceAll,
     /// array_reverse
     ArrayReverse,
-    /// array_slice
-    ArraySlice,
     /// array_intersect
     ArrayIntersect,
     /// array_union
@@ -288,10 +280,7 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::Cbrt => Volatility::Immutable,
             BuiltinScalarFunction::Cot => Volatility::Immutable,
             BuiltinScalarFunction::Trunc => Volatility::Immutable,
-            BuiltinScalarFunction::ArrayElement => Volatility::Immutable,
             BuiltinScalarFunction::ArrayExcept => Volatility::Immutable,
-            BuiltinScalarFunction::ArrayPopFront => Volatility::Immutable,
-            BuiltinScalarFunction::ArrayPopBack => Volatility::Immutable,
             BuiltinScalarFunction::ArrayPosition => Volatility::Immutable,
             BuiltinScalarFunction::ArrayPositions => Volatility::Immutable,
             BuiltinScalarFunction::ArrayRemove => Volatility::Immutable,
@@ -301,7 +290,6 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::ArrayReplaceN => Volatility::Immutable,
             BuiltinScalarFunction::ArrayReplaceAll => Volatility::Immutable,
             BuiltinScalarFunction::ArrayReverse => Volatility::Immutable,
-            BuiltinScalarFunction::ArraySlice => Volatility::Immutable,
             BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable,
             BuiltinScalarFunction::ArrayUnion => Volatility::Immutable,
             BuiltinScalarFunction::Ascii => Volatility::Immutable,
@@ -361,16 +349,6 @@ impl BuiltinScalarFunction {
         // the return type of the built in function.
         // Some built-in functions' return type depends on the incoming type.
         match self {
-            BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] {
-                List(field)
-                | LargeList(field)
-                | FixedSizeList(field, _) => Ok(field.data_type().clone()),
-                _ => plan_err!(
-                    "The {self} function can only accept List, LargeList or FixedSizeList as the first argument"
-                ),
-            },
-            BuiltinScalarFunction::ArrayPopFront => Ok(input_expr_types[0].clone()),
-            BuiltinScalarFunction::ArrayPopBack => Ok(input_expr_types[0].clone()),
             BuiltinScalarFunction::ArrayPosition => Ok(UInt64),
             BuiltinScalarFunction::ArrayPositions => {
                 Ok(List(Arc::new(Field::new("item", UInt64, true))))
@@ -382,7 +360,6 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::ArrayReplaceN => Ok(input_expr_types[0].clone()),
             BuiltinScalarFunction::ArrayReplaceAll => Ok(input_expr_types[0].clone()),
             BuiltinScalarFunction::ArrayReverse => Ok(input_expr_types[0].clone()),
-            BuiltinScalarFunction::ArraySlice => Ok(input_expr_types[0].clone()),
             BuiltinScalarFunction::ArrayIntersect => {
                 match (input_expr_types[0].clone(), input_expr_types[1].clone()) {
                     (DataType::Null, DataType::Null) | (DataType::Null, _) => {
@@ -500,8 +477,6 @@ impl BuiltinScalarFunction {
                 _ => Ok(Float64),
             },
 
-
-
             BuiltinScalarFunction::Atan2 => match &input_expr_types[0] {
                 Float32 => Ok(Float32),
                 _ => Ok(Float64),
@@ -563,11 +538,6 @@ impl BuiltinScalarFunction {
 
         // for now, the list is small, as we do not have many built-in functions.
         match self {
-            BuiltinScalarFunction::ArrayPopFront => Signature::array(self.volatility()),
-            BuiltinScalarFunction::ArrayPopBack => Signature::array(self.volatility()),
-            BuiltinScalarFunction::ArrayElement => {
-                Signature::array_and_index(self.volatility())
-            }
             BuiltinScalarFunction::ArrayExcept => Signature::any(2, self.volatility()),
             BuiltinScalarFunction::ArrayPosition => {
                 Signature::array_and_element_and_optional_index(self.volatility())
@@ -588,10 +558,6 @@ impl BuiltinScalarFunction {
                 Signature::any(3, self.volatility())
             }
             BuiltinScalarFunction::ArrayReverse => Signature::any(1, self.volatility()),
-            BuiltinScalarFunction::ArraySlice => {
-                Signature::variadic_any(self.volatility())
-            }
-
             BuiltinScalarFunction::ArrayIntersect => Signature::any(2, self.volatility()),
             BuiltinScalarFunction::ArrayUnion => Signature::any(2, self.volatility()),
 
@@ -894,17 +860,7 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::FindInSet => &["find_in_set"],
 
             // hashing functions
-            BuiltinScalarFunction::ArrayElement => &[
-                "array_element",
-                "array_extract",
-                "list_element",
-                "list_extract",
-            ],
             BuiltinScalarFunction::ArrayExcept => &["array_except", "list_except"],
-            BuiltinScalarFunction::ArrayPopFront => {
-                &["array_pop_front", "list_pop_front"]
-            }
-            BuiltinScalarFunction::ArrayPopBack => &["array_pop_back", "list_pop_back"],
             BuiltinScalarFunction::ArrayPosition => &[
                 "array_position",
                 "list_position",
@@ -927,7 +883,6 @@ impl BuiltinScalarFunction {
                 &["array_replace_all", "list_replace_all"]
             }
             BuiltinScalarFunction::ArrayReverse => &["array_reverse", "list_reverse"],
-            BuiltinScalarFunction::ArraySlice => &["array_slice", "list_slice"],
             BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"],
             BuiltinScalarFunction::ArrayIntersect => {
                 &["array_intersect", "list_intersect"]
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 9e09ddb0670d..538a8a75ce5e 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -584,26 +584,6 @@ scalar_expr!(
 scalar_expr!(Uuid, uuid, , "returns uuid v4 as a string value");
 scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");
 
-scalar_expr!(
-    ArrayPopBack,
-    array_pop_back,
-    array,
-    "returns the array without the last element."
-);
-
-scalar_expr!(
-    ArrayPopFront,
-    array_pop_front,
-    array,
-    "returns the array without the first element."
-);
-
-scalar_expr!(
-    ArrayElement,
-    array_element,
-    array element,
-    "extracts the element with the index n from the array."
-);
 scalar_expr!(
     ArrayExcept,
     array_except,
@@ -664,12 +644,6 @@ scalar_expr!(
     array,
     "reverses the order of elements in the array."
 );
-scalar_expr!(
-    ArraySlice,
-    array_slice,
-    array begin end stride,
-    "returns a slice of the array."
-);
 scalar_expr!(ArrayUnion, array_union, array1 array2, "returns an array of the elements in the union of array1 and array2 without duplicates.");
 
 scalar_expr!(
@@ -1222,8 +1196,6 @@ mod test {
         test_scalar_expr!(Trim, trim, string);
         test_scalar_expr!(Upper, upper, string);
 
-        test_scalar_expr!(ArrayPopFront, array_pop_front, array);
-        test_scalar_expr!(ArrayPopBack, array_pop_back, array);
         test_scalar_expr!(ArrayPosition, array_position, array, element, index);
         test_scalar_expr!(ArrayPositions, array_positions, array, element);
         test_scalar_expr!(ArrayRemove, array_remove, array, element);
diff --git a/datafusion/functions-array/src/extract.rs b/datafusion/functions-array/src/extract.rs
new file mode 100644
index 000000000000..86eeaea3c9b4
--- /dev/null
+++ b/datafusion/functions-array/src/extract.rs
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Array Element and Array Slice
+
+use arrow::array::Array;
+use arrow::array::ArrayRef;
+use arrow::array::ArrowNativeTypeOp;
+use arrow::array::Capacities;
+use arrow::array::GenericListArray;
+use arrow::array::Int64Array;
+use arrow::array::MutableArrayData;
+use arrow::array::OffsetSizeTrait;
+use arrow::buffer::OffsetBuffer;
+use arrow::datatypes::DataType;
+use arrow_schema::Field;
+use datafusion_common::cast::as_int64_array;
+use datafusion_common::cast::as_large_list_array;
+use datafusion_common::cast::as_list_array;
+use datafusion_common::exec_err;
+use datafusion_common::internal_datafusion_err;
+use datafusion_common::plan_err;
+use datafusion_common::DataFusionError;
+use datafusion_common::Result;
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::Expr;
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::utils::make_scalar_function;
+
+// Create static instances of ScalarUDFs for each function
+make_udf_function!(
+    ArrayElement,
+    array_element,
+    array element,
+    "extracts the element with the index n from the array.",
+    array_element_udf
+);
+
+make_udf_function!(
+    ArraySlice,
+    array_slice,
+    array begin end stride,
+    "returns a slice of the array.",
+    array_slice_udf
+);
+
+make_udf_function!(
+    ArrayPopFront,
+    array_pop_front,
+    array,
+    "returns the array without the first element.",
+    array_pop_front_udf
+);
+
+make_udf_function!(
+    ArrayPopBack,
+    array_pop_back,
+    array,
+    "returns the array without the last element.",
+    array_pop_back_udf
+);
+
+#[derive(Debug)]
+pub(super) struct ArrayElement {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl ArrayElement {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::array_and_index(Volatility::Immutable),
+            aliases: vec![
+                String::from("array_element"),
+                String::from("array_extract"),
+                String::from("list_element"),
+                String::from("list_extract"),
+            ],
+        }
+    }
+}
+
+impl ScalarUDFImpl for ArrayElement {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+    fn name(&self) -> &str {
+        "array_element"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        use DataType::*;
+        match &arg_types[0] {
+            List(field)
+            | LargeList(field)
+            | FixedSizeList(field, _) => Ok(field.data_type().clone()),
+            _ => plan_err!(
+                "ArrayElement can only accept List, LargeList or FixedSizeList as the first argument"
+            ),
+        }
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        make_scalar_function(array_element_inner)(args)
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
+
+/// array_element SQL function
+///
+/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
+/// `array_element(array, index)`
+///
+/// For example:
+/// > array_element(\[1, 2, 3], 2) -> 2
+fn array_element_inner(args: &[ArrayRef]) -> datafusion_common::Result<ArrayRef> {
+    if args.len() != 2 {
+        return exec_err!("array_element needs two arguments");
+    }
+
+    match &args[0].data_type() {
+        DataType::List(_) => {
+            let array = as_list_array(&args[0])?;
+            let indexes = as_int64_array(&args[1])?;
+            general_array_element::<i32>(array, indexes)
+        }
+        DataType::LargeList(_) => {
+            let array = as_large_list_array(&args[0])?;
+            let indexes = as_int64_array(&args[1])?;
+            general_array_element::<i64>(array, indexes)
+        }
+        _ => exec_err!(
+            "array_element does not support type: {:?}",
+            args[0].data_type()
+        ),
+    }
+}
+
+fn general_array_element<O: OffsetSizeTrait>(
+    array: &GenericListArray<O>,
+    indexes: &Int64Array,
+) -> datafusion_common::Result<ArrayRef>
+where
+    i64: TryInto<O>,
+{
+    let values = array.values();
+    let original_data = values.to_data();
+    let capacity = Capacities::Array(original_data.len());
+
+    // use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
+    let mut mutable =
+        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
+
+    fn adjusted_array_index<O: OffsetSizeTrait>(
+        index: i64,
+        len: O,
+    ) -> datafusion_common::Result<Option<O>>
+    where
+        i64: TryInto<O>,
+    {
+        let index: O = index.try_into().map_err(|_| {
+            DataFusionError::Execution(format!(
+                "array_element got invalid index: {}",
+                index
+            ))
+        })?;
+        // 0 ~ len - 1
+        let adjusted_zero_index = if index < O::usize_as(0) {
+            index + len
+        } else {
+            index - O::usize_as(1)
+        };
+
+        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
+            Ok(Some(adjusted_zero_index))
+        } else {
+            // Out of bounds
+            Ok(None)
+        }
+    }
+
+    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
+        let start = offset_window[0];
+        let end = offset_window[1];
+        let len = end - start;
+
+        // array is null
+        if len == O::usize_as(0) {
+            mutable.extend_nulls(1);
+            continue;
+        }
+
+        let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
+
+        if let Some(index) = index {
+            let start = start.as_usize() + index.as_usize();
+            mutable.extend(0, start, start + 1_usize);
+        } else {
+            // Index out of bounds
+            mutable.extend_nulls(1);
+        }
+    }
+
+    let data = mutable.freeze();
+    Ok(arrow::array::make_array(data))
+}
+
+#[derive(Debug)]
+pub(super) struct ArraySlice {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl ArraySlice {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+            aliases: vec![String::from("array_slice"), String::from("list_slice")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for ArraySlice {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+    fn name(&self) -> &str {
+        "array_slice"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(arg_types[0].clone())
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        make_scalar_function(array_slice_inner)(args)
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
+
+/// array_slice SQL function
+///
+/// We follow the behavior of array_slice in DuckDB
+/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
+///
+/// > array_slice(array, from, to)
+///
+/// Positive index is treated as the index from the start of the array. If the
+/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
+/// length of the array, it is treated as the length of the array.
+///
+/// Negative index is treated as the index from the end of the array. If the index
+/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
+/// The `to` index is exclusive like python slice syntax.
+///
+/// See test cases in `array.slt` for more details.
+fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let args_len = args.len();
+    if args_len != 3 && args_len != 4 {
+        return exec_err!("array_slice needs three or four arguments");
+    }
+
+    let stride = if args_len == 4 {
+        Some(as_int64_array(&args[3])?)
+    } else {
+        None
+    };
+
+    let from_array = as_int64_array(&args[1])?;
+    let to_array = as_int64_array(&args[2])?;
+
+    let array_data_type = args[0].data_type();
+    match array_data_type {
+        DataType::List(_) => {
+            let array = as_list_array(&args[0])?;
+            general_array_slice::<i32>(array, from_array, to_array, stride)
+        }
+        DataType::LargeList(_) => {
+            let array = as_large_list_array(&args[0])?;
+            let from_array = as_int64_array(&args[1])?;
+            let to_array = as_int64_array(&args[2])?;
+            general_array_slice::<i64>(array, from_array, to_array, stride)
+        }
+        _ => exec_err!("array_slice does not support type: {:?}", array_data_type),
+    }
+}
+
+fn general_array_slice<O: OffsetSizeTrait>(
+    array: &GenericListArray<O>,
+    from_array: &Int64Array,
+    to_array: &Int64Array,
+    stride: Option<&Int64Array>,
+) -> Result<ArrayRef>
+where
+    i64: TryInto<O>,
+{
+    let values = array.values();
+    let original_data = values.to_data();
+    let capacity = Capacities::Array(original_data.len());
+
+    // use_nulls: false, we don't need nulls but empty array for array_slice, so we don't need explicit nulls but adjust offset to indicate nulls.
+    let mut mutable =
+        MutableArrayData::with_capacities(vec![&original_data], false, capacity);
+
+    // We have the slice syntax compatible with DuckDB v0.8.1.
+    // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
+
+    fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
+    where
+        i64: TryInto<O>,
+    {
+        // 0 ~ len - 1
+        let adjusted_zero_index = if index < 0 {
+            if let Ok(index) = index.try_into() {
+                index + len
+            } else {
+                return exec_err!("array_slice got invalid index: {}", index);
+            }
+        } else {
+            // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
+            if let Ok(index) = index.try_into() {
+                std::cmp::max(index - O::usize_as(1), O::usize_as(0))
+            } else {
+                return exec_err!("array_slice got invalid index: {}", index);
+            }
+        };
+
+        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
+            Ok(Some(adjusted_zero_index))
+        } else {
+            // Out of bounds
+            Ok(None)
+        }
+    }
+
+    fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
+    where
+        i64: TryInto<O>,
+    {
+        // 0 ~ len - 1
+        let adjusted_zero_index = if index < 0 {
+            // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
+            if let Ok(index) = index.try_into() {
+                index + len
+            } else {
+                return exec_err!("array_slice got invalid index: {}", index);
+            }
+        } else {
+            // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
+            if let Ok(index) = index.try_into() {
+                std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
+            } else {
+                return exec_err!("array_slice got invalid index: {}", index);
+            }
+        };
+
+        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
+            Ok(Some(adjusted_zero_index))
+        } else {
+            // Out of bounds
+            Ok(None)
+        }
+    }
+
+    let mut offsets = vec![O::usize_as(0)];
+
+    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
+        let start = offset_window[0];
+        let end = offset_window[1];
+        let len = end - start;
+
+        // len 0 indicate array is null, return empty array in this row.
+        if len == O::usize_as(0) {
+            offsets.push(offsets[row_index]);
+            continue;
+        }
+
+        // If index is null, we consider it as the minimum / maximum index of the array.
+        let from_index = if from_array.is_null(row_index) {
+            Some(O::usize_as(0))
+        } else {
+            adjusted_from_index::<O>(from_array.value(row_index), len)?
+        };
+
+        let to_index = if to_array.is_null(row_index) {
+            Some(len - O::usize_as(1))
+        } else {
+            adjusted_to_index::<O>(to_array.value(row_index), len)?
+        };
+
+        if let (Some(from), Some(to)) = (from_index, to_index) {
+            let stride = stride.map(|s| s.value(row_index));
+            // array_slice with stride in duckdb, return empty array if stride is not supported and from > to.
+            if stride.is_none() && from > to {
+                // return empty array
+                offsets.push(offsets[row_index]);
+                continue;
+            }
+            let stride = stride.unwrap_or(1);
+            if stride.is_zero() {
+                return exec_err!(
+                    "array_slice got invalid stride: {:?}, it cannot be 0",
+                    stride
+                );
+            } else if from <= to && stride.is_negative() {
+                // return empty array
+                offsets.push(offsets[row_index]);
+                continue;
+            }
+
+            let stride: O = stride.try_into().map_err(|_| {
+                internal_datafusion_err!("array_slice got invalid stride: {}", stride)
+            })?;
+
+            if from <= to {
+                assert!(start + to <= end);
+                if stride.eq(&O::one()) {
+                    // stride is default to 1
+                    mutable.extend(
+                        0,
+                        (start + from).to_usize().unwrap(),
+                        (start + to + O::usize_as(1)).to_usize().unwrap(),
+                    );
+                    offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
+                    continue;
+                }
+                let mut index = start + from;
+                let mut cnt = 0;
+                while index <= start + to {
+                    mutable.extend(
+                        0,
+                        index.to_usize().unwrap(),
+                        index.to_usize().unwrap() + 1,
+                    );
+                    index += stride;
+                    cnt += 1;
+                }
+                offsets.push(offsets[row_index] + O::usize_as(cnt));
+            } else {
+                let mut index = start + from;
+                let mut cnt = 0;
+                while index >= start + to {
+                    mutable.extend(
+                        0,
+                        index.to_usize().unwrap(),
+                        index.to_usize().unwrap() + 1,
+                    );
+                    index += stride;
+                    cnt += 1;
+                }
+                // invalid range, return empty array
+                offsets.push(offsets[row_index] + O::usize_as(cnt));
+            }
+        } else {
+            // invalid range, return empty array
+            offsets.push(offsets[row_index]);
+        }
+    }
+
+    let data = mutable.freeze();
+
+    Ok(Arc::new(GenericListArray::<O>::try_new(
+        Arc::new(Field::new("item", array.value_type(), true)),
+        OffsetBuffer::<O>::new(offsets.into()),
+        arrow_array::make_array(data),
+        None,
+    )?))
+}
+
+#[derive(Debug)]
+pub(super) struct ArrayPopFront {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl ArrayPopFront {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::array(Volatility::Immutable),
+            aliases: vec![
+                String::from("array_pop_front"),
+                String::from("list_pop_front"),
+            ],
+        }
+    }
+}
+
+impl ScalarUDFImpl for ArrayPopFront {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+    fn name(&self) -> &str {
+        "array_pop_front"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(arg_types[0].clone())
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        make_scalar_function(array_pop_front_inner)(args)
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
+
+/// array_pop_front SQL function
+fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let array_data_type = args[0].data_type();
+    match array_data_type {
+        DataType::List(_) => {
+            let array = as_list_array(&args[0])?;
+            general_pop_front_list::<i32>(array)
+        }
+        DataType::LargeList(_) => {
+            let array = as_large_list_array(&args[0])?;
+            general_pop_front_list::<i64>(array)
+        }
+        _ => exec_err!(
+            "array_pop_front does not support type: {:?}",
+            array_data_type
+        ),
+    }
+}
+
+fn general_pop_front_list<O: OffsetSizeTrait>(
+    array: &GenericListArray<O>,
+) -> Result<ArrayRef>
+where
+    i64: TryInto<O>,
+{
+    let from_array = Int64Array::from(vec![2; array.len()]);
+    let to_array = Int64Array::from(
+        array
+            .iter()
+            .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
+            .collect::<Vec<i64>>(),
+    );
+    general_array_slice::<O>(array, &from_array, &to_array, None)
+}
+
+#[derive(Debug)]
+pub(super) struct ArrayPopBack {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl ArrayPopBack {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::array(Volatility::Immutable),
+            aliases: vec![
+                String::from("array_pop_back"),
+                String::from("list_pop_back"),
+            ],
+        }
+    }
+}
+
+impl ScalarUDFImpl for ArrayPopBack {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+    fn name(&self) -> &str {
+        "array_pop_back"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(arg_types[0].clone())
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        make_scalar_function(array_pop_back_inner)(args)
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
+
+/// array_pop_back SQL function
+fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+    if args.len() != 1 {
+        return exec_err!("array_pop_back needs one argument");
+    }
+
+    let array_data_type = args[0].data_type();
+    match array_data_type {
+        DataType::List(_) => {
+            let array = as_list_array(&args[0])?;
+            general_pop_back_list::<i32>(array)
+        }
+        DataType::LargeList(_) => {
+            let array = as_large_list_array(&args[0])?;
+            general_pop_back_list::<i64>(array)
+        }
+        _ => exec_err!(
+            "array_pop_back does not support type: {:?}",
+            array_data_type
+        ),
+    }
+}
+
+fn general_pop_back_list<O: OffsetSizeTrait>(
+    array: &GenericListArray<O>,
+) -> Result<ArrayRef>
+where
+    i64: TryInto<O>,
+{
+    let from_array = Int64Array::from(vec![1; array.len()]);
+    let to_array = Int64Array::from(
+        array
+            .iter()
+            .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
+            .collect::<Vec<i64>>(),
+    );
+    general_array_slice::<O>(array, &from_array, &to_array, None)
+}
diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs
index 944cc820ac9f..e982290f66c0 100644
--- a/datafusion/functions-array/src/lib.rs
+++ b/datafusion/functions-array/src/lib.rs
@@ -30,6 +30,7 @@ pub mod macros;
 
 mod array_has;
 mod concat;
+mod extract;
 mod kernels;
 mod make_array;
 mod rewrite;
@@ -50,6 +51,10 @@ pub mod expr_fn {
     pub use super::concat::array_append;
     pub use super::concat::array_concat;
     pub use super::concat::array_prepend;
+    pub use super::extract::array_element;
+    pub use super::extract::array_pop_back;
+    pub use super::extract::array_pop_front;
+    pub use super::extract::array_slice;
     pub use super::make_array::make_array;
     pub use super::udf::array_dims;
     pub use super::udf::array_distinct;
@@ -80,6 +85,10 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
         concat::array_append_udf(),
         concat::array_prepend_udf(),
         concat::array_concat_udf(),
+        extract::array_element_udf(),
+        extract::array_pop_back_udf(),
+        extract::array_pop_front_udf(),
+        extract::array_slice_udf(),
         make_array::make_array_udf(),
         array_has::array_has_udf(),
         array_has::array_has_all_udf(),
diff --git a/datafusion/functions-array/src/rewrite.rs b/datafusion/functions-array/src/rewrite.rs
index a9e79f54a52d..6a91e9078232 100644
--- a/datafusion/functions-array/src/rewrite.rs
+++ b/datafusion/functions-array/src/rewrite.rs
@@ -17,17 +17,16 @@
 
 //! Rewrites for using Array Functions
 
-use crate::concat::{array_append, array_concat};
-use crate::expr_fn::{array_has_all, array_prepend};
+use crate::array_has::array_has_all;
+use crate::concat::{array_append, array_concat, array_prepend};
+use crate::extract::{array_element, array_slice};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::Transformed;
 use datafusion_common::utils::list_ndims;
 use datafusion_common::{Column, DFSchema};
 use datafusion_expr::expr::ScalarFunction;
 use datafusion_expr::expr_rewriter::FunctionRewrite;
-use datafusion_expr::{
-    BinaryExpr, BuiltinScalarFunction, Expr, GetFieldAccess, GetIndexedField, Operator,
-};
+use datafusion_expr::{BinaryExpr, Expr, GetFieldAccess, GetIndexedField, Operator};
 use datafusion_functions::expr_fn::get_field;
 
 /// Rewrites expressions into function calls to array functions
@@ -161,13 +160,7 @@ impl FunctionRewrite for ArrayFunctionRewriter {
             Expr::GetIndexedField(GetIndexedField {
                 expr,
                 field: GetFieldAccess::ListIndex { key },
-            }) => {
-                let args = vec![*expr, *key];
-                Transformed::yes(Expr::ScalarFunction(ScalarFunction::new(
-                    BuiltinScalarFunction::ArrayElement,
-                    args,
-                )))
-            }
+            }) => Transformed::yes(array_element(*expr, *key)),
 
             // expr[start, stop, stride] ==> array_slice(expr, start, stop, stride)
             Expr::GetIndexedField(GetIndexedField {
@@ -178,13 +171,7 @@ impl FunctionRewrite for ArrayFunctionRewriter {
                         stop,
                         stride,
                     },
-            }) => {
-                let args = vec![*expr, *start, *stop, *stride];
-                Transformed::yes(Expr::ScalarFunction(ScalarFunction::new(
-                    BuiltinScalarFunction::ArraySlice,
-                    args,
-                )))
-            }
+            }) => Transformed::yes(array_slice(*expr, *start, *stop, *stride)),
 
             _ => Transformed::no(expr),
         };
diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs
index 3bad9347e1b4..78bfe09e37d9 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -33,9 +33,7 @@ use datafusion_common::cast::{
     as_generic_list_array, as_int64_array, as_large_list_array, as_list_array,
 };
 use datafusion_common::utils::array_into_list_array;
-use datafusion_common::{
-    exec_err, internal_datafusion_err, internal_err, plan_err, DataFusionError, Result,
-};
+use datafusion_common::{exec_err, internal_err, plan_err, Result};
 use itertools::Itertools;
 
 /// Computes a BooleanArray indicating equality or inequality between elements in a list array and a specified element array.
@@ -267,102 +265,6 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result<ArrayRef> {
     }
 }
 
-fn general_array_element<O: OffsetSizeTrait>(
-    array: &GenericListArray<O>,
-    indexes: &Int64Array,
-) -> Result<ArrayRef>
-where
-    i64: TryInto<O>,
-{
-    let values = array.values();
-    let original_data = values.to_data();
-    let capacity = Capacities::Array(original_data.len());
-
-    // use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
-    let mut mutable =
-        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
-
-    fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
-    where
-        i64: TryInto<O>,
-    {
-        let index: O = index.try_into().map_err(|_| {
-            DataFusionError::Execution(format!(
-                "array_element got invalid index: {}",
-                index
-            ))
-        })?;
-        // 0 ~ len - 1
-        let adjusted_zero_index = if index < O::usize_as(0) {
-            index + len
-        } else {
-            index - O::usize_as(1)
-        };
-
-        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
-            Ok(Some(adjusted_zero_index))
-        } else {
-            // Out of bounds
-            Ok(None)
-        }
-    }
-
-    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
-        let start = offset_window[0];
-        let end = offset_window[1];
-        let len = end - start;
-
-        // array is null
-        if len == O::usize_as(0) {
-            mutable.extend_nulls(1);
-            continue;
-        }
-
-        let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
-
-        if let Some(index) = index {
-            let start = start.as_usize() + index.as_usize();
-            mutable.extend(0, start, start + 1_usize);
-        } else {
-            // Index out of bounds
-            mutable.extend_nulls(1);
-        }
-    }
-
-    let data = mutable.freeze();
-    Ok(arrow_array::make_array(data))
-}
-
-/// array_element SQL function
-///
-/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
-/// `array_element(array, index)`
-///
-/// For example:
-/// > array_element(\[1, 2, 3], 2) -> 2
-pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
-    if args.len() != 2 {
-        return exec_err!("array_element needs two arguments");
-    }
-
-    match &args[0].data_type() {
-        DataType::List(_) => {
-            let array = as_list_array(&args[0])?;
-            let indexes = as_int64_array(&args[1])?;
-            general_array_element::<i32>(array, indexes)
-        }
-        DataType::LargeList(_) => {
-            let array = as_large_list_array(&args[0])?;
-            let indexes = as_int64_array(&args[1])?;
-            general_array_element::<i64>(array, indexes)
-        }
-        _ => exec_err!(
-            "array_element does not support type: {:?}",
-            args[0].data_type()
-        ),
-    }
-}
-
 fn general_except<OffsetSize: OffsetSizeTrait>(
     l: &GenericListArray<OffsetSize>,
     r: &GenericListArray<OffsetSize>,
@@ -441,309 +343,6 @@ pub fn array_except(args: &[ArrayRef]) -> Result<ArrayRef> {
     }
 }
 
-/// array_slice SQL function
-///
-/// We follow the behavior of array_slice in DuckDB
-/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
-///
-/// > array_slice(array, from, to)
-///
-/// Positive index is treated as the index from the start of the array. If the
-/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
-/// length of the array, it is treated as the length of the array.
-///
-/// Negative index is treated as the index from the end of the array. If the index
-/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
-/// The `to` index is exclusive like python slice syntax.
-///
-/// See test cases in `array.slt` for more details.
-pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
-    let args_len = args.len();
-    if args_len != 3 && args_len != 4 {
-        return exec_err!("array_slice needs three or four arguments");
-    }
-
-    let stride = if args_len == 4 {
-        Some(as_int64_array(&args[3])?)
-    } else {
-        None
-    };
-
-    let from_array = as_int64_array(&args[1])?;
-    let to_array = as_int64_array(&args[2])?;
-
-    let array_data_type = args[0].data_type();
-    match array_data_type {
-        DataType::List(_) => {
-            let array = as_list_array(&args[0])?;
-            general_array_slice::<i32>(array, from_array, to_array, stride)
-        }
-        DataType::LargeList(_) => {
-            let array = as_large_list_array(&args[0])?;
-            let from_array = as_int64_array(&args[1])?;
-            let to_array = as_int64_array(&args[2])?;
-            general_array_slice::<i64>(array, from_array, to_array, stride)
-        }
-        _ => exec_err!("array_slice does not support type: {:?}", array_data_type),
-    }
-}
-
-fn general_array_slice<O: OffsetSizeTrait>(
-    array: &GenericListArray<O>,
-    from_array: &Int64Array,
-    to_array: &Int64Array,
-    stride: Option<&Int64Array>,
-) -> Result<ArrayRef>
-where
-    i64: TryInto<O>,
-{
-    let values = array.values();
-    let original_data = values.to_data();
-    let capacity = Capacities::Array(original_data.len());
-
-    // use_nulls: false, we don't need nulls but empty array for array_slice, so we don't need explicit nulls but adjust offset to indicate nulls.
-    let mut mutable =
-        MutableArrayData::with_capacities(vec![&original_data], false, capacity);
-
-    // We have the slice syntax compatible with DuckDB v0.8.1.
-    // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
-
-    fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
-    where
-        i64: TryInto<O>,
-    {
-        // 0 ~ len - 1
-        let adjusted_zero_index = if index < 0 {
-            if let Ok(index) = index.try_into() {
-                index + len
-            } else {
-                return exec_err!("array_slice got invalid index: {}", index);
-            }
-        } else {
-            // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
-            if let Ok(index) = index.try_into() {
-                std::cmp::max(index - O::usize_as(1), O::usize_as(0))
-            } else {
-                return exec_err!("array_slice got invalid index: {}", index);
-            }
-        };
-
-        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
-            Ok(Some(adjusted_zero_index))
-        } else {
-            // Out of bounds
-            Ok(None)
-        }
-    }
-
-    fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
-    where
-        i64: TryInto<O>,
-    {
-        // 0 ~ len - 1
-        let adjusted_zero_index = if index < 0 {
-            // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
-            if let Ok(index) = index.try_into() {
-                index + len
-            } else {
-                return exec_err!("array_slice got invalid index: {}", index);
-            }
-        } else {
-            // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
-            if let Ok(index) = index.try_into() {
-                std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
-            } else {
-                return exec_err!("array_slice got invalid index: {}", index);
-            }
-        };
-
-        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
-            Ok(Some(adjusted_zero_index))
-        } else {
-            // Out of bounds
-            Ok(None)
-        }
-    }
-
-    let mut offsets = vec![O::usize_as(0)];
-
-    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
-        let start = offset_window[0];
-        let end = offset_window[1];
-        let len = end - start;
-
-        // len 0 indicate array is null, return empty array in this row.
-        if len == O::usize_as(0) {
-            offsets.push(offsets[row_index]);
-            continue;
-        }
-
-        // If index is null, we consider it as the minimum / maximum index of the array.
-        let from_index = if from_array.is_null(row_index) {
-            Some(O::usize_as(0))
-        } else {
-            adjusted_from_index::<O>(from_array.value(row_index), len)?
-        };
-
-        let to_index = if to_array.is_null(row_index) {
-            Some(len - O::usize_as(1))
-        } else {
-            adjusted_to_index::<O>(to_array.value(row_index), len)?
-        };
-
-        if let (Some(from), Some(to)) = (from_index, to_index) {
-            let stride = stride.map(|s| s.value(row_index));
-            // array_slice with stride in duckdb, return empty array if stride is not supported and from > to.
-            if stride.is_none() && from > to {
-                // return empty array
-                offsets.push(offsets[row_index]);
-                continue;
-            }
-            let stride = stride.unwrap_or(1);
-            if stride.is_zero() {
-                return exec_err!(
-                    "array_slice got invalid stride: {:?}, it cannot be 0",
-                    stride
-                );
-            } else if from <= to && stride.is_negative() {
-                // return empty array
-                offsets.push(offsets[row_index]);
-                continue;
-            }
-
-            let stride: O = stride.try_into().map_err(|_| {
-                internal_datafusion_err!("array_slice got invalid stride: {}", stride)
-            })?;
-
-            if from <= to {
-                assert!(start + to <= end);
-                if stride.eq(&O::one()) {
-                    // stride is default to 1
-                    mutable.extend(
-                        0,
-                        (start + from).to_usize().unwrap(),
-                        (start + to + O::usize_as(1)).to_usize().unwrap(),
-                    );
-                    offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
-                    continue;
-                }
-                let mut index = start + from;
-                let mut cnt = 0;
-                while index <= start + to {
-                    mutable.extend(
-                        0,
-                        index.to_usize().unwrap(),
-                        index.to_usize().unwrap() + 1,
-                    );
-                    index += stride;
-                    cnt += 1;
-                }
-                offsets.push(offsets[row_index] + O::usize_as(cnt));
-            } else {
-                let mut index = start + from;
-                let mut cnt = 0;
-                while index >= start + to {
-                    mutable.extend(
-                        0,
-                        index.to_usize().unwrap(),
-                        index.to_usize().unwrap() + 1,
-                    );
-                    index += stride;
-                    cnt += 1;
-                }
-                // invalid range, return empty array
-                offsets.push(offsets[row_index] + O::usize_as(cnt));
-            }
-        } else {
-            // invalid range, return empty array
-            offsets.push(offsets[row_index]);
-        }
-    }
-
-    let data = mutable.freeze();
-
-    Ok(Arc::new(GenericListArray::<O>::try_new(
-        Arc::new(Field::new("item", array.value_type(), true)),
-        OffsetBuffer::<O>::new(offsets.into()),
-        arrow_array::make_array(data),
-        None,
-    )?))
-}
-
-fn general_pop_front_list<O: OffsetSizeTrait>(
-    array: &GenericListArray<O>,
-) -> Result<ArrayRef>
-where
-    i64: TryInto<O>,
-{
-    let from_array = Int64Array::from(vec![2; array.len()]);
-    let to_array = Int64Array::from(
-        array
-            .iter()
-            .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
-            .collect::<Vec<i64>>(),
-    );
-    general_array_slice::<O>(array, &from_array, &to_array, None)
-}
-
-fn general_pop_back_list<O: OffsetSizeTrait>(
-    array: &GenericListArray<O>,
-) -> Result<ArrayRef>
-where
-    i64: TryInto<O>,
-{
-    let from_array = Int64Array::from(vec![1; array.len()]);
-    let to_array = Int64Array::from(
-        array
-            .iter()
-            .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
-            .collect::<Vec<i64>>(),
-    );
-    general_array_slice::<O>(array, &from_array, &to_array, None)
-}
-
-/// array_pop_front SQL function
-pub fn array_pop_front(args: &[ArrayRef]) -> Result<ArrayRef> {
-    let array_data_type = args[0].data_type();
-    match array_data_type {
-        DataType::List(_) => {
-            let array = as_list_array(&args[0])?;
-            general_pop_front_list::<i32>(array)
-        }
-        DataType::LargeList(_) => {
-            let array = as_large_list_array(&args[0])?;
-            general_pop_front_list::<i64>(array)
-        }
-        _ => exec_err!(
-            "array_pop_front does not support type: {:?}",
-            array_data_type
-        ),
-    }
-}
-
-/// array_pop_back SQL function
-pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
-    if args.len() != 1 {
-        return exec_err!("array_pop_back needs one argument");
-    }
-
-    let array_data_type = args[0].data_type();
-    match array_data_type {
-        DataType::List(_) => {
-            let array = as_list_array(&args[0])?;
-            general_pop_back_list::<i32>(array)
-        }
-        DataType::LargeList(_) => {
-            let array = as_large_list_array(&args[0])?;
-            general_pop_back_list::<i64>(array)
-        }
-        _ => exec_err!(
-            "array_pop_back does not support type: {:?}",
-            array_data_type
-        ),
-    }
-}
-
 /// Array_position SQL function
 pub fn array_position(args: &[ArrayRef]) -> Result<ArrayRef> {
     if args.len() < 2 || args.len() > 3 {
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index 9169d83022f1..e05cabc18b3a 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -255,18 +255,9 @@ pub fn create_physical_fun(
         }
 
         // array functions
-        BuiltinScalarFunction::ArrayElement => Arc::new(|args| {
-            make_scalar_function_inner(array_expressions::array_element)(args)
-        }),
         BuiltinScalarFunction::ArrayExcept => Arc::new(|args| {
             make_scalar_function_inner(array_expressions::array_except)(args)
         }),
-        BuiltinScalarFunction::ArrayPopFront => Arc::new(|args| {
-            make_scalar_function_inner(array_expressions::array_pop_front)(args)
-        }),
-        BuiltinScalarFunction::ArrayPopBack => Arc::new(|args| {
-            make_scalar_function_inner(array_expressions::array_pop_back)(args)
-        }),
         BuiltinScalarFunction::ArrayPosition => Arc::new(|args| {
             make_scalar_function_inner(array_expressions::array_position)(args)
         }),
@@ -294,9 +285,6 @@ pub fn create_physical_fun(
         BuiltinScalarFunction::ArrayReverse => Arc::new(|args| {
             make_scalar_function_inner(array_expressions::array_reverse)(args)
         }),
-        BuiltinScalarFunction::ArraySlice => Arc::new(|args| {
-            make_scalar_function_inner(array_expressions::array_slice)(args)
-        }),
         BuiltinScalarFunction::ArrayIntersect => Arc::new(|args| {
             make_scalar_function_inner(array_expressions::array_intersect)(args)
         }),
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index f63602ccb06f..04a8c5568322 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -640,8 +640,8 @@ enum ScalarFunction {
   ArrayReplace = 96;
   // 97 was ArrayToString
   // 98 was Cardinality
-  ArrayElement = 99;
-  ArraySlice = 100;
+  // 99 was ArrayElement
+  // 100 was ArraySlice
   Cot = 103;
   // 104 was ArrayHas
   // 105 was ArrayHasAny
@@ -655,7 +655,7 @@ enum ScalarFunction {
   // 113 was IsNan
   Iszero = 114;
   // 115 was ArrayEmpty
-  ArrayPopBack = 116;
+  // 116 was ArrayPopBack
   // 117 was StringToArray
   // 118 was ToTimestampNanos
   ArrayIntersect = 119;
@@ -663,7 +663,7 @@ enum ScalarFunction {
   OverLay = 121;
   /// 122 is Range
   ArrayExcept = 123;
-  ArrayPopFront = 124;
+  // 124 was ArrayPopFront
   Levenshtein = 125;
   SubstrIndex = 126;
   FindInSet = 127;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index ef7f0279f16e..6b64c87122a6 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -22955,8 +22955,6 @@ impl serde::Serialize for ScalarFunction {
             Self::ArrayPositions => "ArrayPositions",
             Self::ArrayRemove => "ArrayRemove",
             Self::ArrayReplace => "ArrayReplace",
-            Self::ArrayElement => "ArrayElement",
-            Self::ArraySlice => "ArraySlice",
             Self::Cot => "Cot",
             Self::ArrayRemoveN => "ArrayRemoveN",
             Self::ArrayReplaceN => "ArrayReplaceN",
@@ -22964,12 +22962,10 @@ impl serde::Serialize for ScalarFunction {
             Self::ArrayReplaceAll => "ArrayReplaceAll",
             Self::Nanvl => "Nanvl",
             Self::Iszero => "Iszero",
-            Self::ArrayPopBack => "ArrayPopBack",
             Self::ArrayIntersect => "ArrayIntersect",
             Self::ArrayUnion => "ArrayUnion",
             Self::OverLay => "OverLay",
             Self::ArrayExcept => "ArrayExcept",
-            Self::ArrayPopFront => "ArrayPopFront",
             Self::Levenshtein => "Levenshtein",
             Self::SubstrIndex => "SubstrIndex",
             Self::FindInSet => "FindInSet",
@@ -23049,8 +23045,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
             "ArrayPositions",
             "ArrayRemove",
             "ArrayReplace",
-            "ArrayElement",
-            "ArraySlice",
             "Cot",
             "ArrayRemoveN",
             "ArrayReplaceN",
@@ -23058,12 +23052,10 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
             "ArrayReplaceAll",
             "Nanvl",
             "Iszero",
-            "ArrayPopBack",
             "ArrayIntersect",
             "ArrayUnion",
             "OverLay",
             "ArrayExcept",
-            "ArrayPopFront",
             "Levenshtein",
             "SubstrIndex",
             "FindInSet",
@@ -23172,8 +23164,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
                     "ArrayPositions" => Ok(ScalarFunction::ArrayPositions),
                     "ArrayRemove" => Ok(ScalarFunction::ArrayRemove),
                     "ArrayReplace" => Ok(ScalarFunction::ArrayReplace),
-                    "ArrayElement" => Ok(ScalarFunction::ArrayElement),
-                    "ArraySlice" => Ok(ScalarFunction::ArraySlice),
                     "Cot" => Ok(ScalarFunction::Cot),
                     "ArrayRemoveN" => Ok(ScalarFunction::ArrayRemoveN),
                     "ArrayReplaceN" => Ok(ScalarFunction::ArrayReplaceN),
@@ -23181,12 +23171,10 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
                     "ArrayReplaceAll" => Ok(ScalarFunction::ArrayReplaceAll),
                     "Nanvl" => Ok(ScalarFunction::Nanvl),
                     "Iszero" => Ok(ScalarFunction::Iszero),
-                    "ArrayPopBack" => Ok(ScalarFunction::ArrayPopBack),
                     "ArrayIntersect" => Ok(ScalarFunction::ArrayIntersect),
                     "ArrayUnion" => Ok(ScalarFunction::ArrayUnion),
                     "OverLay" => Ok(ScalarFunction::OverLay),
                     "ArrayExcept" => Ok(ScalarFunction::ArrayExcept),
-                    "ArrayPopFront" => Ok(ScalarFunction::ArrayPopFront),
                     "Levenshtein" => Ok(ScalarFunction::Levenshtein),
                     "SubstrIndex" => Ok(ScalarFunction::SubstrIndex),
                     "FindInSet" => Ok(ScalarFunction::FindInSet),
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 4e59f4c69e66..4606b5ea2815 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2913,8 +2913,8 @@ pub enum ScalarFunction {
     ArrayReplace = 96,
     /// 97 was ArrayToString
     /// 98 was Cardinality
-    ArrayElement = 99,
-    ArraySlice = 100,
+    /// 99 was ArrayElement
+    /// 100 was ArraySlice
     Cot = 103,
     /// 104 was ArrayHas
     /// 105 was ArrayHasAny
@@ -2928,7 +2928,7 @@ pub enum ScalarFunction {
     /// 113 was IsNan
     Iszero = 114,
     /// 115 was ArrayEmpty
-    ArrayPopBack = 116,
+    /// 116 was ArrayPopBack
     /// 117 was StringToArray
     /// 118 was ToTimestampNanos
     ArrayIntersect = 119,
@@ -2936,7 +2936,7 @@ pub enum ScalarFunction {
     OverLay = 121,
     /// / 122 is Range
     ArrayExcept = 123,
-    ArrayPopFront = 124,
+    /// 124 was ArrayPopFront
     Levenshtein = 125,
     SubstrIndex = 126,
     FindInSet = 127,
@@ -3023,8 +3023,6 @@ impl ScalarFunction {
             ScalarFunction::ArrayPositions => "ArrayPositions",
             ScalarFunction::ArrayRemove => "ArrayRemove",
             ScalarFunction::ArrayReplace => "ArrayReplace",
-            ScalarFunction::ArrayElement => "ArrayElement",
-            ScalarFunction::ArraySlice => "ArraySlice",
             ScalarFunction::Cot => "Cot",
             ScalarFunction::ArrayRemoveN => "ArrayRemoveN",
             ScalarFunction::ArrayReplaceN => "ArrayReplaceN",
@@ -3032,12 +3030,10 @@ impl ScalarFunction {
             ScalarFunction::ArrayReplaceAll => "ArrayReplaceAll",
             ScalarFunction::Nanvl => "Nanvl",
             ScalarFunction::Iszero => "Iszero",
-            ScalarFunction::ArrayPopBack => "ArrayPopBack",
             ScalarFunction::ArrayIntersect => "ArrayIntersect",
             ScalarFunction::ArrayUnion => "ArrayUnion",
             ScalarFunction::OverLay => "OverLay",
             ScalarFunction::ArrayExcept => "ArrayExcept",
-            ScalarFunction::ArrayPopFront => "ArrayPopFront",
             ScalarFunction::Levenshtein => "Levenshtein",
             ScalarFunction::SubstrIndex => "SubstrIndex",
             ScalarFunction::FindInSet => "FindInSet",
@@ -3111,8 +3107,6 @@ impl ScalarFunction {
             "ArrayPositions" => Some(Self::ArrayPositions),
             "ArrayRemove" => Some(Self::ArrayRemove),
             "ArrayReplace" => Some(Self::ArrayReplace),
-            "ArrayElement" => Some(Self::ArrayElement),
-            "ArraySlice" => Some(Self::ArraySlice),
             "Cot" => Some(Self::Cot),
             "ArrayRemoveN" => Some(Self::ArrayRemoveN),
             "ArrayReplaceN" => Some(Self::ArrayReplaceN),
@@ -3120,12 +3114,10 @@ impl ScalarFunction {
             "ArrayReplaceAll" => Some(Self::ArrayReplaceAll),
             "Nanvl" => Some(Self::Nanvl),
             "Iszero" => Some(Self::Iszero),
-            "ArrayPopBack" => Some(Self::ArrayPopBack),
             "ArrayIntersect" => Some(Self::ArrayIntersect),
             "ArrayUnion" => Some(Self::ArrayUnion),
             "OverLay" => Some(Self::OverLay),
             "ArrayExcept" => Some(Self::ArrayExcept),
-            "ArrayPopFront" => Some(Self::ArrayPopFront),
             "Levenshtein" => Some(Self::Levenshtein),
             "SubstrIndex" => Some(Self::SubstrIndex),
             "FindInSet" => Some(Self::FindInSet),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index 0b6d979e4603..41824f7a028d 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -47,11 +47,11 @@ use datafusion_common::{
 use datafusion_expr::expr::Unnest;
 use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by};
 use datafusion_expr::{
-    acosh, array_element, array_except, array_intersect, array_pop_back, array_pop_front,
-    array_position, array_positions, array_remove, array_remove_all, array_remove_n,
-    array_replace, array_replace_all, array_replace_n, array_slice, array_union, ascii,
-    asinh, atan, atan2, atanh, bit_length, btrim, cbrt, ceil, character_length, chr,
-    coalesce, concat_expr, concat_ws_expr, cos, cosh, cot, degrees, ends_with, exp,
+    acosh, array_except, array_intersect, array_position, array_positions, array_remove,
+    array_remove_all, array_remove_n, array_replace, array_replace_all, array_replace_n,
+    array_union, ascii, asinh, atan, atan2, atanh, bit_length, btrim, cbrt, ceil,
+    character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot,
+    degrees, ends_with, exp,
     expr::{self, InList, Sort, WindowFunction},
     factorial, find_in_set, floor, gcd, initcap, iszero, lcm, left, levenshtein, ln, log,
     log10, log2,
@@ -472,9 +472,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
             ScalarFunction::Ltrim => Self::Ltrim,
             ScalarFunction::Rtrim => Self::Rtrim,
             ScalarFunction::ArrayExcept => Self::ArrayExcept,
-            ScalarFunction::ArrayElement => Self::ArrayElement,
-            ScalarFunction::ArrayPopFront => Self::ArrayPopFront,
-            ScalarFunction::ArrayPopBack => Self::ArrayPopBack,
             ScalarFunction::ArrayPosition => Self::ArrayPosition,
             ScalarFunction::ArrayPositions => Self::ArrayPositions,
             ScalarFunction::ArrayRemove => Self::ArrayRemove,
@@ -484,7 +481,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
             ScalarFunction::ArrayReplaceN => Self::ArrayReplaceN,
             ScalarFunction::ArrayReplaceAll => Self::ArrayReplaceAll,
             ScalarFunction::ArrayReverse => Self::ArrayReverse,
-            ScalarFunction::ArraySlice => Self::ArraySlice,
             ScalarFunction::ArrayIntersect => Self::ArrayIntersect,
             ScalarFunction::ArrayUnion => Self::ArrayUnion,
             ScalarFunction::Log2 => Self::Log2,
@@ -1380,12 +1376,6 @@ pub fn parse_expr(
                 ScalarFunction::Acosh => {
                     Ok(acosh(parse_expr(&args[0], registry, codec)?))
                 }
-                ScalarFunction::ArrayPopFront => {
-                    Ok(array_pop_front(parse_expr(&args[0], registry, codec)?))
-                }
-                ScalarFunction::ArrayPopBack => {
-                    Ok(array_pop_back(parse_expr(&args[0], registry, codec)?))
-                }
                 ScalarFunction::ArrayExcept => Ok(array_except(
                     parse_expr(&args[0], registry, codec)?,
                     parse_expr(&args[1], registry, codec)?,
@@ -1435,16 +1425,6 @@ pub fn parse_expr(
                 ScalarFunction::ArrayReverse => {
                     Ok(array_reverse(parse_expr(&args[0], registry, codec)?))
                 }
-                ScalarFunction::ArraySlice => Ok(array_slice(
-                    parse_expr(&args[0], registry, codec)?,
-                    parse_expr(&args[1], registry, codec)?,
-                    parse_expr(&args[2], registry, codec)?,
-                    parse_expr(&args[3], registry, codec)?,
-                )),
-                ScalarFunction::ArrayElement => Ok(array_element(
-                    parse_expr(&args[0], registry, codec)?,
-                    parse_expr(&args[1], registry, codec)?,
-                )),
                 ScalarFunction::ArrayUnion => Ok(array_union(
                     parse_expr(&args[0], registry, codec)?,
                     parse_expr(&args[1], registry, codec)?,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 3835c67b9192..196e02c93e67 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -1454,9 +1454,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
             BuiltinScalarFunction::Ltrim => Self::Ltrim,
             BuiltinScalarFunction::Rtrim => Self::Rtrim,
             BuiltinScalarFunction::ArrayExcept => Self::ArrayExcept,
-            BuiltinScalarFunction::ArrayElement => Self::ArrayElement,
-            BuiltinScalarFunction::ArrayPopFront => Self::ArrayPopFront,
-            BuiltinScalarFunction::ArrayPopBack => Self::ArrayPopBack,
             BuiltinScalarFunction::ArrayPosition => Self::ArrayPosition,
             BuiltinScalarFunction::ArrayPositions => Self::ArrayPositions,
             BuiltinScalarFunction::ArrayRemove => Self::ArrayRemove,
@@ -1466,7 +1463,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
             BuiltinScalarFunction::ArrayReplaceN => Self::ArrayReplaceN,
             BuiltinScalarFunction::ArrayReplaceAll => Self::ArrayReplaceAll,
             BuiltinScalarFunction::ArrayReverse => Self::ArrayReverse,
-            BuiltinScalarFunction::ArraySlice => Self::ArraySlice,
             BuiltinScalarFunction::ArrayIntersect => Self::ArrayIntersect,
             BuiltinScalarFunction::ArrayUnion => Self::ArrayUnion,
             BuiltinScalarFunction::Log2 => Self::Log2,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 9bf68105f426..2ce044d36b5b 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -567,6 +567,15 @@ async fn roundtrip_expr_api() -> Result<()> {
         ),
         array_distinct(make_array(vec![lit(1), lit(3), lit(3), lit(2), lit(2)])),
         array_resize(make_array(vec![lit(1), lit(2), lit(3)]), lit(5), lit(0)),
+        array_element(make_array(vec![lit(1), lit(2), lit(3)]), lit(2)),
+        array_slice(
+            make_array(vec![lit(1), lit(2), lit(3)]),
+            lit(1),
+            lit(2),
+            lit(1),
+        ),
+        array_pop_front(make_array(vec![lit(1), lit(2), lit(3)])),
+        array_pop_back(make_array(vec![lit(1), lit(2), lit(3)])),
     ];
 
     // ensure expressions created with the expr api can be round tripped