Skip to content

Commit

Permalink
refactor(rust): Add pl.length() reduction and small new-streaming fix…
Browse files Browse the repository at this point in the history
…es (#18429)
  • Loading branch information
orlp authored Aug 28, 2024
1 parent 79cffee commit a00e37b
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 38 deletions.
36 changes: 18 additions & 18 deletions crates/polars-expr/src/reduce/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,15 @@ use polars_utils::arena::{Arena, Node};
use super::extrema::*;
use super::sum::SumReduce;
use super::*;
use crate::reduce::len::LenReduce;
use crate::reduce::mean::MeanReduce;

pub fn can_convert_into_reduction(node: Node, expr_arena: &Arena<AExpr>) -> bool {
match expr_arena.get(node) {
AExpr::Agg(agg) => matches!(
agg,
IRAggExpr::Min { .. }
| IRAggExpr::Max { .. }
| IRAggExpr::Mean { .. }
| IRAggExpr::Sum(_)
),
_ => false,
}
}

/// Converts a node into a reduction + its associated selector expression.
pub fn into_reduction(
node: Node,
expr_arena: &Arena<AExpr>,
expr_arena: &mut Arena<AExpr>,
schema: &Schema,
) -> PolarsResult<Option<(Box<dyn Reduction>, Node)>> {
) -> PolarsResult<(Box<dyn Reduction>, Node)> {
let e = expr_arena.get(node);
let field = e.to_field(schema, Context::Default, expr_arena)?;
let out = match expr_arena.get(node) {
Expand Down Expand Up @@ -74,9 +63,20 @@ pub fn into_reduction(
let out: Box<dyn Reduction> = Box::new(MeanReduce::new(field.dtype.clone()));
(out, *input)
},
_ => return Ok(None),
_ => unreachable!(),
},
AExpr::Len => {
// Compute length on the first column, or if none exist we'll never
// be called and correctly return 0 as length anyway.
let out: Box<dyn Reduction> = Box::new(LenReduce::new());
let expr = if let Some(first_column) = schema.iter_names().next() {
expr_arena.add(AExpr::Column(first_column.as_str().into()))
} else {
expr_arena.add(AExpr::Literal(LiteralValue::Null))
};
(out, expr)
},
_ => return Ok(None),
_ => unreachable!(),
};
Ok(Some(out))
Ok(out)
}
45 changes: 45 additions & 0 deletions crates/polars-expr/src/reduce/len.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use polars_core::error::constants::LENGTH_LIMIT_MSG;

use super::*;

#[derive(Clone)]
pub struct LenReduce {
len: u64,
}

impl LenReduce {
pub(crate) fn new() -> Self {
Self { len: 0 }
}
}

impl Reduction for LenReduce {
fn init_dyn(&self) -> Box<dyn Reduction> {
Box::new(Self::new())
}

fn reset(&mut self) {
self.len = 0;
}

fn update(&mut self, batch: &Series) -> PolarsResult<()> {
self.len += batch.len() as u64;
Ok(())
}

fn combine(&mut self, other: &dyn Reduction) -> PolarsResult<()> {
let other = other.as_any().downcast_ref::<Self>().unwrap();
self.len += other.len;
Ok(())
}

fn finalize(&mut self) -> PolarsResult<Scalar> {
#[allow(clippy::useless_conversion)]
let as_idx: IdxSize = self.len.try_into().expect(LENGTH_LIMIT_MSG);
Ok(Scalar::new(IDX_DTYPE, as_idx.into()))
}

fn as_any(&self) -> &dyn Any {
self
}
}
2 changes: 0 additions & 2 deletions crates/polars-expr/src/reduce/mean.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use polars_core::utils::Container;

use super::*;

#[derive(Clone)]
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-expr/src/reduce/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
mod convert;
mod extrema;
mod len;
mod mean;
mod sum;

use std::any::Any;

pub use convert::{can_convert_into_reduction, into_reduction};
pub use convert::into_reduction;
use polars_core::prelude::*;

