Skip to content

Commit

Permalink
feat(rust, python): add relaxed concatenation (pola-rs#9382)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored and c-peters committed Jul 14, 2023
1 parent 81225f9 commit 1138b73
Show file tree
Hide file tree
Showing 17 changed files with 228 additions and 51 deletions.
16 changes: 16 additions & 0 deletions polars/polars-core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
use smartstring::alias::String as SmartString;

use crate::prelude::*;
use crate::utils::try_get_supertype;

/// A map from field/column name (`String`) to the type of that field/column (`DataType`)
#[derive(Eq, Clone, Default)]
Expand Down Expand Up @@ -367,6 +368,21 @@ impl Schema {
pub fn iter(&self) -> impl Iterator<Item = (&SmartString, &DataType)> + '_ {
self.inner.iter()
}

/// Take another [`Schema`] and try to find the supertypes between them.
pub fn to_supertype(&mut self, other: &Schema) -> PolarsResult<bool> {
polars_ensure!(self.len() == other.len(), ComputeError: "schema lengths differ");

let mut changed = false;
for ((k, dt), (other_k, other_dt)) in self.inner.iter_mut().zip(other.iter()) {
polars_ensure!(k == other_k, ComputeError: "schema names differ: got {}, expected {}", k, other_k);

let st = try_get_supertype(dt, other_dt)?;
changed |= &st != dt;
*dt = st
}
Ok(changed)
}
}

pub type SchemaRef = Arc<Schema>;
Expand Down
88 changes: 78 additions & 10 deletions polars/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
rechunk: bool,
parallel: bool,
from_partitioned_ds: bool,
convert_supertypes: bool,
) -> PolarsResult<LazyFrame> {
let mut inputs = inputs.as_ref().to_vec();

Expand All @@ -30,7 +31,7 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
..Default::default()
};

match &mut lf.logical_plan {
let lf = match &mut lf.logical_plan {
// re-use the same union
LogicalPlan::Union {
inputs: existing_inputs,
Expand All @@ -42,7 +43,7 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
let lp = std::mem::take(&mut lf.logical_plan);
existing_inputs.push(lp)
}
Ok(lf)
lf
}
_ => {
let mut lps = Vec::with_capacity(inputs.len());
Expand All @@ -62,8 +63,49 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
let mut lf = LazyFrame::from(lp);
lf.opt_state = opt_state;

Ok(lf)
lf
}
};

if convert_supertypes {
let LogicalPlan::Union {mut inputs, options} = lf.logical_plan else { unreachable!()} ;
let mut schema = inputs[0].schema()?.as_ref().as_ref().clone();

let mut changed = false;
for input in inputs[1..].iter() {
changed |= schema.to_supertype(input.schema()?.as_ref().as_ref())?;
}

let mut placeholder = LogicalPlan::default();
if changed {
let mut exprs = vec![];
for input in &mut inputs {
std::mem::swap(input, &mut placeholder);
let input_schema = placeholder.schema()?;

exprs.clear();
let to_cast = input_schema.iter().zip(schema.iter_dtypes()).flat_map(
|((left_name, left_type), st)| {
if left_type != st {
Some(col(left_name.as_ref()).cast(st.clone()))
} else {
None
}
},
);
exprs.extend(to_cast);
let mut lf = LazyFrame::from(placeholder);
if !exprs.is_empty() {
lf = lf.with_columns(exprs.as_slice());
}

placeholder = lf.logical_plan;
std::mem::swap(&mut placeholder, input);
}
}
Ok(LazyFrame::from(LogicalPlan::Union { inputs, options }))
} else {
Ok(lf)
}
}

Expand Down Expand Up @@ -120,16 +162,42 @@ pub fn diag_concat_lf<L: AsRef<[LazyFrame]>>(
})
.collect::<PolarsResult<Vec<_>>>()?;

concat(lfs_with_all_columns, rechunk, parallel)
concat(
lfs_with_all_columns,
UnionArgs {
rechunk,
parallel,
to_supertypes: false,
},
)
}

#[derive(Clone, Copy)]
pub struct UnionArgs {
pub parallel: bool,
pub rechunk: bool,
pub to_supertypes: bool,
}

impl Default for UnionArgs {
fn default() -> Self {
Self {
parallel: true,
rechunk: true,
to_supertypes: false,
}
}
}

/// Concat multiple
pub fn concat<L: AsRef<[LazyFrame]>>(
inputs: L,
rechunk: bool,
parallel: bool,
) -> PolarsResult<LazyFrame> {
concat_impl(inputs, rechunk, parallel, false)
pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, args: UnionArgs) -> PolarsResult<LazyFrame> {
concat_impl(
inputs,
args.rechunk,
args.parallel,
false,
args.to_supertypes,
)
}

/// Collect all `LazyFrame` computations.
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,6 @@ impl LazyFileListReader for LazyCsvReader<'_> {

fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
// set to false, as the csv parser has full thread utilization
concat_impl(&lfs, self.rechunk(), false, true)
concat_impl(&lfs, self.rechunk(), false, true, false)
}
}
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame/file_list_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub trait LazyFileListReader: Clone {
/// This method should not take into consideration [LazyFileListReader::n_rows]
/// nor [LazyFileListReader::row_count].
fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
concat_impl(&lfs, self.rechunk(), true, true)
concat_impl(&lfs, self.rechunk(), true, true, false)
}

/// Get the final [LazyFrame].
Expand Down
9 changes: 8 additions & 1 deletion polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,14 @@ impl LazyFrame {
// this trick allows us to reuse the `Union` architecture to get map over
// two DataFrames
let left = self.map_private(FunctionNode::Rechunk);
let q = concat(&[left, other], false, true)?;
let q = concat(
&[left, other],
UnionArgs {
rechunk: false,
parallel: true,
..Default::default()
},
)?;
Ok(q.map_private(FunctionNode::MergeSorted {
column: Arc::from(key),
}))
Expand Down
22 changes: 17 additions & 5 deletions polars/polars-lazy/src/tests/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ fn test_cse_unions() -> PolarsResult<()> {

let lf1 = lf.clone().with_column(col("category").str().to_uppercase());

let lf = concat(&[lf1.clone(), lf, lf1], false, false)?
.select([col("category"), col("fats_g")])
.with_common_subplan_elimination(true);
let lf = concat(
&[lf1.clone(), lf, lf1],
UnionArgs {
rechunk: false,
parallel: false,
..Default::default()
},
)?
.select([col("category"), col("fats_g")])
.with_common_subplan_elimination(true);

let (mut expr_arena, mut lp_arena) = get_arenas();
let lp = lf.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();
Expand Down Expand Up @@ -108,8 +115,13 @@ fn test_cse_union2_4925() -> PolarsResult<()> {
]?
.lazy();

let lf1 = concat(&[lf1.clone(), lf1], false, false)?;
let lf2 = concat(&[lf2.clone(), lf2], false, false)?;
let args = UnionArgs {
parallel: false,
rechunk: false,
..Default::default()
};
let lf1 = concat(&[lf1.clone(), lf1], args)?;
let lf2 = concat(&[lf2.clone(), lf2], args)?;

let q = lf1.inner_join(lf2, col("ts"), col("ts")).select([
col("ts"),
Expand Down
11 changes: 8 additions & 3 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,14 @@ fn test_flatten_unions() -> PolarsResult<()> {
.unwrap()
.lazy();

let lf2 = concat(&[lf.clone(), lf.clone()], false, true).unwrap();
let lf3 = concat(&[lf.clone(), lf.clone(), lf.clone()], false, true).unwrap();
let lf4 = concat(&[lf2.clone(), lf3], false, true).unwrap();
let args = UnionArgs {
rechunk: false,
parallel: true,
..Default::default()
};
let lf2 = concat(&[lf.clone(), lf.clone()], args).unwrap();
let lf3 = concat(&[lf.clone(), lf.clone(), lf.clone()], args).unwrap();
let lf4 = concat(&[lf2.clone(), lf3], args).unwrap();
let root = lf4.optimize(&mut lp_arena, &mut expr_arena).unwrap();
let lp = lp_arena.get(root);
match lp {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/tests/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ fn test_streaming_glob() -> PolarsResult<()> {
#[test]
fn test_streaming_union_order() -> PolarsResult<()> {
let q = get_csv_glob();
let q = concat([q.clone(), q], false, false)?;
let q = concat([q.clone(), q], Default::default())?;
let q = q.select([col("sugars_g"), col("calories")]);

assert_streaming_with_default(q, true, false);
Expand Down
22 changes: 11 additions & 11 deletions polars/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,20 @@ impl SQLContext {
quantifier: &SetQuantifier,
query: &Query,
) -> PolarsResult<LazyFrame> {
let left = self.process_set_expr(left, query)?;
let right = self.process_set_expr(right, query)?;
let concatenated = polars_lazy::dsl::concat(
vec![left, right],
UnionArgs {
parallel: true,
..Default::default()
},
);
match quantifier {
// UNION ALL
SetQuantifier::All => {
let left = self.process_set_expr(left, query)?;
let right = self.process_set_expr(right, query)?;
polars_lazy::dsl::concat(vec![left, right], false, true)
}
SetQuantifier::All => concatenated,
// UNION DISTINCT | UNION
_ => {
let left = self.process_set_expr(left, query)?;
let right = self.process_set_expr(right, query)?;
Ok(polars_lazy::dsl::concat(vec![left, right], true, true)?
.unique(None, UniqueKeepStrategy::Any))
}
_ => concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any)),
}
}
// EXPLAIN SELECT * FROM DF
Expand Down
15 changes: 11 additions & 4 deletions polars/polars-sql/tests/statements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,17 @@ fn test_union_all() {
SELECT * FROM test2
)
"#;
let expected = polars_lazy::dsl::concat(vec![df1.lazy(), df2.lazy()], false, true)
.unwrap()
.collect()
.unwrap();
let expected = polars_lazy::dsl::concat(
vec![df1.lazy(), df2.lazy()],
UnionArgs {
rechunk: false,
parallel: true,
..Default::default()
},
)
.unwrap()
.collect()
.unwrap();

let actual = ctx.execute(sql).unwrap().collect().unwrap();
assert!(actual.frame_equal(&expected));
Expand Down
15 changes: 11 additions & 4 deletions polars/tests/it/lazy/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ fn test_cse_union_schema_6504() -> PolarsResult<()> {
.with_column(lit(0).alias("a"))
.select([col("a"), col("b")]);

let out = concat([q1, q3], true, true)
.unwrap()
.with_common_subplan_elimination(true)
.collect()?;
let out = concat(
[q1, q3],
UnionArgs {
rechunk: false,
parallel: false,
..Default::default()
},
)
.unwrap()
.with_common_subplan_elimination(true)
.collect()?;
let expected = df![
"a" => [1, 0],
"b" => [2, 1],
Expand Down
13 changes: 10 additions & 3 deletions polars/tests/it/lazy/predicate_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,16 @@ fn test_count_blocked_at_union_3963() -> PolarsResult<()> {
]?;

for rechunk in [true, false] {
let out = concat([lf1.clone(), lf2.clone()], rechunk, true)?
.filter(count().over([col("k")]).gt(lit(1)))
.collect()?;
let out = concat(
[lf1.clone(), lf2.clone()],
UnionArgs {
rechunk,
parallel: true,
..Default::default()
},
)?
.filter(count().over([col("k")]).gt(lit(1)))
.collect()?;

assert!(out.frame_equal(&expected));
}
Expand Down
14 changes: 11 additions & 3 deletions py-polars/polars/functions/eager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ def concat(
LazyFrames do not support the `horizontal` strategy.
* vertical: Applies multiple `vstack` operations.
* vertical_relaxed: Applies multiple `vstack` operations and coerces column
dtypes that are not equal to their supertypes.
* diagonal: Finds a union between the column schemas and fills missing column
values with ``null``.
* horizontal: Stacks Series from DataFrames horizontally and fills with ``null``
Expand Down Expand Up @@ -162,23 +164,29 @@ def concat(
if isinstance(first, pl.DataFrame):
if how == "vertical":
out = wrap_df(plr.concat_df(elems))
elif how == "vertical_relaxed":
out = wrap_ldf(
plr.concat_lf([df.lazy() for df in elems], rechunk, parallel, True)
).collect(no_optimization=True)
elif how == "diagonal":
out = wrap_df(plr.diag_concat_df(elems))
elif how == "horizontal":
out = wrap_df(plr.hor_concat_df(elems))
else:
raise ValueError(
f"`how` must be one of {{'vertical','diagonal','horizontal','align'}}, "
f"`how` must be one of {{'vertical','vertical_relaxed','diagonal','horizontal','align'}}, "
f"got {how!r}"
)
elif isinstance(first, pl.LazyFrame):
if how == "vertical":
return wrap_ldf(plr.concat_lf(elems, rechunk, parallel))
return wrap_ldf(plr.concat_lf(elems, rechunk, parallel, False))
if how == "vertical_relaxed":
return wrap_ldf(plr.concat_lf(elems, rechunk, parallel, True))
if how == "diagonal":
return wrap_ldf(plr.diag_concat_lf(elems, rechunk, parallel))
else:
raise ValueError(
"'LazyFrame' only allows {'vertical','diagonal','align'} concat strategies."
"'LazyFrame' only allows {'vertical','vertical_relaxed','diagonal','align'} concat strategies."
)
elif isinstance(first, pl.Series):
if how == "vertical":
Expand Down
Loading

0 comments on commit 1138b73

Please sign in to comment.