Skip to content

Commit

Permalink
Minor: improve Display of output ordering of StreamTableExec (#9225)
Browse files Browse the repository at this point in the history
* Initial commit

* Update plan
  • Loading branch information
mustafasrepo authored Feb 15, 2024
1 parent 3f0963d commit 85be1bc
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 36 deletions.
23 changes: 2 additions & 21 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::{
listing::{FileRange, PartitionedFile},
object_store::ObjectStoreUrl,
},
physical_plan::display::{OutputOrderingDisplay, ProjectSchemaDisplay},
physical_plan::display::{display_orderings, ProjectSchemaDisplay},
};

use arrow::{
Expand Down Expand Up @@ -129,26 +129,7 @@ impl DisplayAs for FileScanConfig {
write!(f, ", limit={limit}")?;
}

if let Some(ordering) = orderings.first() {
if !ordering.is_empty() {
let start = if orderings.len() == 1 {
", output_ordering="
} else {
", output_orderings=["
};
write!(f, "{}", start)?;
for (idx, ordering) in
orderings.iter().enumerate().filter(|(_, o)| !o.is_empty())
{
match idx {
0 => write!(f, "{}", OutputOrderingDisplay(ordering))?,
_ => write!(f, ", {}", OutputOrderingDisplay(ordering))?,
}
}
let end = if orderings.len() == 1 { "" } else { "]" };
write!(f, "{}", end)?;
}
}
display_orderings(f, &orderings)?;

Ok(())
}
Expand Down
28 changes: 27 additions & 1 deletion datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
//! [`crate::displayable`] for examples of how to format
use std::fmt;
use std::fmt::Formatter;

use super::{accept, ExecutionPlan, ExecutionPlanVisitor};

use arrow_schema::SchemaRef;
use datafusion_common::display::{GraphvizBuilder, PlanType, StringifiedPlan};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};

/// Options for controlling how each [`ExecutionPlan`] should format itself
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -437,6 +438,31 @@ impl<'a> fmt::Display for OutputOrderingDisplay<'a> {
}
}

pub fn display_orderings(f: &mut Formatter, orderings: &[LexOrdering]) -> fmt::Result {
if let Some(ordering) = orderings.first() {
if !ordering.is_empty() {
let start = if orderings.len() == 1 {
", output_ordering="
} else {
", output_orderings=["
};
write!(f, "{}", start)?;
for (idx, ordering) in
orderings.iter().enumerate().filter(|(_, o)| !o.is_empty())
{
match idx {
0 => write!(f, "{}", OutputOrderingDisplay(ordering))?,
_ => write!(f, ", {}", OutputOrderingDisplay(ordering))?,
}
}
let end = if orderings.len() == 1 { "" } else { "]" };
write!(f, "{}", end)?;
}
}

Ok(())
}

#[cfg(test)]
mod tests {
use std::fmt::Write;
Expand Down
17 changes: 4 additions & 13 deletions datafusion/physical-plan/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::any::Any;
use std::sync::Arc;

use super::{DisplayAs, DisplayFormatType};
use crate::display::{OutputOrderingDisplay, ProjectSchemaDisplay};
use crate::display::{display_orderings, ProjectSchemaDisplay};
use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};

Expand Down Expand Up @@ -149,18 +149,9 @@ impl DisplayAs for StreamingTableExec {
write!(f, ", infinite_source=true")?;
}

self.projected_output_ordering
.first()
.map_or(Ok(()), |ordering| {
if !ordering.is_empty() {
write!(
f,
", output_ordering={}",
OutputOrderingDisplay(ordering)
)?;
}
Ok(())
})
display_orderings(f, &self.projected_output_ordering)?;

Ok(())
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3582,7 +3582,7 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST]
------CoalesceBatchesExec: target_batch_size=4096
--------RepartitionExec: partitioning=Hash([d@4], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]
------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]]

# CTAS with NTILE function
statement ok
Expand Down

0 comments on commit 85be1bc

Please sign in to comment.