You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Currently some cost based optimizations are executed during the logical plan optimization. This breaks the intent of separating the logical and physical plan. Usually, the logical plan is supposed to be optimized using rule based optimizations only. It is in the physical plan selection that the cost based optimization should kick in.
A more concrete reason to move all cost based optimizations further down the pipe is to avoid the need to fetch statistics when building the logical plan. In Ballista for example the logical plan is built in the client, which might be off cluster, and thus shouldn’t be required to have access to the datastore (or at least not the high performance access that is sometimes required to fetch statistics).
Describe the solution you'd like
Remove statistics() and has_exact_statistics() from the TableProvider trait
Add a statistics() method to the ExecutionPlan trait
Each node of the physical plan will try to assess its output statistics itself according to its inputs and its internal logic. This is easier to maintain than the current approach where we are trying to reconstruct the statistics of the nodes externally (e.g. in hash_build_probe_order.rs -> get_num_rows(&LogicalPlan) ).
The returned type will be the same as the current Statistics struct, except that we use a boolean field is_exact instead of having another has_exact_statistics() methods
The method is sync and takes immutable &self. Some ExecutionPlan nodes might want to fetch the latest statistics when being queried. A good example is the ShuffleReaderExec which might want to recompute the statistics once all its input partitions are resolved. We consider that the execution plan nodes are mostly immutable, and and updating the statistics requires to rewrite the whole physical plan tree above the updated node.
[can be postponed to a separate PR] Make the TableProvider.scan(...) method async to allow the computation of the statistics when creating the source ExecutionPlan nodes. This will require async propagation to the PhysicalPlanner trait.
The AggregateStatistics rule (use the statistics directly to provide the aggregation if possible) could be managed:
By the HashAggregateExec itself when being constructed
During the physical plan optimization
The HashBuildProbeOrder rule (optimize join orderings according to table sizes) :
By the HashJoinExec and CrossJoinExec themselves when being constructed
During the physical plan optimization, this would give better support for Spark style AQE
[can be postponed to a separate PR] add a eval_stats(&self, &Statistics) to the PhysicalExpr trait so that we can also propagate statistics through complex expressions
Describe alternatives you've considered
It was considered to have async fn statistics(&mut self) to support fetching the latest statistics or simply fn statistics(&self). Fetching the latest statistics is important to support Spark style AQE, but it can also be achieved by keeping the physical plan nodes immutable and updating the entire plan tree when necessary.
It would also be valuable to have the statistics at the partition level. This should be considered in a further evolution.
Additional context
The need to move the CBO to the execution plan is linked to problem such as the implementation of table formats
As suggested by @Dandandan, we might also want to consider the fact that the statistics can be updated at runtime (like Spark AQE). In Ballista, an execution plan for a stage that takes a shuffle as input might be re-optimized according to the statistics of the shuffle boundary. For instance, this might change the optimal build/probe order of the tables for a join that has the shuffle boundary as one of its inputs.
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Currently some cost based optimizations are executed during the logical plan optimization. This breaks the intent of separating the logical and physical plan. Usually, the logical plan is supposed to be optimized using rule based optimizations only. It is in the physical plan selection that the cost based optimization should kick in.
A more concrete reason to move all cost based optimizations further down the pipe is to avoid the need to fetch statistics when building the logical plan. In Ballista for example the logical plan is built in the client, which might be off cluster, and thus shouldn’t be required to have access to the datastore (or at least not the high performance access that is sometimes required to fetch statistics).
Describe the solution you'd like
statistics()
andhas_exact_statistics()
from theTableProvider
traitstatistics()
method to theExecutionPlan
traithash_build_probe_order.rs
->get_num_rows(&LogicalPlan)
).is_exact
instead of having anotherhas_exact_statistics()
methods&self
. SomeExecutionPlan
nodes might want to fetch the latest statistics when being queried. A good example is theShuffleReaderExec
which might want to recompute the statistics once all its input partitions are resolved. We consider that the execution plan nodes are mostly immutable, and and updating the statistics requires to rewrite the whole physical plan tree above the updated node.TableProvider.scan(...)
method async to allow the computation of the statistics when creating the sourceExecutionPlan
nodes. This will require async propagation to thePhysicalPlanner
trait.AggregateStatistics
rule (use the statistics directly to provide the aggregation if possible) could be managed:HashAggregateExec
itself when being constructedHashBuildProbeOrder
rule (optimize join orderings according to table sizes) :eval_stats(&self, &Statistics)
to thePhysicalExpr
trait so that we can also propagate statistics through complex expressionsDescribe alternatives you've considered
It was considered to have
async fn statistics(&mut self)
to support fetching the latest statistics or simplyfn statistics(&self)
. Fetching the latest statistics is important to support Spark style AQE, but it can also be achieved by keeping the physical plan nodes immutable and updating the entire plan tree when necessary.It would also be valuable to have the statistics at the partition level. This should be considered in a further evolution.
Additional context
The need to move the CBO to the execution plan is linked to problem such as the implementation of table formats
It is also related to the multiple reading of the statistics raised in Ballista:
Features for which the implementation will likely create merge conflicts:
count(col)
using table statistics #904The text was updated successfully, but these errors were encountered: