diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
index 03818698495c..12e172a6eaea 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
@@ -127,7 +127,7 @@ impl LogicalPlanBuilder {
         use polars_io::{is_cloud_url, SerReader as _};
 
         let path = path.into();
-        let file_info: PolarsResult<FileInfo> = if is_cloud_url(&path) {
+        let (mut schema, num_rows) = if is_cloud_url(&path) {
             #[cfg(not(feature = "async"))]
             panic!(
                 "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled."
@@ -136,24 +136,22 @@ impl LogicalPlanBuilder {
             #[cfg(feature = "async")]
             {
                 let uri = path.to_string_lossy();
-                let (schema, num_rows) =
-                    ParquetAsyncReader::file_info(&uri, cloud_options.as_ref())?;
-                Ok(FileInfo {
-                    schema: Arc::new(schema),
-                    row_estimation: (Some(num_rows), num_rows),
-                })
+                ParquetAsyncReader::file_info(&uri, cloud_options.as_ref())?
             }
         } else {
             let file = std::fs::File::open(&path)?;
             let mut reader = ParquetReader::new(file);
-            let schema = Arc::new(reader.schema()?);
-            let num_rows = reader.num_rows()?;
-            Ok(FileInfo {
-                schema,
-                row_estimation: (Some(num_rows), num_rows),
-            })
+            (reader.schema()?, reader.num_rows()?)
         };
+
+        if let Some(rc) = &row_count {
+            let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
+        }
+
+        let file_info = FileInfo {
+            schema: Arc::new(schema),
+            row_estimation: (Some(num_rows), num_rows),
+        };
-        let file_info = file_info?;
 
         Ok(LogicalPlan::ParquetScan {
             path,
@@ -185,7 +183,12 @@ impl LogicalPlanBuilder {
         let path = path.into();
         let file = std::fs::File::open(&path)?;
         let mut reader = IpcReader::new(file);
-        let schema = Arc::new(reader.schema()?);
+
+        let mut schema = reader.schema()?;
+        if let Some(rc) = &options.row_count {
+            let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
+        }
+        let schema = Arc::new(schema);
         let num_rows = reader._num_rows()?;
 
         let file_info = FileInfo {
@@ -211,7 +214,7 @@ impl LogicalPlanBuilder {
         mut skip_rows: usize,
         n_rows: Option<usize>,
         cache: bool,
-        schema: Option<SchemaRef>,
+        mut schema: Option<SchemaRef>,
         schema_overwrite: Option<&Schema>,
         low_memory: bool,
         comment_char: Option<u8>,
@@ -239,7 +242,7 @@ impl LogicalPlanBuilder {
         // TODO! delay inferring schema until absolutely necessary
         // this needs a way to estimated bytes/rows.
-        let (inferred_schema, rows_read, bytes_read) = infer_file_schema(
+        let (mut inferred_schema, rows_read, bytes_read) = infer_file_schema(
            &reader_bytes,
            delimiter,
            infer_schema_length,
@@ -254,6 +257,21 @@ impl LogicalPlanBuilder {
            try_parse_dates,
        )?;
 
+        if let Some(rc) = &row_count {
+            match schema {
+                None => {
+                    let _ = inferred_schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
+                }
+                Some(inner) => {
+                    schema = Some(Arc::new(
+                        inner
+                            .new_inserting_at_index(0, rc.name.as_str().into(), IDX_DTYPE)
+                            .unwrap(),
+                    ));
+                }
+            }
+        }
+
         let schema = schema.unwrap_or_else(|| Arc::new(inferred_schema));
         let n_bytes = reader_bytes.len();
         let estimated_n_rows = (rows_read as f64 / bytes_read as f64 * n_bytes as f64) as usize;
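The common thread in the builder changes above: the row-count column is inserted into the schema while it is still mutable, and only then frozen behind an `Arc`, so the `FileInfo` schema every downstream consumer sees already contains the column. A minimal self-contained sketch of that pattern, with toy stand-ins for polars-core's `Schema`, `IDX_DTYPE` and polars-io's `RowCount`:

```rust
use std::sync::Arc;

// Toy stand-ins; the real code uses polars-core's Schema and IDX_DTYPE.
struct Schema {
    fields: Vec<(String, &'static str)>,
}

struct RowCount {
    name: String,
}

fn main() {
    let mut schema = Schema {
        fields: vec![("a".into(), "Utf8"), ("b".into(), "Int64")],
    };
    let row_count = Some(RowCount { name: "idx".into() });

    // Same shape as the builder change: insert the row-count column while
    // the schema is still mutable, then freeze it behind an Arc.
    if let Some(rc) = &row_count {
        schema.fields.insert(0, (rc.name.clone(), "IDX_DTYPE"));
    }
    let schema = Arc::new(schema);
    assert_eq!(schema.fields[0].0, "idx");
}
```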
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs
index c4ac11311613..416c431be7f3 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs
@@ -10,6 +10,7 @@ mod semi_anti_join;
 
 use polars_core::datatypes::PlHashSet;
 use polars_core::prelude::*;
+use polars_io::RowCount;
 #[cfg(feature = "semi_anti_join")]
 use semi_anti_join::process_semi_anti_join;
 
@@ -38,13 +39,23 @@ fn init_set() -> PlHashSet<Arc<str>> {
 fn get_scan_columns(
     acc_projections: &mut Vec<Node>,
     expr_arena: &Arena<AExpr>,
+    row_count: Option<&RowCount>,
 ) -> Option<Arc<Vec<String>>> {
     let mut with_columns = None;
     if !acc_projections.is_empty() {
         let mut columns = Vec::with_capacity(acc_projections.len());
         for expr in acc_projections {
             for name in aexpr_to_leaf_names(*expr, expr_arena) {
-                columns.push((*name).to_owned())
+                // we shouldn't project the row-count column, as that is generated
+                // in the scan
+                let push = match row_count {
+                    Some(rc) if name.as_ref() != rc.name.as_str() => true,
+                    None => true,
+                    _ => false,
+                };
+                if push {
+                    columns.push((*name).to_owned())
+                }
             }
         }
         with_columns = Some(Arc::new(columns));
@@ -116,10 +127,6 @@ fn update_scan_schema(
     acc_projections: &[Node],
     expr_arena: &Arena<AExpr>,
     schema: &Schema,
-    // this is only needed for parsers that sort the projections
-    // currently these are:
-    // sorting parsers: csv,
-    // non-sorting: parquet, ipc
     sort_projections: bool,
 ) -> PolarsResult<Schema> {
     let mut new_schema = Schema::with_capacity(acc_projections.len());
@@ -355,7 +362,7 @@ impl ProjectionPushDown {
                 output_schema,
             } => {
                 if function.allows_projection_pushdown() {
-                    options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);
+                    options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None);
 
                     let output_schema = if options.with_columns.is_none() {
                         None
@@ -403,7 +410,7 @@ impl ProjectionPushDown {
                         &schema,
                         false,
                     )?));
-                    projection = get_scan_columns(&mut acc_projections, expr_arena);
+                    projection = get_scan_columns(&mut acc_projections, expr_arena, None);
                 }
                 let lp = DataFrameScan {
                     df,
@@ -422,7 +429,8 @@ impl ProjectionPushDown {
                 mut options,
                 ..
            } => {
-                let with_columns = get_scan_columns(&mut acc_projections, expr_arena);
+                let with_columns =
+                    get_scan_columns(&mut acc_projections, expr_arena, options.row_count.as_ref());
                 let output_schema = if with_columns.is_none() {
                     None
                 } else {
@@ -430,7 +438,7 @@ impl ProjectionPushDown {
                         &acc_projections,
                         expr_arena,
                         &file_info.schema,
-                        false,
+                        options.row_count.is_some(),
                     )?))
                 };
                 options.with_columns = with_columns;
@@ -454,7 +462,8 @@ impl ProjectionPushDown {
                 cloud_options,
                 ..
             } => {
-                let with_columns = get_scan_columns(&mut acc_projections, expr_arena);
+                let with_columns =
+                    get_scan_columns(&mut acc_projections, expr_arena, options.row_count.as_ref());
                 let output_schema = if with_columns.is_none() {
                     None
                 } else {
@@ -462,7 +471,7 @@ impl ProjectionPushDown {
                         &acc_projections,
                         expr_arena,
                         &file_info.schema,
-                        false,
+                        options.row_count.is_some(),
                     )?))
                 };
                 options.with_columns = with_columns;
@@ -482,7 +491,7 @@ impl ProjectionPushDown {
                 mut options,
                 predicate,
             } => {
-                options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);
+                options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None);
 
                 options.output_schema = if options.with_columns.is_none() {
                     None
@@ -504,7 +513,8 @@ impl ProjectionPushDown {
                 predicate,
                 ..
             } => {
-                options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);
+                options.with_columns =
+                    get_scan_columns(&mut acc_projections, expr_arena, options.row_count.as_ref());
 
                 let output_schema = if options.with_columns.is_none() {
                     None
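The `get_scan_columns` change is the core of the pushdown-side fix: the row-count column exists only in the logical schema, so it must never be pushed into the file projection. A self-contained sketch of that filter (toy `RowCount` type and a hypothetical `scan_columns` helper mirroring the match above):

```rust
struct RowCount {
    name: String,
}

// Keep a projected name unless it is the row-count column, which the scan
// generates rather than reads from the file.
fn scan_columns(projected: &[&str], row_count: Option<&RowCount>) -> Vec<String> {
    projected
        .iter()
        .copied()
        .filter(|name| match row_count {
            Some(rc) => *name != rc.name.as_str(),
            None => true,
        })
        .map(str::to_string)
        .collect()
}

fn main() {
    let rc = RowCount { name: "idx".into() };
    assert_eq!(scan_columns(&["idx", "a"], Some(&rc)), ["a"]);
    assert_eq!(scan_columns(&["idx", "a"], None), ["idx", "a"]);
}
```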
diff --git a/polars/polars-lazy/src/frame/mod.rs b/polars/polars-lazy/src/frame/mod.rs
index fe94fba57f19..93438a2e38bb 100644
--- a/polars/polars-lazy/src/frame/mod.rs
+++ b/polars/polars-lazy/src/frame/mod.rs
@@ -1202,67 +1202,81 @@ impl LazyFrame {
         let add_row_count_in_map = match &mut self.logical_plan {
             // Do the row count at scan
             #[cfg(feature = "csv")]
-            LogicalPlan::CsvScan { options, .. } => {
+            LogicalPlan::CsvScan {
+                options, file_info, ..
+            } => {
                 options.row_count = Some(RowCount {
                     name: name.to_string(),
                     offset: offset.unwrap_or(0),
                 });
+                file_info.schema = Arc::new(
+                    file_info
+                        .schema
+                        .new_inserting_at_index(0, name.into(), IDX_DTYPE)
+                        .unwrap(),
+                );
                 false
             }
             #[cfg(feature = "ipc")]
-            LogicalPlan::IpcScan { options, .. } => {
+            LogicalPlan::IpcScan {
+                options, file_info, ..
+            } => {
                 options.row_count = Some(RowCount {
                     name: name.to_string(),
                     offset: offset.unwrap_or(0),
                 });
+                file_info.schema = Arc::new(
+                    file_info
+                        .schema
+                        .new_inserting_at_index(0, name.into(), IDX_DTYPE)
+                        .unwrap(),
+                );
                 false
             }
             #[cfg(feature = "parquet")]
-            LogicalPlan::ParquetScan { options, .. } => {
+            LogicalPlan::ParquetScan {
+                options, file_info, ..
+            } => {
                 options.row_count = Some(RowCount {
                     name: name.to_string(),
                     offset: offset.unwrap_or(0),
                 });
+                file_info.schema = Arc::new(
+                    file_info
+                        .schema
+                        .new_inserting_at_index(0, name.into(), IDX_DTYPE)
+                        .unwrap(),
+                );
                 false
             }
             _ => true,
         };
 
-        let name2: SmartString = name.into();
-        let udf_schema = move |s: &Schema| {
-            // Can't error, index 0 is always in bounds
-            let new = s
-                .new_inserting_at_index(0, name2.clone(), IDX_DTYPE)
-                .unwrap();
-            Ok(Arc::new(new))
-        };
-
-        let name = name.to_owned();
-
-        // if we do the row count at scan we add a dummy map, to update the schema
-        let opt = if add_row_count_in_map {
-            AllowedOptimizations {
+        if add_row_count_in_map {
+            let name: SmartString = name.into();
+            let name2: SmartString = name.clone();
+            let opt = AllowedOptimizations {
                 slice_pushdown: false,
                 predicate_pushdown: false,
                 streaming: false,
                 ..Default::default()
-            }
+            };
+            let udf_schema = move |s: &Schema| {
+                // Can't error, index 0 is always in bounds
+                let new = s
+                    .new_inserting_at_index(0, name2.clone(), IDX_DTYPE)
+                    .unwrap();
+                Ok(Arc::new(new))
+            };
+            self.map(
+                move |df: DataFrame| df.with_row_count(&name, offset),
+                opt,
+                Some(Arc::new(udf_schema)),
+                Some("WITH ROW COUNT"),
+            )
         } else {
-            AllowedOptimizations::default()
-        };
-
-        self.map(
-            move |df: DataFrame| {
-                if add_row_count_in_map {
-                    df.with_row_count(&name, offset)
-                } else {
-                    Ok(df)
-                }
-            },
-            opt,
-            Some(Arc::new(udf_schema)),
-            Some("WITH ROW COUNT"),
-        )
+            self
+        }
     }
 
     /// Unnest the given `Struct` columns. This means that the fields of the `Struct` type will be
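The `with_row_count` rewrite above now has two paths: CSV/IPC/Parquet scans get the row count (and the schema patch) pushed into the scan itself, while every other plan keeps the old behaviour of computing the row count in a `map` whose `udf_schema` prepends the column. A rough, self-contained sketch of that dispatch, using a hypothetical `Plan` type in place of the real `LogicalPlan::{CsvScan, IpcScan, ParquetScan}` variants:

```rust
use std::sync::Arc;

enum Plan {
    Scan {
        schema: Arc<Vec<String>>,
        row_count: Option<String>,
    },
    Other,
}

fn with_row_count(plan: Plan, name: &str) -> Plan {
    match plan {
        // Scan: record the row count in the scan options and patch the
        // schema eagerly, so no extra `map` node is needed.
        Plan::Scan { schema, .. } => {
            let mut fields = (*schema).clone();
            fields.insert(0, name.to_string());
            Plan::Scan {
                schema: Arc::new(fields),
                row_count: Some(name.to_string()),
            }
        }
        // Anything else keeps the old behaviour: the row count is computed
        // in a schema-rewriting `map` over the DataFrame (elided here).
        Plan::Other => Plan::Other,
    }
}

fn main() {
    let plan = Plan::Scan {
        schema: Arc::new(vec!["a".to_string()]),
        row_count: None,
    };
    if let Plan::Scan { schema, row_count } = with_row_count(plan, "Idx") {
        assert_eq!(schema[0], "Idx");
        assert_eq!(row_count.as_deref(), Some("Idx"));
    }
}
```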
diff --git a/polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs
index 7bcd391678ac..627a9a52328e 100644
--- a/polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -17,6 +17,7 @@ impl IpcExec {
             &mut self.options.with_columns,
             &mut self.schema,
             self.options.n_rows,
+            self.options.row_count.is_some(),
         );
         IpcReader::new(file)
             .with_n_rows(n_rows)
diff --git a/polars/polars-lazy/src/physical_plan/executors/scan/mod.rs b/polars/polars-lazy/src/physical_plan/executors/scan/mod.rs
index cf3bd7262663..d3ab56e97675 100644
--- a/polars/polars-lazy/src/physical_plan/executors/scan/mod.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/scan/mod.rs
@@ -40,6 +40,7 @@ fn prepare_scan_args(
     with_columns: &mut Option<Arc<Vec<String>>>,
     schema: &mut SchemaRef,
     n_rows: Option<usize>,
+    has_row_count: bool,
 ) -> (std::fs::File, Projection, StopNRows, Predicate) {
     let file = std::fs::File::open(path).unwrap();
 
@@ -49,7 +50,7 @@ fn prepare_scan_args(
     let projection: Option<Vec<usize>> = with_columns.map(|with_columns| {
         with_columns
             .iter()
-            .map(|name| schema.index_of(name).unwrap())
+            .map(|name| schema.index_of(name).unwrap() - has_row_count as usize)
             .collect()
     });
 
diff --git a/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index 08975a2c206c..17f0b6a4bdba 100644
--- a/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -37,6 +37,7 @@ impl ParquetExec {
             &mut self.options.with_columns,
             &mut self.schema,
             self.options.n_rows,
+            self.options.row_count.is_some(),
         );
 
         ParquetReader::new(file)
diff --git a/polars/polars-lazy/src/tests/mod.rs b/polars/polars-lazy/src/tests/mod.rs
index a645b9614127..d8bf82b67f47 100644
--- a/polars/polars-lazy/src/tests/mod.rs
+++ b/polars/polars-lazy/src/tests/mod.rs
@@ -39,6 +39,7 @@ use polars_core::export::chrono::{NaiveDate, NaiveDateTime, NaiveTime};
 use polars_core::prelude::*;
 pub(crate) use polars_core::SINGLE_LOCK;
 use polars_io::prelude::*;
+use polars_io::RowCount;
 use polars_plan::logical_plan::{
     ArenaLpIter, OptimizationRule, SimplifyExprRule, StackOptimizer, TypeCoercionRule,
 };
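The `prepare_scan_args` change compensates for the schema patch at the physical layer: the logical schema now carries the row-count column at index 0, but the file on disk does not, so every resolved column index shifts back by one when a row count is requested. A sketch of that fixup (hypothetical `file_projection` helper; the real code does this inline):

```rust
// Resolve projected column names against the logical schema, then shift the
// indices back by one if the schema was patched with a row-count column.
fn file_projection(logical_schema: &[&str], with_columns: &[&str], has_row_count: bool) -> Vec<usize> {
    with_columns
        .iter()
        .map(|name| {
            let idx = logical_schema
                .iter()
                .position(|field| field == name)
                .expect("projected column must exist in the schema");
            idx - has_row_count as usize
        })
        .collect()
}

fn main() {
    // Logical schema after the patch; on disk the file only has "a" and "b".
    let schema = ["idx", "a", "b"];
    assert_eq!(file_projection(&schema, &["a", "b"], true), [0, 1]);
    assert_eq!(file_projection(&["a", "b"], &["a", "b"], false), [0, 1]);
}
```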
diff --git a/polars/tests/it/lazy/cse.rs b/polars/tests/it/lazy/cse.rs
index 7069367aa3ac..fb46e67a7daa 100644
--- a/polars/tests/it/lazy/cse.rs
+++ b/polars/tests/it/lazy/cse.rs
@@ -1,6 +1,7 @@
 use super::*;
 
 #[test]
+#[cfg(feature = "semi_anti_join")]
 fn test_cse_union_schema_6504() -> PolarsResult<()> {
     use polars_core::df;
     let q1: LazyFrame = df![
diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock
index 0a9d9f59f8cd..29b6b3fd89ae 100644
--- a/py-polars/Cargo.lock
+++ b/py-polars/Cargo.lock
@@ -362,9 +362,9 @@ dependencies = [
 
 [[package]]
 name = "comfy-table"
-version = "6.2.0"
+version = "7.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7e959d788268e3bf9d35ace83e81b124190378e4c91c9067524675e33394b8ba"
+checksum = "9ab77dbd8adecaf3f0db40581631b995f312a8a5ae3aa9993188bb8f23d83a5b"
 dependencies = [
  "crossterm",
  "strum",
diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py
index 220c1348d548..bc461c3adecc 100644
--- a/py-polars/tests/unit/io/test_lazy_parquet.py
+++ b/py-polars/tests/unit/io/test_lazy_parquet.py
@@ -197,7 +197,7 @@ def test_parquet_stats(tmp_path: Path) -> None:
     ).collect().shape == (8, 1)
 
 
-def test_row_count_schema(parquet_file_path: Path) -> None:
+def test_row_count_schema_parquet(parquet_file_path: Path) -> None:
     assert (
         pl.scan_parquet(str(parquet_file_path), row_count_name="id")
         .select(["id", "b"])
diff --git a/py-polars/tests/unit/test_cse.py b/py-polars/tests/unit/test_cse.py
index 22a69b5e027f..2573ec5155da 100644
--- a/py-polars/tests/unit/test_cse.py
+++ b/py-polars/tests/unit/test_cse.py
@@ -1,5 +1,8 @@
 import re
 from datetime import date
+from tempfile import NamedTemporaryFile
+
+import pytest
 
 import polars as pl
 
@@ -104,3 +107,28 @@ def test_cse_9630() -> None:
         "value_right": [[1, 2]],
         "y": [2],
     }
+
+
+@pytest.mark.write_disk()
+def test_schema_row_count_cse() -> None:
+    csv_a = NamedTemporaryFile()
+    csv_a.write(
+        b"""
+A,B
+Gr1,A
+Gr1,B
+""".strip()
+    )
+    csv_a.seek(0)
+
+    df_a = pl.scan_csv(csv_a.name).with_row_count("Idx")
+    assert df_a.join(df_a, on="B").groupby(
+        "A", maintain_order=True
+    ).all().collect().to_dict(False) == {
+        "A": ["Gr1"],
+        "Idx": [[0, 1]],
+        "B": [["A", "B"]],
+        "Idx_right": [[0, 1]],
+        "A_right": [["Gr1", "Gr1"]],
+    }
+    csv_a.close()
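The new Python test exercises the fix end to end: a scanned CSV with a row count, self-joined so common-subexpression elimination caches the scan, which previously left the cached schema without the "Idx" column. A rough Rust analogue of the same query, as a sketch under assumptions: `a.csv` is a hypothetical file with the same `A,B` contents, and the `LazyCsvReader`/`groupby` names follow the polars 0.30-era lazy API, which may differ in other versions.

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Scan with a row count; before this change the cached scan schema used
    // by CSE did not include the "Idx" column, so the self-join failed.
    let df_a = LazyCsvReader::new("a.csv").finish()?.with_row_count("Idx", None);

    let out = df_a
        .clone()
        .inner_join(df_a, col("B"), col("B"))
        .groupby([col("A")])
        .agg([col("Idx"), col("B")])
        .collect()?;
    println!("{out}");
    Ok(())
}
```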