Commit f583e5c

Merge branch 'apache:main' into main

Omega359 authored Apr 7, 2024
2 parents: e7de59a + 85b4e40

Showing 57 changed files with 883 additions and 675 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml

@@ -23,6 +23,7 @@ members = [
     "datafusion/core",
     "datafusion/expr",
     "datafusion/execution",
+    "datafusion/functions-aggregate",
     "datafusion/functions",
     "datafusion/functions-array",
     "datafusion/optimizer",
@@ -49,7 +50,7 @@ homepage = "https://github.com/apache/arrow-datafusion"
 license = "Apache-2.0"
 readme = "README.md"
 repository = "https://github.com/apache/arrow-datafusion"
-rust-version = "1.72"
+rust-version = "1.73"
 version = "37.0.0"

 [workspace.dependencies]
@@ -78,6 +79,7 @@ datafusion-common-runtime = { path = "datafusion/common-runtime", version = "37.
 datafusion-execution = { path = "datafusion/execution", version = "37.0.0" }
 datafusion-expr = { path = "datafusion/expr", version = "37.0.0" }
 datafusion-functions = { path = "datafusion/functions", version = "37.0.0" }
+datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "37.0.0" }
 datafusion-functions-array = { path = "datafusion/functions-array", version = "37.0.0" }
 datafusion-optimizer = { path = "datafusion/optimizer", version = "37.0.0", default-features = false }
 datafusion-physical-expr = { path = "datafusion/physical-expr", version = "37.0.0", default-features = false }
17 changes: 17 additions & 0 deletions datafusion-cli/Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml

@@ -26,7 +26,7 @@ license = "Apache-2.0"
 homepage = "https://github.com/apache/arrow-datafusion"
 repository = "https://github.com/apache/arrow-datafusion"
 # Specify MSRV here as `cargo msrv` doesn't support workspace version
-rust-version = "1.72"
+rust-version = "1.73"
 readme = "README.md"

 [dependencies]
2 changes: 1 addition & 1 deletion datafusion-cli/Dockerfile

@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.

-FROM rust:1.72-bullseye as builder
+FROM rust:1.73-bullseye as builder

 COPY . /usr/src/arrow-datafusion
 COPY ./datafusion /usr/src/arrow-datafusion/datafusion
3 changes: 2 additions & 1 deletion datafusion/core/Cargo.toml

@@ -30,7 +30,7 @@ authors = { workspace = true }
 # Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
 # "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
 # https://github.com/foresterre/cargo-msrv/issues/590
-rust-version = "1.72"
+rust-version = "1.73"

 [lib]
 name = "datafusion"
@@ -98,6 +98,7 @@ datafusion-common-runtime = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
 datafusion-functions-array = { workspace = true, optional = true }
 datafusion-optimizer = { workspace = true }
 datafusion-physical-expr = { workspace = true }
29 changes: 7 additions & 22 deletions datafusion/core/src/execution/context/mod.rs

@@ -44,6 +44,7 @@ use crate::{
     datasource::{provider_as_source, MemTable, TableProvider, ViewTable},
     error::{DataFusionError, Result},
     execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry},
+    logical_expr::AggregateUDF,
     logical_expr::{
         CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
         CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
@@ -53,10 +54,11 @@ use crate::{
     optimizer::analyzer::{Analyzer, AnalyzerRule},
     optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
     physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
-    physical_plan::{udaf::AggregateUDF, udf::ScalarUDF, ExecutionPlan},
+    physical_plan::{udf::ScalarUDF, ExecutionPlan},
     physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
     variable::{VarProvider, VarType},
 };
+use crate::{functions, functions_aggregate, functions_array};

 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
@@ -69,14 +71,11 @@ use datafusion_common::{
     SchemaReference, TableReference,
 };
 use datafusion_execution::registry::SerializerRegistry;
-use datafusion_expr::type_coercion::aggregates::NUMERICS;
-use datafusion_expr::{create_first_value, Signature, Volatility};
 use datafusion_expr::{
     logical_plan::{DdlStatement, Statement},
     var_provider::is_system_variables,
     Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
 };
-use datafusion_physical_expr::create_first_value_accumulator;
 use datafusion_sql::{
     parser::{CopyToSource, CopyToStatement, DFParser},
     planner::{object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel},
@@ -85,7 +84,6 @@ use datafusion_sql::{

 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use log::debug;
 use parking_lot::RwLock;
 use sqlparser::dialect::dialect_from_str;
 use url::Url;
@@ -1452,29 +1450,16 @@ impl SessionState {
         };

         // register built in functions
-        datafusion_functions::register_all(&mut new_self)
+        functions::register_all(&mut new_self)
             .expect("can not register built in functions");

         // register crate of array expressions (if enabled)
         #[cfg(feature = "array_expressions")]
-        datafusion_functions_array::register_all(&mut new_self)
+        functions_array::register_all(&mut new_self)
             .expect("can not register array expressions");

-        let first_value = create_first_value(
-            "FIRST_VALUE",
-            Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
-            Arc::new(create_first_value_accumulator),
-        );
-
-        match new_self.register_udaf(Arc::new(first_value)) {
-            Ok(Some(existing_udaf)) => {
-                debug!("Overwrite existing UDAF: {}", existing_udaf.name());
-            }
-            Ok(None) => {}
-            Err(err) => {
-                panic!("Failed to register UDAF: {}", err);
-            }
-        }
+        functions_aggregate::register_all(&mut new_self)
+            .expect("can not register aggregate functions");

         new_self
     }
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs

@@ -541,6 +541,11 @@ pub mod functions_array {
     pub use datafusion_functions_array::*;
 }

+/// re-export of [`datafusion_functions_aggregate`] crate
+pub mod functions_aggregate {
+    pub use datafusion_functions_aggregate::*;
+}
+
 #[cfg(test)]
 pub mod test;
 pub mod test_util;
84 changes: 0 additions & 84 deletions datafusion/expr/src/expr_fn.rs

@@ -24,7 +24,6 @@ use crate::expr::{
 use crate::function::{
     AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
 };
-use crate::udaf::format_state_name;
 use crate::{
     aggregate_function, built_in_function, conditional_expressions::CaseBuilder,
     logical_plan::Subquery, AggregateUDF, BuiltinScalarFunction, Expr, LogicalPlan,
@@ -696,17 +695,6 @@ pub fn create_udaf(
     ))
 }

-/// Creates a new UDAF with a specific signature, state type and return type.
-/// The signature and state type must match the `Accumulator's implementation`.
-/// TOOD: We plan to move aggregate function to its own crate. This function will be deprecated then.
-pub fn create_first_value(
-    name: &str,
-    signature: Signature,
-    accumulator: AccumulatorFactoryFunction,
-) -> AggregateUDF {
-    AggregateUDF::from(FirstValue::new(name, signature, accumulator))
-}
-
 /// Implements [`AggregateUDFImpl`] for functions that have a single signature and
 /// return type.
 pub struct SimpleAggregateUDF {
@@ -801,78 +789,6 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
     }
 }

-pub struct FirstValue {
-    name: String,
-    signature: Signature,
-    accumulator: AccumulatorFactoryFunction,
-}
-
-impl Debug for FirstValue {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("FirstValue")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("accumulator", &"<FUNC>")
-            .finish()
-    }
-}
-
-impl FirstValue {
-    pub fn new(
-        name: impl Into<String>,
-        signature: Signature,
-        accumulator: AccumulatorFactoryFunction,
-    ) -> Self {
-        let name = name.into();
-        Self {
-            name,
-            signature,
-            accumulator,
-        }
-    }
-}
-
-impl AggregateUDFImpl for FirstValue {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        Ok(arg_types[0].clone())
-    }
-
-    fn accumulator(
-        &self,
-        acc_args: AccumulatorArgs,
-    ) -> Result<Box<dyn crate::Accumulator>> {
-        (self.accumulator)(acc_args)
-    }
-
-    fn state_fields(
-        &self,
-        name: &str,
-        value_type: DataType,
-        ordering_fields: Vec<Field>,
-    ) -> Result<Vec<Field>> {
-        let mut fields = vec![Field::new(
-            format_state_name(name, "first_value"),
-            value_type,
-            true,
-        )];
-        fields.extend(ordering_fields);
-        fields.push(Field::new("is_set", DataType::Boolean, true));
-        Ok(fields)
-    }
-}
-
 /// Creates a new UDWF with a specific signature, state type and return type.
 ///
 /// The signature and state type must match the [`PartitionEvaluator`]'s implementation`.
7 changes: 1 addition & 6 deletions datafusion/expr/src/udaf.rs

@@ -19,6 +19,7 @@
 use crate::function::AccumulatorArgs;
 use crate::groups_accumulator::GroupsAccumulator;
+use crate::utils::format_state_name;
 use crate::{Accumulator, Expr};
 use crate::{AccumulatorFactoryFunction, ReturnTypeFunction, Signature};
 use arrow::datatypes::{DataType, Field};
@@ -447,9 +448,3 @@ impl AggregateUDFImpl for AggregateUDFLegacyWrapper {
         (self.accumulator)(acc_args)
     }
 }
-
-/// returns the name of the state
-/// TODO: Remove duplicated function in physical-expr
-pub(crate) fn format_state_name(name: &str, state_name: &str) -> String {
-    format!("{name}[{state_name}]")
-}
18 changes: 12 additions & 6 deletions datafusion/expr/src/utils.rs

@@ -264,7 +264,7 @@ pub fn grouping_set_to_exprlist(group_expr: &[Expr]) -> Result<Vec<Expr>> {
 /// Recursively walk an expression tree, collecting the unique set of columns
 /// referenced in the expression
 pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
-    inspect_expr_pre(expr, |expr| {
+    expr.apply(&mut |expr| {
         match expr {
             Expr::Column(qc) => {
                 accum.insert(qc.clone());
@@ -307,8 +307,9 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
             | Expr::Placeholder(_)
             | Expr::OuterReferenceColumn { .. } => {}
         }
-        Ok(())
+        Ok(TreeNodeRecursion::Continue)
     })
+    .map(|_| ())
 }

 /// Find excluded columns in the schema, if any
@@ -838,11 +839,11 @@ pub fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {

 pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
     let mut exprs = vec![];
-    inspect_expr_pre(e, |expr| {
+    e.apply(&mut |expr| {
         if let Expr::Column(c) = expr {
             exprs.push(c.clone())
         }
-        Ok(()) as Result<()>
+        Ok(TreeNodeRecursion::Continue)
     })
    // As the closure always returns Ok, this "can't" error
    .expect("Unexpected error");
@@ -867,7 +868,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(
     schema: &DFSchemaRef,
 ) -> Vec<usize> {
     let mut indexes = vec![];
-    inspect_expr_pre(e, |expr| {
+    e.apply(&mut |expr| {
         match expr {
             Expr::Column(qc) => {
                 if let Ok(idx) = schema.index_of_column(qc) {
@@ -879,7 +880,7 @@
             }
             _ => {}
         }
-        Ok(()) as Result<()>
+        Ok(TreeNodeRecursion::Continue)
     })
     .unwrap();
     indexes
@@ -1240,6 +1241,11 @@ pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema {
     }
 }

+/// Build state name. State is the intermediate state of the aggregate function.
+pub fn format_state_name(name: &str, state_name: &str) -> String {
+    format!("{name}[{state_name}]")
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
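A minimal sketch (not part of this commit) of the traversal pattern that the utils.rs changes above adopt: TreeNode::apply with TreeNodeRecursion::Continue in place of inspect_expr_pre. The helper columns_of and the small main are hypothetical, and the example assumes the datafusion-common and datafusion-expr 37.0.0 APIs shown in the diff.

use std::collections::HashSet;

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, Result};
use datafusion_expr::{col, Expr};

// Hypothetical helper: collect every column referenced by `expr`, using the
// TreeNode::apply walk that this commit switches expr_to_columns and friends to.
fn columns_of(expr: &Expr) -> Result<HashSet<Column>> {
    let mut accum = HashSet::new();
    expr.apply(&mut |e| {
        if let Expr::Column(c) = e {
            accum.insert(c.clone());
        }
        // Visit every node; TreeNodeRecursion::Jump / Stop could prune the walk instead.
        Ok(TreeNodeRecursion::Continue)
    })?;
    Ok(accum)
}

fn main() -> Result<()> {
    let expr = col("a").eq(col("b"));
    assert_eq!(columns_of(&expr)?.len(), 2);
    Ok(())
}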
(The remaining changed files are not shown here.)
