Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
panics on accumulator update batch
Browse files Browse the repository at this point in the history
  • Loading branch information
SarveshOO7 committed Apr 2, 2024
1 parent ab12e42 commit 60d4bd3
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{downcast_primitive, ArrayRef, RecordBatch};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::SchemaRef;
use arrow_schema::Schema;
use datafusion::common::Result;
Expand All @@ -8,8 +8,6 @@ use datafusion::physical_plan::{aggregates::PhysicalGroupBy, PhysicalExpr};
use datafusion_expr::GroupsAccumulator;
use std::sync::Arc;

use super::GroupValues;

/// Evaluates expressions against a record batch.
pub(crate) fn evaluate(
expr: &[Arc<dyn PhysicalExpr>],
Expand Down
3 changes: 1 addition & 2 deletions eggstrain/src/execution/operators/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ mod datafusion_aggregate;
use datafusion_aggregate::*;

mod group_values;
mod operator;
use operator::*;
pub mod operator;

use group_values::*;
35 changes: 18 additions & 17 deletions eggstrain/src/execution/operators/aggregate/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@
use crate::BATCH_SIZE;

use crate::execution::operators::{Operator, UnaryOperator};
use arrow::array::{ArrayRef, AsArray};
use arrow::array::AsArray;
use arrow::compute::concat_batches;
use arrow::compute::kernels::aggregate;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use async_trait::async_trait;
use datafusion::logical_expr::EmitTo;
use datafusion::physical_expr::equivalence::ProjectionMapping;
use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalSortExpr};
use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{AggregateExpr, InputOrderMode, PhysicalExpr};
use std::io::SeekFrom;
// use datafusion_common::Result;
use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
use datafusion_common::Result;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
Expand Down Expand Up @@ -118,10 +114,7 @@ impl Aggregate {
let filter_values = evaluate_optional(&filter_expr, &merged_batch).unwrap();

//TODO: maybe this should be input schema not output schema
let group_schema = group_schema(
&Arc::clone(&output_schema),
group_by.clone().expr().len(),
);
let group_schema = group_schema(&Arc::clone(&output_schema), group_by.clone().expr().len());

let mut group_values_struct = new_group_values(group_schema).unwrap();

Expand All @@ -131,12 +124,12 @@ impl Aggregate {
.clone()
.iter()
.map(create_group_accumulator)
.collect::<Result<_, _>>()
.collect::<Result<_>>()
.unwrap();

for group_values in &group_by_values {
// calculate the group indices for each input row
let starting_num_groups = group_values_struct.len();
// let starting_num_groups = group_values_struct.len();
group_values_struct
.intern(group_values, &mut current_group_indices)
.unwrap();
Expand Down Expand Up @@ -169,7 +162,8 @@ impl Aggregate {

let schema = output_schema.clone();
if group_values_struct.is_empty() {
tx.send(RecordBatch::new_empty(schema));
tx.send(RecordBatch::new_empty(schema))
.expect("Unable to send empty Record Batch for aggregate");
return;
}

Expand Down Expand Up @@ -227,15 +221,22 @@ impl UnaryOperator for Aggregate {
}

let merged_batch = concat_batches(&self.input_schema, &batches).unwrap();
let limit_size = self.limit_size;


let aggregate_expressions = self.aggr_expr.clone();
let group_by = self.group_by.clone();
let filter_expr = self.filter_expr.clone();
let output_schema = self.output_schema.clone();

//TODO: maybe this *self is a problem
rayon::spawn(|| Aggregate::aggregate_sync(aggregate_expressions, group_by, filter_expr, output_schema, merged_batch, tx));
rayon::spawn(|| {
Aggregate::aggregate_sync(
aggregate_expressions,
group_by,
filter_expr,
output_schema,
merged_batch,
tx,
)
});
}
}
20 changes: 15 additions & 5 deletions eggstrain/src/execution/query_dag.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::operators::aggregate::operator::Aggregate;
use super::operators::filter::Filter;
use super::operators::project::Project;
use super::operators::sort::Sort;
Expand Down Expand Up @@ -25,11 +26,10 @@ enum EggstrainOperator {
Project(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
Filter(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
Sort(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
Aggregate(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),

// TODO remove `dead_code` once implemented
#[allow(dead_code)]
Aggregate(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
#[allow(dead_code)]
TableScan(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
#[allow(dead_code)]
HashJoin(
Expand All @@ -44,6 +44,7 @@ impl EggstrainOperator {
Self::Project(x) => x.children(),
Self::Filter(x) => x.children(),
Self::Sort(x) => x.children(),
Self::Aggregate(x) => x.children(),
_ => unimplemented!(),
}
}
Expand Down Expand Up @@ -88,15 +89,23 @@ fn parse_execution_plan_root(plan: &Arc<dyn ExecutionPlan>) -> Result<EggstrainO
} else if id == TypeId::of::<SortExec>() {
let Some(sort_plan) = root.downcast_ref::<SortExec>() else {
return Err(DataFusionError::NotImplemented(
"Unable to downcast DataFusion ExecutionPlan to ProjectionExec".to_string(),
"Unable to downcast DataFusion ExecutionPlan to SortExec".to_string(),
));
};

let node = Sort::new(sort_plan);

Ok(EggstrainOperator::Sort(node.into_unary()))
} else if id == TypeId::of::<AggregateExec>() {
unimplemented!("Aggregate not implemented");
let Some(agg_plan) = root.downcast_ref::<AggregateExec>() else {
return Err(DataFusionError::NotImplemented(
"Unable to downcast DataFusion ExecutionPlan to AggregateExec".to_string(),
));
};

let node = Aggregate::new(agg_plan);

Ok(EggstrainOperator::Aggregate(node.into_unary()))
} else {
Err(DataFusionError::NotImplemented(
"Other operators not implemented".to_string(),
Expand Down Expand Up @@ -151,7 +160,8 @@ fn setup_unary_operator(
match node.clone() {
EggstrainOperator::Project(eggnode)
| EggstrainOperator::Filter(eggnode)
| EggstrainOperator::Sort(eggnode) => {
| EggstrainOperator::Sort(eggnode)
| EggstrainOperator::Aggregate(eggnode) => {
let tx = tx.clone();
tokio::spawn(async move {
eggnode.execute(child_rx, tx).await;
Expand Down
4 changes: 3 additions & 1 deletion eggstrain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ async fn main() -> Result<()> {

// Run our execution engine on the physical plan
let df_physical_plan = sql.clone().create_physical_plan().await?;
let df_physical_plan = df_physical_plan.children()[0].clone();
println!("{:#?}", df_physical_plan.clone());
// let df_physical_plan = df_physical_plan.children()[0].clone();
// let df_physical_plan = df_physical_plan.children()[0].clone();
let results = run(df_physical_plan).await;

results.into_iter().for_each(|batch| {
Expand Down
8 changes: 8 additions & 0 deletions queries/basic_aggregate.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Basic grouped-aggregate query: one output row per distinct
-- orders.o_shippriority value, carrying the sum of orders.o_totalprice
-- for that group.
-- Only rows with o_shippriority > 2 take part in the aggregation.
-- NOTE(review): presumably `orders` is the TPC-H ORDERS table; in standard
-- TPC-H data o_shippriority is always 0, so this predicate may filter out
-- every row and yield an empty result -- confirm that is intended.
SELECT
SUM(orders.o_totalprice), orders.o_shippriority
FROM
orders
WHERE
orders.o_shippriority > 2
GROUP BY orders.o_shippriority
;

0 comments on commit 60d4bd3

Please sign in to comment.