Skip to content

Commit

Permalink
[Minor]: Remove input_schema field from window executor (#7810)
Browse files Browse the repository at this point in the history
* Initial commit

* Remove input schema from proto
  • Loading branch information
mustafasrepo authored Oct 13, 2023
1 parent 3ccbcfc commit 485b80e
Show file tree
Hide file tree
Showing 12 changed files with 5 additions and 75 deletions.
3 changes: 0 additions & 3 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,20 +608,17 @@ fn analyze_window_sort_removal(
add_sort_above(&mut window_child, sort_expr, None)?;

let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
let input_schema = window_child.schema();
let new_window = if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(
window_expr.to_vec(),
window_child,
input_schema,
partitionby_exprs.to_vec(),
PartitionSearchMode::Sorted,
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
window_expr.to_vec(),
window_child,
input_schema,
partitionby_exprs.to_vec(),
)?) as _
};
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ pub fn bounded_window_exec(
)
.unwrap()],
input.clone(),
input.schema(),
vec![],
crate::physical_plan::windows::PartitionSearchMode::Sorted,
)
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,15 +751,13 @@ impl DefaultPhysicalPlanner {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
PartitionSearchMode::Sorted,
)?)
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
)?)
})
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,6 @@ async fn run_window_test(
)
.unwrap()],
exec1,
schema.clone(),
vec![],
)
.unwrap(),
Expand All @@ -484,7 +483,6 @@ async fn run_window_test(
)
.unwrap()],
exec2,
schema.clone(),
vec![],
search_mode,
)
Expand Down
14 changes: 2 additions & 12 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ pub struct BoundedWindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
schema: SchemaRef,
/// Schema before the window
input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
Expand All @@ -110,11 +108,10 @@ impl BoundedWindowAggExec {
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
partition_search_mode: PartitionSearchMode,
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);
let partition_by_exprs = window_expr[0].partition_by();
let ordered_partition_by_indices = match &partition_search_mode {
Expand All @@ -140,7 +137,6 @@ impl BoundedWindowAggExec {
input,
window_expr,
schema,
input_schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
partition_search_mode,
Expand All @@ -158,11 +154,6 @@ impl BoundedWindowAggExec {
&self.input
}

/// Get the input schema before any window functions are applied
pub fn input_schema(&self) -> SchemaRef {
self.input_schema.clone()
}

/// Return the output sort order of the partition keys. For example,
/// `OVER(PARTITION BY a ORDER BY b)` would give the sorting of column `a`.
// We are sure that the partition-by columns are always at the beginning of sort_keys
Expand Down Expand Up @@ -303,7 +294,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
Ok(Arc::new(BoundedWindowAggExec::try_new(
self.window_expr.clone(),
children[0].clone(),
self.input_schema.clone(),
self.partition_keys.clone(),
self.partition_search_mode.clone(),
)?))
Expand Down Expand Up @@ -333,7 +323,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
fn statistics(&self) -> Statistics {
let input_stat = self.input.statistics();
let win_cols = self.window_expr.len();
let input_cols = self.input_schema.fields().len();
let input_cols = self.input.schema().fields().len();
// TODO stats: some windowing function will maintain invariants such as min, max...
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
if let Some(input_col_stats) = input_stat.column_statistics {
Expand Down
3 changes: 0 additions & 3 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,6 @@ pub fn get_best_fitting_window(
Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input.clone(),
input.schema(),
physical_partition_keys.to_vec(),
partition_search_mode,
)?) as _))
Expand All @@ -435,7 +434,6 @@ pub fn get_best_fitting_window(
Ok(Some(Arc::new(WindowAggExec::try_new(
window_expr,
input.clone(),
input.schema(),
physical_partition_keys.to_vec(),
)?) as _))
}
Expand Down Expand Up @@ -759,7 +757,6 @@ mod tests {
schema.as_ref(),
)?],
blocking_exec,
schema,
vec![],
)?);

