Skip to content

Commit

Permalink
fix(rust, python): fix row-count schema (pola-rs#9797)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored and c-peters committed Jul 14, 2023
1 parent 9f77e02 commit 077e533
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 67 deletions.
52 changes: 35 additions & 17 deletions polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl LogicalPlanBuilder {
use polars_io::{is_cloud_url, SerReader as _};

let path = path.into();
let file_info: PolarsResult<FileInfo> = if is_cloud_url(&path) {
let (mut schema, num_rows) = if is_cloud_url(&path) {
#[cfg(not(feature = "async"))]
panic!(
"One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled."
Expand All @@ -136,24 +136,22 @@ impl LogicalPlanBuilder {
#[cfg(feature = "async")]
{
let uri = path.to_string_lossy();
let (schema, num_rows) =
ParquetAsyncReader::file_info(&uri, cloud_options.as_ref())?;
Ok(FileInfo {
schema: Arc::new(schema),
row_estimation: (Some(num_rows), num_rows),
})
ParquetAsyncReader::file_info(&uri, cloud_options.as_ref())?
}
} else {
let file = std::fs::File::open(&path)?;
let mut reader = ParquetReader::new(file);
let schema = Arc::new(reader.schema()?);
let num_rows = reader.num_rows()?;
Ok(FileInfo {
schema,
row_estimation: (Some(num_rows), num_rows),
})
(reader.schema()?, reader.num_rows()?)
};

if let Some(rc) = &row_count {
let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
}

let file_info = FileInfo {
schema: Arc::new(schema),
row_estimation: (Some(num_rows), num_rows),
};
let file_info = file_info?;

Ok(LogicalPlan::ParquetScan {
path,
Expand Down Expand Up @@ -185,7 +183,12 @@ impl LogicalPlanBuilder {
let path = path.into();
let file = std::fs::File::open(&path)?;
let mut reader = IpcReader::new(file);
let schema = Arc::new(reader.schema()?);

let mut schema = reader.schema()?;
if let Some(rc) = &options.row_count {
let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
}
let schema = Arc::new(schema);

let num_rows = reader._num_rows()?;
let file_info = FileInfo {
Expand All @@ -211,7 +214,7 @@ impl LogicalPlanBuilder {
mut skip_rows: usize,
n_rows: Option<usize>,
cache: bool,
schema: Option<Arc<Schema>>,
mut schema: Option<Arc<Schema>>,
schema_overwrite: Option<&Schema>,
low_memory: bool,
comment_char: Option<u8>,
Expand Down Expand Up @@ -239,7 +242,7 @@ impl LogicalPlanBuilder {

// TODO! delay inferring schema until absolutely necessary
// this needs a way to estimate bytes/rows.
let (inferred_schema, rows_read, bytes_read) = infer_file_schema(
let (mut inferred_schema, rows_read, bytes_read) = infer_file_schema(
&reader_bytes,
delimiter,
infer_schema_length,
Expand All @@ -254,6 +257,21 @@ impl LogicalPlanBuilder {
try_parse_dates,
)?;

if let Some(rc) = &row_count {
match schema {
None => {
let _ = inferred_schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
}
Some(inner) => {
schema = Some(Arc::new(
inner
.new_inserting_at_index(0, rc.name.as_str().into(), IDX_DTYPE)
.unwrap(),
));
}
}
}

let schema = schema.unwrap_or_else(|| Arc::new(inferred_schema));
let n_bytes = reader_bytes.len();
let estimated_n_rows = (rows_read as f64 / bytes_read as f64 * n_bytes as f64) as usize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod semi_anti_join;

use polars_core::datatypes::PlHashSet;
use polars_core::prelude::*;
use polars_io::RowCount;
#[cfg(feature = "semi_anti_join")]
use semi_anti_join::process_semi_anti_join;

Expand Down Expand Up @@ -38,13 +39,23 @@ fn init_set() -> PlHashSet<Arc<str>> {
fn get_scan_columns(
acc_projections: &mut Vec<Node>,
expr_arena: &Arena<AExpr>,
row_count: Option<&RowCount>,
) -> Option<Arc<Vec<String>>> {
let mut with_columns = None;
if !acc_projections.is_empty() {
let mut columns = Vec::with_capacity(acc_projections.len());
for expr in acc_projections {
for name in aexpr_to_leaf_names(*expr, expr_arena) {
columns.push((*name).to_owned())
// we shouldn't project the row-count column, as that is generated
// in the scan
let push = match row_count {
Some(rc) if name.as_ref() != rc.name.as_str() => true,
None => true,
_ => false,
};
if push {
columns.push((*name).to_owned())
}
}
}
with_columns = Some(Arc::new(columns));
Expand Down Expand Up @@ -116,10 +127,6 @@ fn update_scan_schema(
acc_projections: &[Node],
expr_arena: &Arena<AExpr>,
schema: &Schema,
// this is only needed for parsers that sort the projections
// currently these are:
// sorting parsers: csv,
// non-sorting: parquet, ipc
sort_projections: bool,
) -> PolarsResult<Schema> {
let mut new_schema = Schema::with_capacity(acc_projections.len());
Expand Down Expand Up @@ -355,7 +362,7 @@ impl ProjectionPushDown {
output_schema,
} => {
if function.allows_projection_pushdown() {
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None);

let output_schema = if options.with_columns.is_none() {
None
Expand Down Expand Up @@ -403,7 +410,7 @@ impl ProjectionPushDown {
&schema,
false,
)?));
projection = get_scan_columns(&mut acc_projections, expr_arena);
projection = get_scan_columns(&mut acc_projections, expr_arena, None);
}
let lp = DataFrameScan {
df,
Expand All @@ -422,15 +429,16 @@ impl ProjectionPushDown {
mut options,
..
} => {
let with_columns = get_scan_columns(&mut acc_projections, expr_arena);
let with_columns =
get_scan_columns(&mut acc_projections, expr_arena, options.row_count.as_ref());
let output_schema = if with_columns.is_none() {
None
} else {
Some(Arc::new(update_scan_schema(
&acc_projections,
expr_arena,
&file_info.schema,
false,
options.row_count.is_some(),
)?))
};
options.with_columns = with_columns;
Expand All @@ -454,15 +462,16 @@ impl ProjectionPushDown {
cloud_options,
..
} => {
let with_columns = get_scan_columns(&mut acc_projections, expr_arena);
let with_columns =
get_scan_columns(&mut acc_projections, expr_arena, options.row_count.as_ref());
let output_schema = if with_columns.is_none() {
None
} else {
Some(Arc::new(update_scan_schema(
&acc_projections,
expr_arena,
&file_info.schema,
false,
options.row_count.is_some(),
)?))
};
options.with_columns = with_columns;
Expand All @@ -482,7 +491,7 @@ impl ProjectionPushDown {
mut options,
predicate,
} => {
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None);

options.output_schema = if options.with_columns.is_none() {
None
Expand All @@ -504,7 +513,8 @@ impl ProjectionPushDown {
predicate,
..
} => {
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);
options.with_columns =
get_scan_columns(&mut acc_projections, expr_arena, options.row_count.as_ref());

let output_schema = if options.with_columns.is_none() {
None
Expand Down
80 changes: 47 additions & 33 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,67 +1202,81 @@ impl LazyFrame {
let add_row_count_in_map = match &mut self.logical_plan {
// Do the row count at scan
#[cfg(feature = "csv")]
LogicalPlan::CsvScan { options, .. } => {
LogicalPlan::CsvScan {
options, file_info, ..
} => {
options.row_count = Some(RowCount {
name: name.to_string(),
offset: offset.unwrap_or(0),
});
file_info.schema = Arc::new(
file_info
.schema
.new_inserting_at_index(0, name.into(), IDX_DTYPE)
.unwrap(),
);
false
}
#[cfg(feature = "ipc")]
LogicalPlan::IpcScan { options, .. } => {
LogicalPlan::IpcScan {
options, file_info, ..
} => {
options.row_count = Some(RowCount {
name: name.to_string(),
offset: offset.unwrap_or(0),
});
file_info.schema = Arc::new(
file_info
.schema
.new_inserting_at_index(0, name.into(), IDX_DTYPE)
.unwrap(),
);
false
}
#[cfg(feature = "parquet")]
LogicalPlan::ParquetScan { options, .. } => {
LogicalPlan::ParquetScan {
options, file_info, ..
} => {
options.row_count = Some(RowCount {
name: name.to_string(),
offset: offset.unwrap_or(0),
});
file_info.schema = Arc::new(
file_info
.schema
.new_inserting_at_index(0, name.into(), IDX_DTYPE)
.unwrap(),
);
false
}
_ => true,
};

let name2: SmartString = name.into();
let udf_schema = move |s: &Schema| {
// Can't error, index 0 is always in bounds
let new = s
.new_inserting_at_index(0, name2.clone(), IDX_DTYPE)
.unwrap();
Ok(Arc::new(new))
};

let name = name.to_owned();

// if we do the row count at scan we add a dummy map, to update the schema
let opt = if add_row_count_in_map {
AllowedOptimizations {
if add_row_count_in_map {
let name: SmartString = name.into();
let name2: SmartString = name.clone();
let opt = AllowedOptimizations {
slice_pushdown: false,
predicate_pushdown: false,
streaming: false,
..Default::default()
}
};
let udf_schema = move |s: &Schema| {
// Can't error, index 0 is always in bounds
let new = s
.new_inserting_at_index(0, name2.clone(), IDX_DTYPE)
.unwrap();
Ok(Arc::new(new))
};
self.map(
move |df: DataFrame| df.with_row_count(&name, offset),
opt,
Some(Arc::new(udf_schema)),
Some("WITH ROW COUNT"),
)
} else {
AllowedOptimizations::default()
};

self.map(
move |df: DataFrame| {
if add_row_count_in_map {
df.with_row_count(&name, offset)
} else {
Ok(df)
}
},
opt,
Some(Arc::new(udf_schema)),
Some("WITH ROW COUNT"),
)
self
}
}

/// Unnest the given `Struct` columns. This means that the fields of the `Struct` type will be
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ impl IpcExec {
&mut self.options.with_columns,
&mut self.schema,
self.options.n_rows,
self.options.row_count.is_some(),
);
IpcReader::new(file)
.with_n_rows(n_rows)
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ fn prepare_scan_args(
with_columns: &mut Option<Arc<Vec<String>>>,
schema: &mut SchemaRef,
n_rows: Option<usize>,
has_row_count: bool,
) -> (std::fs::File, Projection, StopNRows, Predicate) {
let file = std::fs::File::open(path).unwrap();

Expand All @@ -49,7 +50,7 @@ fn prepare_scan_args(
let projection: Option<Vec<_>> = with_columns.map(|with_columns| {
with_columns
.iter()
.map(|name| schema.index_of(name).unwrap())
.map(|name| schema.index_of(name).unwrap() - has_row_count as usize)
.collect()
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl ParquetExec {
&mut self.options.with_columns,
&mut self.schema,
self.options.n_rows,
self.options.row_count.is_some(),
);

ParquetReader::new(file)
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use polars_core::export::chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use polars_core::prelude::*;
pub(crate) use polars_core::SINGLE_LOCK;
use polars_io::prelude::*;
use polars_io::RowCount;
use polars_plan::logical_plan::{
ArenaLpIter, OptimizationRule, SimplifyExprRule, StackOptimizer, TypeCoercionRule,
};
Expand Down
1 change: 1 addition & 0 deletions polars/tests/it/lazy/cse.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::*;

#[test]
#[cfg(feature = "semi_anti_join")]
fn test_cse_union_schema_6504() -> PolarsResult<()> {
use polars_core::df;
let q1: LazyFrame = df![
Expand Down
4 changes: 2 additions & 2 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 077e533

Please sign in to comment.