Skip to content

Commit

Permalink
move accumulator and columnar value (#1765)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist authored Feb 9, 2022
1 parent e8c198b commit ed9b049
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 65 deletions.
44 changes: 44 additions & 0 deletions datafusion-expr/src/accumulator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::ArrayRef;
use datafusion_common::{Result, ScalarValue};
use std::fmt::Debug;

/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
/// generically accumulates values.
///
/// An accumulator knows how to:
/// * update its state from inputs via `update_batch`
/// * convert its internal state to a vector of scalar values
/// * update its state from multiple accumulators' states via `merge_batch`
/// * compute the final value from its internal state via `evaluate`
pub trait Accumulator: Send + Sync + Debug {
/// Returns the state of the accumulator at the end of the accumulation.
// in the case of an average on which we track `sum` and `n`, this function should return a vector
// of two values, sum and n.
fn state(&self) -> Result<Vec<ScalarValue>>;

/// updates the accumulator's state from a vector of arrays.
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;

/// updates the accumulator's state from a vector of states.
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;

/// returns its value based on its current state.
fn evaluate(&self) -> Result<ScalarValue>;
}
60 changes: 60 additions & 0 deletions datafusion-expr/src/columnar_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use std::sync::Arc;

/// Represents the result from an expression
#[derive(Clone)]
pub enum ColumnarValue {
/// Array of values
Array(ArrayRef),
/// A single value
Scalar(ScalarValue),
}

impl ColumnarValue {
pub fn data_type(&self) -> DataType {
match self {
ColumnarValue::Array(array_value) => array_value.data_type().clone(),
ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
}
}

/// Convert a columnar value into an ArrayRef
pub fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
}
}

/// null columnar values are implemented as a null array in order to pass batch
/// num_rows
pub type NullColumnarValue = ColumnarValue;

impl From<&RecordBatch> for NullColumnarValue {
fn from(batch: &RecordBatch) -> Self {
let num_rows = batch.num_rows();
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
}
}
4 changes: 4 additions & 0 deletions datafusion-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@
// specific language governing permissions and limitations
// under the License.

mod accumulator;
mod aggregate_function;
mod built_in_function;
mod columnar_value;
mod operator;
mod signature;
mod window_frame;
mod window_function;

pub use accumulator::Accumulator;
pub use aggregate_function::AggregateFunction;
pub use built_in_function::BuiltinScalarFunction;
pub use columnar_value::{ColumnarValue, NullColumnarValue};
pub use operator::Operator;
pub use signature::{Signature, TypeSignature, Volatility};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
Expand Down
18 changes: 3 additions & 15 deletions datafusion/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,17 @@ use crate::{
scalar::ScalarValue,
};
use arrow::{
array::{ArrayRef, NullArray},
array::ArrayRef,
compute::kernels::length::{bit_length, length},
datatypes::TimeUnit,
datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
record_batch::RecordBatch,
};
pub use datafusion_expr::NullColumnarValue;
pub use datafusion_expr::{BuiltinScalarFunction, Signature, TypeSignature, Volatility};
use fmt::{Debug, Formatter};
use std::convert::From;
use std::{any::Any, fmt, sync::Arc};

pub use datafusion_expr::{BuiltinScalarFunction, Signature, TypeSignature, Volatility};

/// Scalar function
///
/// The Fn param is the wrapped function but be aware that the function will
Expand Down Expand Up @@ -1206,17 +1205,6 @@ impl fmt::Display for ScalarFunctionExpr {
}
}

/// null columnar values are implemented as a null array in order to pass batch
/// num_rows
type NullColumnarValue = ColumnarValue;

impl From<&RecordBatch> for NullColumnarValue {
fn from(batch: &RecordBatch) -> Self {
let num_rows = batch.num_rows();
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
}
}

impl PhysicalExpr for ScalarFunctionExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
Expand Down
52 changes: 2 additions & 50 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use async_trait::async_trait;
pub use datafusion_expr::Accumulator;
pub use datafusion_expr::ColumnarValue;
pub use display::DisplayFormatType;
use futures::stream::Stream;
use std::fmt;
Expand Down Expand Up @@ -419,32 +421,6 @@ pub enum Distribution {
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}

/// Represents the result from an expression
#[derive(Clone)]
pub enum ColumnarValue {
/// Array of values
Array(ArrayRef),
/// A single value
Scalar(ScalarValue),
}

impl ColumnarValue {
fn data_type(&self) -> DataType {
match self {
ColumnarValue::Array(array_value) => array_value.data_type().clone(),
ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
}
}

/// Convert a columnar value into an ArrayRef
pub fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
}
}

/// Expression that can be evaluated against a RecordBatch
/// A Physical expression knows its type, nullability and how to evaluate itself.
pub trait PhysicalExpr: Send + Sync + Display + Debug {
Expand Down Expand Up @@ -578,30 +554,6 @@ pub trait WindowExpr: Send + Sync + Debug {
}
}

/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
/// generically accumulates values.
///
/// An accumulator knows how to:
/// * update its state from inputs via `update_batch`
/// * convert its internal state to a vector of scalar values
/// * update its state from multiple accumulators' states via `merge_batch`
/// * compute the final value from its internal state via `evaluate`
pub trait Accumulator: Send + Sync + Debug {
/// Returns the state of the accumulator at the end of the accumulation.
// in the case of an average on which we track `sum` and `n`, this function should return a vector
// of two values, sum and n.
fn state(&self) -> Result<Vec<ScalarValue>>;

/// updates the accumulator's state from a vector of arrays.
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;

/// updates the accumulator's state from a vector of states.
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;

/// returns its value based on its current state.
fn evaluate(&self) -> Result<ScalarValue>;
}

/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
///
Expand Down

0 comments on commit ed9b049

Please sign in to comment.