Expand Down
14 changes: 2 additions & 12 deletions datafusion/physical-plan/src/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ pub struct WindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
schema: SchemaRef,
/// Schema before the window
input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
Expand All @@ -75,10 +73,9 @@ impl WindowAggExec {
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);

let ordered_partition_by_indices =
Expand All @@ -87,7 +84,6 @@ impl WindowAggExec {
input,
window_expr,
schema,
input_schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
ordered_partition_by_indices,
Expand All @@ -104,11 +100,6 @@ impl WindowAggExec {
&self.input
}

/// Get the input schema before any window functions are applied
pub fn input_schema(&self) -> SchemaRef {
self.input_schema.clone()
}

/// Return the output sort order of the partition keys. For example,
/// `OVER(PARTITION BY a ORDER BY b)` would give the sorting of column `a`.
// We are sure that the partition-by columns are always at the beginning of sort_keys
Expand Down Expand Up @@ -230,7 +221,6 @@ impl ExecutionPlan for WindowAggExec {
Ok(Arc::new(WindowAggExec::try_new(
self.window_expr.clone(),
children[0].clone(),
self.input_schema.clone(),
self.partition_keys.clone(),
)?))
}
Expand Down Expand Up @@ -259,7 +249,7 @@ impl ExecutionPlan for WindowAggExec {
fn statistics(&self) -> Statistics {
let input_stat = self.input.statistics();
let win_cols = self.window_expr.len();
let input_cols = self.input_schema.fields().len();
let input_cols = self.input.schema().fields().len();
// TODO stats: some windowing function will maintain invariants such as min, max...
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
if let Some(input_col_stats) = input_stat.column_statistics {
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,6 @@ message PartiallySortedPartitionSearchMode {
message WindowAggExecNode {
PhysicalPlanNode input = 1;
repeated PhysicalWindowExprNode window_expr = 2;
Schema input_schema = 4;
repeated PhysicalExprNode partition_keys = 5;
// Leave unset (`None`) for `WindowAggExec`; set it for `BoundedWindowAggExec`.
oneof partition_search_mode {
Expand Down
18 changes: 0 additions & 18 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 1 addition & 18 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
runtime,
extension_codec,
)?;
let input_schema = window_agg
.input_schema
.as_ref()
.ok_or_else(|| {
DataFusionError::Internal(
"input_schema in WindowAggrNode is missing.".to_owned(),
)
})?
.clone();
let input_schema: SchemaRef = SchemaRef::new((&input_schema).try_into()?);
let input_schema = input.schema();

let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
.window_expr
Expand Down Expand Up @@ -333,15 +324,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(Arc::new(BoundedWindowAggExec::try_new(
physical_window_expr,
input,
input_schema,
partition_keys,
partition_search_mode,
)?))
} else {
Ok(Arc::new(WindowAggExec::try_new(
physical_window_expr,
input,
input_schema,
partition_keys,
)?))
}
Expand Down Expand Up @@ -1315,8 +1304,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
extension_codec,
)?;

let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?;

let window_expr =
exec.window_expr()
.iter()
Expand All @@ -1334,7 +1321,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::WindowAggExecNode {
input: Some(Box::new(input)),
window_expr,
input_schema: Some(input_schema),
partition_keys,
partition_search_mode: None,
},
Expand All @@ -1346,8 +1332,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
extension_codec,
)?;

let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?;

let window_expr =
exec.window_expr()
.iter()
Expand Down Expand Up @@ -1385,7 +1369,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::WindowAggExecNode {
input: Some(Box::new(input)),
window_expr,
input_schema: Some(input_schema),
partition_keys,
partition_search_mode: Some(partition_search_mode),
},
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ fn roundtrip_window() -> Result<()> {
sliding_aggr_window_expr,
],
input,
schema.clone(),
vec![col("b", &schema)?],
)?))
}
Expand Down

0 comments on commit 485b80e

Please sign in to comment.