Bug-fix: MemoryExec sort expressions do NOT refer to the projected schema #12876

Merged
merged 10 commits into from
Oct 12, 2024
Changes from 5 commits
35 changes: 32 additions & 3 deletions datafusion/core/src/datasource/memory.rs
@@ -37,14 +37,18 @@ use crate::physical_planner::create_physical_sort_exprs;

 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
+use datafusion_catalog::Session;
 use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
 use datafusion_execution::TaskContext;
 use datafusion_expr::dml::InsertOp;
+use datafusion_expr::SortExpr;
+use datafusion_physical_expr::equivalence::ProjectionMapping;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::EquivalenceProperties;
 use datafusion_physical_plan::metrics::MetricsSet;

 use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_expr::SortExpr;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use futures::StreamExt;
 use log::debug;
 use parking_lot::Mutex;
Expand Down Expand Up @@ -231,7 +235,7 @@ impl TableProvider for MemTable {
if !sort_order.is_empty() {
let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;

let file_sort_order = sort_order
let mut file_sort_order = sort_order
.iter()
.map(|sort_exprs| {
create_physical_sort_exprs(
Expand All @@ -241,6 +245,31 @@ impl TableProvider for MemTable {
)
})
.collect::<Result<Vec<_>>>()?;

// If there is a projection on the source, we also need to project orderings
Contributor


Instead of adjusting the ordering here, I wonder if it would be less code and "just work" if you applied the projection to self.sort_order first? The normal output properties calculation should then work, I think 🤔

Contributor Author


I don't think I can simply apply a straightforward projection to self.sort_order, as the orderings may still require special handling. For example, if we have an ordering [a, b, c] and the projection excludes column a, the remaining ordering [b, c] would not be valid. To avoid missing edge cases like this, I prefer consulting the equivalence API to ensure correctness.

+            if let Some(projection) = projection {
+                let base_eqp = EquivalenceProperties::new_with_orderings(
+                    self.schema(),
+                    &file_sort_order,
+                );
+                let proj_exprs = projection
+                    .iter()
+                    .map(|idx| {
+                        let name = self.schema.field(*idx).name();
+                        (
+                            Arc::new(Column::new(name, *idx)) as Arc<dyn PhysicalExpr>,
+                            name.to_string(),
+                        )
+                    })
+                    .collect::<Vec<_>>();
+                let projection_mapping =
+                    ProjectionMapping::try_new(&proj_exprs, &self.schema)?;
+                file_sort_order = base_eqp
+                    .project(&projection_mapping, exec.schema())
+                    .oeq_class
+                    .orderings;
+            }
+
             exec = exec.with_sort_information(file_sort_order);
         }

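The edge case raised in the review thread above (an ordering [a, b, c] with column a projected away) can be illustrated with a small standalone sketch. It is a simplified model, not DataFusion's `EquivalenceProperties::project`: orderings are plain lists of column names, and the hypothetical helpers `project_ordering` and `is_sorted_by` are introduced only for illustration. The real API also accounts for equivalent and constant expressions, which this sketch ignores.

```rust
/// Simplified model: an ordering is a list of column names, most significant first.
/// Keeps only the longest leading prefix whose columns all survive the projection.
fn project_ordering<'a>(ordering: &[&'a str], projected: &[&str]) -> Vec<&'a str> {
    ordering
        .iter()
        .copied()
        .take_while(|col| projected.contains(col))
        .collect()
}

/// Returns true if `rows` is sorted lexicographically by the given column indices.
fn is_sorted_by(rows: &[[i32; 3]], cols: &[usize]) -> bool {
    rows.windows(2).all(|w| {
        let left: Vec<i32> = cols.iter().map(|&i| w[0][i]).collect();
        let right: Vec<i32> = cols.iter().map(|&i| w[1][i]).collect();
        left <= right
    })
}

fn main() {
    // Rows of (a, b, c), sorted by [a, b, c].
    let rows = [[1, 9, 9], [2, 1, 1], [2, 1, 5], [3, 0, 0]];
    assert!(is_sorted_by(&rows, &[0, 1, 2]));
    // Dropping `a` does not leave the same rows sorted by [b, c].
    assert!(!is_sorted_by(&rows, &[1, 2]));

    // Projection without `a`: no part of the ordering can be kept.
    assert!(project_ordering(&["a", "b", "c"], &["b", "c"]).is_empty());
    // Projection without `c`: the prefix [a, b] remains valid.
    assert_eq!(project_ordering(&["a", "b", "c"], &["a", "b"]), vec!["a", "b"]);
}
```

Under this model, naively filtering the sort expressions down to the surviving columns would claim an ordering that the data does not have, which is the reason given in the thread for going through the equivalence API.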
15 changes: 15 additions & 0 deletions datafusion/physical-plan/src/memory.rs
@@ -35,6 +35,7 @@ use datafusion_execution::memory_pool::MemoryReservation;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

+use datafusion_physical_expr::utils::collect_columns;
 use futures::Stream;

 /// Execution plan for reading in-memory batches of data
Expand Down Expand Up @@ -207,6 +208,20 @@ impl MemoryExec {
/// [`EquivalenceProperties`], we can keep track of these equivalences
/// and treat `a ASC` and `b DESC` as the same ordering requirement.
pub fn with_sort_information(mut self, sort_information: Vec<LexOrdering>) -> Self {
// All sort expressions must refer to the projected schema
debug_assert!({
Contributor


Rather than a debug assert, I think it is worth considering changing with_sort_information to try_with_sort_information and returning a Result<Self> with a real error, such as which columns weren't present.

Contributor Author


Done. I have also inserted the logic into try_with_sort_information(), since I've observed that ExecutionPlan APIs taking PhysicalExprs as parameters refer to the input schema when the operator has a built-in projection, as MemoryExec does. I believe we should follow this convention.

Contributor


makes sense -- thank you

+            let fields = self.projected_schema.fields();
+            sort_information
+                .iter()
+                .flatten()
+                .flat_map(|expr| collect_columns(&expr.expr))
+                .all(|col| {
+                    fields
+                        .get(col.index())
+                        .map(|field| field.name() == col.name())
+                        .unwrap_or(false)
+                })
+        });
         self.sort_information = sort_information;

         // We need to update equivalence properties when updating sort information.
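
The review suggestion to replace the debug_assert! with a real error can be sketched without any DataFusion types. This is only an illustration of the name-and-index check shown in this hunk, rewritten to return a Result. `ColumnRef` and `validate_sort_columns` are hypothetical names, and the actual signature and behavior of try_with_sort_information are not shown in this diff.

```rust
/// Hypothetical stand-in for a physical column reference: a name plus an index.
#[derive(Debug, Clone, PartialEq)]
struct ColumnRef {
    name: String,
    index: usize,
}

impl ColumnRef {
    fn new(name: &str, index: usize) -> Self {
        Self { name: name.to_string(), index }
    }
}

/// Checks that every column points at a field of `projected_fields` with the
/// same name at the same position. On failure, returns the offending columns
/// so the caller can report which ones were not present.
fn validate_sort_columns(
    projected_fields: &[&str],
    sort_columns: &[ColumnRef],
) -> Result<(), Vec<ColumnRef>> {
    let mismatched: Vec<ColumnRef> = sort_columns
        .iter()
        .filter(|col| projected_fields.get(col.index).map_or(true, |f| *f != col.name))
        .cloned()
        .collect();
    if mismatched.is_empty() {
        Ok(())
    } else {
        Err(mismatched)
    }
}

fn main() {
    // Assume a source schema [a, b, c] and a projection [2, 0], giving [c, a].
    let projected = ["c", "a"];

    // Expressed against the projected schema: accepted.
    assert!(validate_sort_columns(&projected, &[ColumnRef::new("c", 0)]).is_ok());

    // Still using the source index for `c`: rejected, and the column is reported.
    let err = validate_sort_columns(&projected, &[ColumnRef::new("c", 2)]).unwrap_err();
    assert_eq!(err, vec![ColumnRef::new("c", 2)]);
}
```

Unlike a debug_assert!, a check of this shape also runs in release builds and tells the caller which sort columns failed to match.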