Skip to content

Commit

Permalink
Change the unbounded_output API default (apache#7605)
Browse files Browse the repository at this point in the history
  • Loading branch information
metesynnada authored Sep 20, 2023
1 parent a2a09c7 commit bcdda39
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ impl ExecutionPlan for FileSinkExec {
}))
}

fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
Ok(_children[0])
}

/// Execute the plan and return a stream of `RecordBatch`es for
/// the specified partition.
fn execute(
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
use datafusion_common::Result;
pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
use datafusion_common::{plan_err, Result};
use datafusion_physical_expr::PhysicalSortExpr;
pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

Expand Down Expand Up @@ -74,7 +74,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
Ok(false)
if _children.iter().any(|&x| x) {
plan_err!("Plan does not support infinite stream from its children")
} else {
Ok(false)
}
}

/// If the output of this operator within each partition is sorted,
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ impl ExecutionPlan for GlobalLimitExec {
)))
}

fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
Ok(false)
}

fn execute(
&self,
partition: usize,
Expand Down Expand Up @@ -320,6 +324,10 @@ impl ExecutionPlan for LocalLimitExec {
self.input.ordering_equivalence_properties()
}

fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
Ok(false)
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
Expand Down

0 comments on commit bcdda39

Please sign in to comment.