Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Co-authored-by: Sarvesh Tandon <[email protected]>
Browse files Browse the repository at this point in the history
  • Loading branch information
connortsui20 committed Mar 29, 2024
1 parent 649683c commit 20d2a04
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 227 deletions.
227 changes: 0 additions & 227 deletions eggstrain/src/execution/operators/aggregate.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use arrow::array::{ArrayRef, RecordBatch};
use datafusion::common::Result;
use datafusion::physical_plan::{aggregates::PhysicalGroupBy, PhysicalExpr};
use std::sync::Arc;

/// Evaluates expressions against a record batch.
pub(crate) fn evaluate(
expr: &[Arc<dyn PhysicalExpr>],
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
expr.iter()
.map(|expr| {
expr.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
})
.collect()
}

/// Evaluates expressions against a record batch.
pub(crate) fn evaluate_many(
expr: &[Vec<Arc<dyn PhysicalExpr>>],
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
expr.iter().map(|expr| evaluate(expr, batch)).collect()
}

pub(crate) fn evaluate_optional(
expr: &[Option<Arc<dyn PhysicalExpr>>],
batch: &RecordBatch,
) -> Result<Vec<Option<ArrayRef>>> {
expr.iter()
.map(|expr| {
expr.as_ref()
.map(|expr| {
expr.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
})
.transpose()
})
.collect()
}

/// Evaluate a group by expression against a `RecordBatch`
///
/// Arguments:
/// `group_by`: the expression to evaluate
/// `batch`: the `RecordBatch` to evaluate against
///
/// Returns: A Vec of Vecs of Array of results
/// The outer Vect appears to be for grouping sets
/// The inner Vect contains the results per expression
/// The inner-inner Array contains the results per row
pub(crate) fn evaluate_group_by(
group_by: &PhysicalGroupBy,
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
let exprs: Vec<ArrayRef> = group_by
.expr()
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
value.into_array(batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;

let null_exprs: Vec<ArrayRef> = group_by
.null_expr()
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
value.into_array(batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;

Ok(group_by
.groups()
.iter()
.map(|group| {
group
.iter()
.enumerate()
.map(|(idx, is_null)| {
if *is_null {
null_exprs[idx].clone()
} else {
exprs[idx].clone()
}
})
.collect()
})
.collect())
}
10 changes: 10 additions & 0 deletions eggstrain/src/execution/operators/aggregate/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
mod datafusion_aggregate;
use datafusion_aggregate::*;

mod operator;
use operator::*;





Loading

0 comments on commit 20d2a04

Please sign in to comment.