Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allow nested is_in() in when()/then() for full-streaming #20052

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,16 @@ pub(crate) fn insert_streaming_nodes(
execution_id += 1;
match lp_arena.get(root) {
Filter { input, predicate }
if is_streamable(
predicate.node(),
expr_arena,
IsStreamableContext::new(Default::default()),
) =>
if is_elementwise_rec(expr_arena.get(predicate.node()), expr_arena) =>
{
state.streamable = true;
state.operators_sinks.push(PipelineNode::Operator(root));
stack.push(StackFrame::new(*input, state, current_idx))
},
HStack { input, exprs, .. }
if all_streamable(exprs, expr_arena, Default::default()) =>
if exprs
.iter()
.all(|e| is_elementwise_rec(expr_arena.get(e.node()), expr_arena)) =>
{
state.streamable = true;
state.operators_sinks.push(PipelineNode::Operator(root));
Expand All @@ -201,11 +199,9 @@ pub(crate) fn insert_streaming_nodes(
stack.push(StackFrame::new(*input, state, current_idx))
},
Select { input, expr, .. }
if all_streamable(
expr,
expr_arena,
IsStreamableContext::new(Default::default()),
) =>
if expr
.iter()
.all(|e| is_elementwise_rec(expr_arena.get(e.node()), expr_arena)) =>
{
state.streamable = true;
state.operators_sinks.push(PipelineNode::Operator(root));
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-mem-engine/src/executors/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct ProjectionExec {
pub(crate) schema: SchemaRef,
pub(crate) options: ProjectionOptions,
// Can run all operations elementwise
pub(crate) streamable: bool,
pub(crate) allow_vertical_parallelism: bool,
}

impl ProjectionExec {
Expand All @@ -23,7 +23,7 @@ impl ProjectionExec {
mut df: DataFrame,
) -> PolarsResult<DataFrame> {
// Vertical and horizontal parallelism.
let df = if self.streamable
let df = if self.allow_vertical_parallelism
&& df.first_col_n_chunks() > 1
&& df.height() > POOL.current_num_threads() * 2
&& self.options.run_parallel
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-mem-engine/src/executors/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct StackExec {
pub(crate) output_schema: SchemaRef,
pub(crate) options: ProjectionOptions,
// Can run all operations elementwise
pub(crate) streamable: bool,
pub(crate) allow_vertical_parallelism: bool,
}

impl StackExec {
Expand All @@ -23,7 +23,7 @@ impl StackExec {
let schema = &*self.output_schema;

// Vertical and horizontal parallelism.
let df = if self.streamable
let df = if self.allow_vertical_parallelism
&& df.first_col_n_chunks() > 1
&& df.height() > 0
&& self.options.run_parallel
Expand Down
23 changes: 9 additions & 14 deletions crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,8 @@ fn create_physical_plan_impl(
Ok(Box::new(executors::SliceExec { input, offset, len }))
},
Filter { input, predicate } => {
let mut streamable = is_streamable(
predicate.node(),
expr_arena,
IsStreamableContext::new(Context::Default).with_allow_cast_categorical(false),
);
let mut streamable =
is_elementwise_rec_no_cat_cast(expr_arena.get(predicate.node()), expr_arena);
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
if streamable {
// This can cause problems with string caches
Expand Down Expand Up @@ -386,7 +383,7 @@ fn create_physical_plan_impl(
&mut state,
)?;

let streamable = options.should_broadcast && all_streamable(&expr, expr_arena, IsStreamableContext::new(Context::Default).with_allow_cast_categorical(false))
let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena))
// If all columns are literal we would get a 1 row per thread.
&& !phys_expr.iter().all(|p| {
p.is_literal()
Expand All @@ -400,7 +397,7 @@ fn create_physical_plan_impl(
#[cfg(test)]
schema: _schema,
options,
streamable,
allow_vertical_parallelism,
}))
},
Reduce {
Expand Down Expand Up @@ -635,12 +632,10 @@ fn create_physical_plan_impl(
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?;

let streamable = options.should_broadcast
&& all_streamable(
&exprs,
expr_arena,
IsStreamableContext::new(Context::Default).with_allow_cast_categorical(false),
);
let allow_vertical_parallelism = options.should_broadcast
&& exprs
.iter()
.all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena));

let mut state = ExpressionConversionState::new(
POOL.current_num_threads() > exprs.len(),
Expand All @@ -661,7 +656,7 @@ fn create_physical_plan_impl(
input_schema,
output_schema,
options,
streamable,
allow_vertical_parallelism,
}))
},
MapFunction {
Expand Down
57 changes: 32 additions & 25 deletions crates/polars-plan/src/plans/aexpr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use scalar::is_scalar_ae;
use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;
pub use traverse::*;
pub(crate) use utils::permits_filter_pushdown;
pub use utils::*;

use crate::constants::LEN;
Expand Down Expand Up @@ -218,35 +219,41 @@ impl AExpr {
pub(crate) fn col(name: PlSmallStr) -> Self {
AExpr::Column(name)
}
/// Any expression that is sensitive to the number of elements in a group
/// - Aggregations
/// - Sorts
/// - Counts
/// - ..
pub(crate) fn groups_sensitive(&self) -> bool {

/// Checks whether this expression is elementwise. This only checks the top level expression.
pub(crate) fn is_elementwise_top_level(&self) -> bool {
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`is_elementwise_top_level()` replaces `groups_sensitive()`. It also now treats `Explode` and `Filter` as non-elementwise.

use AExpr::*;

match self {
Function { options, .. } | AnonymousFunction { options, .. } => {
options.is_groups_sensitive()
}
Sort { .. }
| SortBy { .. }
| Agg { .. }
| Window { .. }
AnonymousFunction { options, .. } => options.is_elementwise(),

// Non-strict strptime must be done in-memory to ensure the format
// is consistent across the entire dataframe.
#[cfg(feature = "strings")]
Function {
options,
function: FunctionExpr::StringExpr(StringFunction::Strptime(_, opts)),
..
} => {
assert!(options.is_elementwise());
opts.strict
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rule was moved here from the new-streaming code (see below).

},

Function { options, .. } => options.is_elementwise(),

Literal(v) => v.projects_as_scalar(),

Alias(_, _) | BinaryExpr { .. } | Column(_) | Ternary { .. } | Cast { .. } => true,

Agg { .. }
| Explode(_)
| Filter { .. }
| Gather { .. }
| Len
| Slice { .. }
| Gather { .. }
=> true,
Alias(_, _)
| Explode(_)
| Column(_)
| Literal(_)
// a caller should traverse binary and ternary
// to determine if the whole expr. is group sensitive
| BinaryExpr { .. }
| Ternary { .. }
| Cast { .. }
| Filter { .. } => false,
| Sort { .. }
| SortBy { .. }
| Window { .. } => false,
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/aexpr/traverse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

impl AExpr {
/// Push nodes at this level to a pre-allocated stack.
pub(crate) fn nodes<C: PushNode>(&self, container: &mut C) {
pub(crate) fn nodes(&self, container: &mut impl PushNode) {
use AExpr::*;

match self {
Expand Down
Loading
Loading