#[allow(dead_code)]
Expand Down
26 changes: 17 additions & 9 deletions crates/polars-stream/src/physical_plan/lower_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ fn lower_exprs_with_ctx(
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[inner], ctx)?;
let exploded_name = unique_column_name();
let trans_inner = ctx.expr_arena.add(AExpr::Explode(trans_exprs[0]));
let explode_expr = ExprIR::new(trans_inner, OutputName::Alias(exploded_name.clone()));
let explode_expr =
ExprIR::new(trans_inner, OutputName::Alias(exploded_name.clone()));
let output_schema = schema_for_select(trans_input, &[explode_expr.clone()], ctx)?;
let node_kind = PhysNodeKind::Select {
input: trans_input,
Expand Down Expand Up @@ -596,19 +597,26 @@ fn lower_exprs_with_ctx(
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
},
},
AExpr::AnonymousFunction {
..
}
| AExpr::Function {
..
}
| AExpr::Len // TODO: this one makes me really sad, make this streaming ASAP.
AExpr::Len => {
let out_name = unique_column_name();
let expr_ir = ExprIR::new(expr, OutputName::Alias(out_name.clone()));
let output_schema = schema_for_select(input, &[expr_ir.clone()], ctx)?;
let kind = PhysNodeKind::Reduce {
input,
exprs: vec![expr_ir],
};
let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
input_nodes.insert(reduce_node_key);
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
},
AExpr::AnonymousFunction { .. }
| AExpr::Function { .. }
| AExpr::Slice { .. }
| AExpr::Window { .. } => {
let out_name = unique_column_name();
fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
}
},
}
}

Expand Down
7 changes: 3 additions & 4 deletions crates/polars-stream/src/physical_plan/to_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn create_stream_expr(

struct GraphConversionContext<'a> {
phys_sm: &'a SlotMap<PhysNodeKey, PhysNode>,
expr_arena: &'a Arena<AExpr>,
expr_arena: &'a mut Arena<AExpr>,
graph: Graph,
phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
expr_conversion_state: ExpressionConversionState,
Expand All @@ -56,7 +56,7 @@ struct GraphConversionContext<'a> {
pub fn physical_plan_to_graph(
root: PhysNodeKey,
phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
expr_arena: &Arena<AExpr>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<(Graph, SecondaryMap<PhysNodeKey, GraphNodeKey>)> {
let expr_depth_limit = get_expr_depth_limit()?;
let mut ctx = GraphConversionContext {
Expand Down Expand Up @@ -138,8 +138,7 @@ fn to_graph_rec<'a>(
let mut inputs = Vec::with_capacity(reductions.len());

for e in exprs {
let (red, input_node) =
into_reduction(e.node(), ctx.expr_arena, input_schema)?.expect("invariant");
let (red, input_node) = into_reduction(e.node(), ctx.expr_arena, input_schema)?;
reductions.push(red);

let input_phys =
Expand Down
1 change: 1 addition & 0 deletions py-polars/tests/unit/constructors/test_constructors.py
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,7 @@ def __arrow_c_array__(self, requested_schema: object = None) -> object:


def test_pycapsule_interface(df: pl.DataFrame) -> None:
df = df.rechunk()
pyarrow_table = df.to_arrow()

# Array via C data interface
Expand Down
16 changes: 12 additions & 4 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,19 @@ def test_unnest_projection_pushdown() -> None:
pl.col("field_2").cast(pl.Categorical).alias("col"),
pl.col("value"),
)
out = mlf.collect().to_dict(as_series=False)

out = (
mlf.sort(
[pl.col.row.cast(pl.String), pl.col.col.cast(pl.String)],
maintain_order=True,
)
.collect()
.to_dict(as_series=False)
)
assert out == {
"row": ["y", "y", "b", "b"],
"col": ["z", "z", "c", "c"],
"value": [1, 2, 2, 3],
"row": ["b", "b", "y", "y"],
"col": ["c", "c", "z", "z"],
"value": [2, 3, 1, 2],
}


Expand Down

0 comments on commit a00e37b

Please sign in to comment.