Skip to content

Commit

Permalink
all done
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed Apr 23, 2024
1 parent adc87a1 commit 1adfcde
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 476 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use std::borrow::Cow;
use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};

use crate::expressions::{try_cast, NoOp};
use crate::expressions::no_op::NoOp;
use crate::expressions::try_cast::try_cast;
use crate::physical_expr::down_cast_any_ref;
use crate::PhysicalExpr;
use crate::physical_expr::PhysicalExpr;

use arrow::array::*;
use arrow::compute::kernels::cmp::eq;
Expand Down Expand Up @@ -414,7 +415,11 @@ pub fn case(
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::{binary, cast, col, lit};

use crate::expressions::binary::binary;
use crate::expressions::cast::cast;
use crate::expressions::column::col;
use crate::expressions::literal::{lit, Literal};

use arrow::array::StringArray;
use arrow::buffer::Buffer;
Expand Down Expand Up @@ -959,16 +964,15 @@ mod tests {
let expr2 = expr
.clone()
.transform(|e| {
let transformed =
match e.as_any().downcast_ref::<crate::expressions::Literal>() {
Some(lit_value) => match lit_value.value() {
ScalarValue::Utf8(Some(str_value)) => {
Some(lit(str_value.to_uppercase()))
}
_ => None,
},
let transformed = match e.as_any().downcast_ref::<Literal>() {
Some(lit_value) => match lit_value.value() {
ScalarValue::Utf8(Some(str_value)) => {
Some(lit(str_value.to_uppercase()))
}
_ => None,
};
},
_ => None,
};
Ok(if let Some(transformed) = transformed {
Transformed::yes(transformed)
} else {
Expand All @@ -981,16 +985,15 @@ mod tests {
let expr3 = expr
.clone()
.transform_down(|e| {
let transformed =
match e.as_any().downcast_ref::<crate::expressions::Literal>() {
Some(lit_value) => match lit_value.value() {
ScalarValue::Utf8(Some(str_value)) => {
Some(lit(str_value.to_uppercase()))
}
_ => None,
},
let transformed = match e.as_any().downcast_ref::<Literal>() {
Some(lit_value) => match lit_value.value() {
ScalarValue::Utf8(Some(str_value)) => {
Some(lit(str_value.to_uppercase()))
}
_ => None,
};
},
_ => None,
};
Ok(if let Some(transformed) = transformed {
Transformed::yes(transformed)
} else {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-expr-common/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#[macro_use]
pub mod binary;
pub mod case;
pub mod cast;
pub mod column;
pub mod datum;
Expand All @@ -28,5 +29,6 @@ pub mod is_null;
pub mod like;
pub mod literal;
pub mod negative;
pub mod no_op;
pub mod not;
pub mod try_cast;
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::{
};

use crate::physical_expr::down_cast_any_ref;
use crate::PhysicalExpr;
use crate::physical_expr::PhysicalExpr;
use datafusion_common::{internal_err, Result};
use datafusion_expr::ColumnarValue;

Expand Down
115 changes: 77 additions & 38 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion_expr::{
};

use crate::expressions::binary::binary;
use crate::expressions::case;
use crate::expressions::cast::cast;
use crate::expressions::column::Column;
use crate::expressions::in_list::in_list;
Expand Down Expand Up @@ -332,7 +333,6 @@ pub fn physical_exprs_equal(
/// * `e` - The logical expression
/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
/// to qualified or unqualified fields by name.
#[allow(clippy::only_used_in_recursion)]
pub fn create_physical_expr(
e: &Expr,
input_dfschema: &DFSchema,
Expand Down Expand Up @@ -451,43 +451,43 @@ pub fn create_physical_expr(
input_schema,
)
}
// Expr::Case(case) => {
// let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
// Some(create_physical_expr(
// e.as_ref(),
// input_dfschema,
// execution_props,
// )?)
// } else {
// None
// };
// let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
// .when_then_expr
// .iter()
// .map(|(w, t)| (w.as_ref(), t.as_ref()))
// .unzip();
// let when_expr =
// create_physical_exprs(when_expr, input_dfschema, execution_props)?;
// let then_expr =
// create_physical_exprs(then_expr, input_dfschema, execution_props)?;
// let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
// when_expr
// .iter()
// .zip(then_expr.iter())
// .map(|(w, t)| (w.clone(), t.clone()))
// .collect();
// let else_expr: Option<Arc<dyn PhysicalExpr>> =
// if let Some(e) = &case.else_expr {
// Some(create_physical_expr(
// e.as_ref(),
// input_dfschema,
// execution_props,
// )?)
// } else {
// None
// };
// Ok(expressions::case(expr, when_then_expr, else_expr)?)
// }
Expr::Case(case) => {
let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
Some(create_physical_expr(
e.as_ref(),
input_dfschema,
execution_props,
)?)
} else {
None
};
let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
.when_then_expr
.iter()
.map(|(w, t)| (w.as_ref(), t.as_ref()))
.unzip();
let when_expr =
create_physical_exprs(when_expr, input_dfschema, execution_props)?;
let then_expr =
create_physical_exprs(then_expr, input_dfschema, execution_props)?;
let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
when_expr
.iter()
.zip(then_expr.iter())
.map(|(w, t)| (w.clone(), t.clone()))
.collect();
let else_expr: Option<Arc<dyn PhysicalExpr>> =
if let Some(e) = &case.else_expr {
Some(create_physical_expr(
e.as_ref(),
input_dfschema,
execution_props,
)?)
} else {
None
};
Ok(case::case(expr, when_then_expr, else_expr)?)
}
Expr::Cast(Cast { expr, data_type }) => cast(
create_physical_expr(expr, input_dfschema, execution_props)?,
input_schema,
Expand Down Expand Up @@ -604,3 +604,42 @@ where
.map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
.collect::<Result<Vec<_>>>()
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};

use datafusion_common::{DFSchema, Result};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{col, lit};

use super::create_physical_expr;

#[test]
fn test_create_physical_expr_scalar_input_output() -> Result<()> {
let expr = col("letter").eq(lit("A"));

let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;
let p = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?;

let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(StringArray::from_iter_values(vec![
"A", "B", "C", "D",
]))],
)?;
let result = p.evaluate(&batch)?;
let result = result.into_array(4).expect("Failed to convert to array");

assert_eq!(
&result,
&(Arc::new(BooleanArray::from(vec![true, false, false, false,])) as ArrayRef)
);

Ok(())
}
}
6 changes: 2 additions & 4 deletions datafusion/physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

//! Defines physical expressions that can evaluated at runtime during query execution

mod case;
mod column;
mod no_op;

/// Module with some convenient methods used in expression building
pub mod helpers {
Expand Down Expand Up @@ -67,10 +65,10 @@ pub use datafusion_functions_aggregate::first_last::{
FirstValuePhysicalExpr as FirstValue, LastValuePhysicalExpr as LastValue,
};

pub use case::{case, CaseExpr};
pub use column::UnKnownColumn;
pub use datafusion_expr::utils::format_state_name;
pub use datafusion_physical_expr_common::expressions::binary::{binary, BinaryExpr};
pub use datafusion_physical_expr_common::expressions::case::{case, CaseExpr};
pub use datafusion_physical_expr_common::expressions::cast::{
cast, cast_with_options, CastExpr,
};
Expand All @@ -85,9 +83,9 @@ pub use datafusion_physical_expr_common::expressions::literal::{lit, Literal};
pub use datafusion_physical_expr_common::expressions::negative::{
negative, NegativeExpr,
};
pub use datafusion_physical_expr_common::expressions::no_op::NoOp;
pub use datafusion_physical_expr_common::expressions::not::{not, NotExpr};
pub use datafusion_physical_expr_common::expressions::try_cast::{try_cast, TryCastExpr};
pub use no_op::NoOp;

#[cfg(test)]
pub(crate) mod tests {
Expand Down
Loading

0 comments on commit 1adfcde

Please sign in to comment.