Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: produce metadata in optimizer output #83

Merged
merged 11 commits into from
Mar 11, 2024
15 changes: 11 additions & 4 deletions optd-core/src/cascades/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::any::Any;
use crate::{
cost::Cost,
property::PropertyBuilderAny,
rel_node::{RelNode, RelNodeRef, RelNodeTyp, Value},
rel_node::{RelNode, RelNodeMeta, RelNodeMetaMap, RelNodeRef, RelNodeTyp, Value},
};

use super::optimizer::{ExprId, GroupId};
Expand Down Expand Up @@ -498,7 +498,7 @@ impl<T: RelNodeTyp> Memo<T> {
pub fn get_best_group_binding(
&self,
group_id: GroupId,
on_produce: &mut impl FnMut(RelNodeRef<T>, GroupId) -> RelNodeRef<T>,
meta: &mut Option<RelNodeMetaMap>,
) -> Result<RelNodeRef<T>> {
let info = self.get_group_info(group_id);
if let Some(winner) = info.winner {
Expand All @@ -507,14 +507,21 @@ impl<T: RelNodeTyp> Memo<T> {
let expr = self.get_expr_memoed(expr_id);
let mut children = Vec::with_capacity(expr.children.len());
for child in &expr.children {
children.push(self.get_best_group_binding(*child, on_produce)?);
children.push(self.get_best_group_binding(*child, meta)?);
}
let node = Arc::new(RelNode {
typ: expr.typ.clone(),
children,
data: expr.data.clone(),
});
return Ok(on_produce(node, group_id));

if let Some(meta) = meta {
meta.insert(
node.as_ref() as *const _ as usize,
RelNodeMeta::new(group_id),
);
}
return Ok(node);
}
}
bail!("no best group binding for group {}", group_id)
Expand Down
10 changes: 5 additions & 5 deletions optd-core/src/cascades/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
cost::CostModel,
optimizer::Optimizer,
property::{PropertyBuilder, PropertyBuilderAny},
rel_node::{RelNodeRef, RelNodeTyp},
rel_node::{RelNodeMetaMap, RelNodeRef, RelNodeTyp},
rules::RuleWrapper,
};

Expand Down Expand Up @@ -209,13 +209,13 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
Ok(group_id)
}

/// Get the group binding.
/// Gets the group binding.
pub fn step_get_optimize_rel(
&self,
group_id: GroupId,
mut on_produce: impl FnMut(RelNodeRef<T>, GroupId) -> RelNodeRef<T>,
meta: &mut Option<RelNodeMetaMap>,
) -> Result<RelNodeRef<T>> {
self.memo.get_best_group_binding(group_id, &mut on_produce)
self.memo.get_best_group_binding(group_id, meta)
}

fn fire_optimize_tasks(&mut self, group_id: GroupId) -> Result<()> {
Expand Down Expand Up @@ -256,7 +256,7 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
fn optimize_inner(&mut self, root_rel: RelNodeRef<T>) -> Result<RelNodeRef<T>> {
let (group_id, _) = self.add_group_expr(root_rel, None);
self.fire_optimize_tasks(group_id)?;
self.memo.get_best_group_binding(group_id, &mut |x, _| x)
self.memo.get_best_group_binding(group_id, &mut None)
}

pub fn resolve_group_id(&self, root_rel: RelNodeRef<T>) -> GroupId {
Expand Down
17 changes: 17 additions & 0 deletions optd-core/src/rel_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! the internal representation of the plan nodes.

use std::{
collections::HashMap,
fmt::{Debug, Display},
hash::Hash,
sync::Arc,
Expand Down Expand Up @@ -201,3 +202,19 @@ impl<T: RelNodeTyp> RelNode<T> {
}
}
}

/// Extra information recorded alongside a `RelNode`.
#[derive(Debug, Clone, PartialEq)]
pub struct RelNodeMeta {
    /// Id of the group the `RelNode` is associated with.
    pub group_id: GroupId,
}

impl RelNodeMeta {
    /// Creates the metadata record for a node associated with `group_id`.
    pub fn new(group_id: GroupId) -> Self {
        Self { group_id }
    }
}

/// A hash table storing `RelNode` (memory address, metadata) pairs.
///
/// The key is a node's memory address represented as a `usize`; the value is
/// the [`RelNodeMeta`] recorded for that node.
// NOTE(review): presumably a key is only meaningful while the node it was taken
// from is still alive at that address — confirm against the code that fills and
// reads this map.
pub type RelNodeMetaMap = HashMap<usize, RelNodeMeta>;
106 changes: 66 additions & 40 deletions optd-datafusion-bridge/src/from_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use datafusion::{
},
scalar::ScalarValue,
};
use optd_core::rel_node::RelNodeMetaMap;
use optd_datafusion_repr::{
plan_nodes::{
BetweenExpr, BinOpExpr, BinOpType, CastExpr, ColumnRefExpr, ConstantExpr, ConstantType,
Expand All @@ -30,7 +31,6 @@ use optd_datafusion_repr::{
PlanNode, SortOrderExpr, SortOrderType,
},
properties::schema::Schema as OptdSchema,
PhysicalCollector,
};

use crate::{physical_collector::CollectorExec, OptdPlanContext};
Expand Down Expand Up @@ -282,8 +282,9 @@ impl OptdPlanContext<'_> {
async fn conv_from_optd_projection(
&mut self,
node: PhysicalProjection,
meta: &RelNodeMetaMap,
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
let input_exec = self.conv_from_optd_plan_node(node.child()).await?;
let input_exec = self.conv_from_optd_plan_node(node.child(), meta).await?;
let physical_exprs = node
.exprs()
.to_vec()
Expand All @@ -307,8 +308,9 @@ impl OptdPlanContext<'_> {
async fn conv_from_optd_filter(
&mut self,
node: PhysicalFilter,
meta: &RelNodeMetaMap,
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
let input_exec = self.conv_from_optd_plan_node(node.child()).await?;
let input_exec = self.conv_from_optd_plan_node(node.child(), meta).await?;
let physical_expr = Self::conv_from_optd_expr(node.cond(), &input_exec.schema())?;
Ok(
Arc::new(datafusion::physical_plan::filter::FilterExec::try_new(
Expand All @@ -322,8 +324,9 @@ impl OptdPlanContext<'_> {
async fn conv_from_optd_limit(
&mut self,
node: PhysicalLimit,
meta: &RelNodeMetaMap,
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
let child = self.conv_from_optd_plan_node(node.child()).await?;
let child = self.conv_from_optd_plan_node(node.child(), meta).await?;

// Limit skip/fetch expressions are only allowed to be constant int
assert!(node.skip().typ() == OptRelNodeTyp::Constant(ConstantType::UInt64));
Expand Down Expand Up @@ -357,8 +360,9 @@ impl OptdPlanContext<'_> {
async fn conv_from_optd_sort(
&mut self,
node: PhysicalSort,
meta: &RelNodeMetaMap,
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
let input_exec = self.conv_from_optd_plan_node(node.child()).await?;
let input_exec = self.conv_from_optd_plan_node(node.child(), meta).await?;
let physical_exprs = node
.exprs()
.to_vec()
Expand All @@ -382,8 +386,9 @@ impl OptdPlanContext<'_> {
async fn conv_from_optd_agg(
&mut self,
node: PhysicalAgg,
meta: &RelNodeMetaMap,
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
let input_exec = self.conv_from_optd_plan_node(node.child()).await?;
let input_exec = self.conv_from_optd_plan_node(node.child(), meta).await?;
let agg_exprs = node
.aggrs()
.to_vec()
Expand Down Expand Up @@ -421,9 +426,10 @@ impl OptdPlanContext<'_> {
async fn conv_from_optd_nested_loop_join(
&mut self,
node: PhysicalNestedLoopJoin,
meta: &RelNodeMetaMap,
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
let left_exec = self.conv_from_optd_plan_node(node.left()).await?;
let right_exec = self.conv_from_optd_plan_node(node.right()).await?;
let left_exec = self.conv_from_optd_plan_node(node.left(), meta).await?;
let right_exec = self.conv_from_optd_plan_node(node.right(), meta).await?;
let filter_schema = {
let fields = left_exec
.schema()
Expand Down Expand Up @@ -477,9 +483,10 @@ impl OptdPlanContext<'_> {
async fn conv_from_optd_hash_join(
&mut self,
node: PhysicalHashJoin,
meta: &RelNodeMetaMap,
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
let left_exec = self.conv_from_optd_plan_node(node.left()).await?;
let right_exec = self.conv_from_optd_plan_node(node.right()).await?;
let left_exec = self.conv_from_optd_plan_node(node.left(), meta).await?;
let right_exec = self.conv_from_optd_plan_node(node.right(), meta).await?;
let join_type = match node.join_type() {
JoinType::Inner => datafusion::logical_expr::JoinType::Inner,
_ => unimplemented!(),
Expand Down Expand Up @@ -519,76 +526,95 @@ impl OptdPlanContext<'_> {
)
}

#[async_recursion]
async fn conv_from_optd_plan_node(&mut self, node: PlanNode) -> Result<Arc<dyn ExecutionPlan>> {
async fn conv_from_optd_plan_node(
&mut self,
node: PlanNode,
meta: &RelNodeMetaMap,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut schema = OptdSchema { fields: vec![] };
if node.typ() == OptRelNodeTyp::PhysicalEmptyRelation {
schema = node.schema(self.optimizer.unwrap().optd_optimizer());
}
let rel_node = node.into_rel_node();

let group_id = meta
.get(&(rel_node.as_ref() as *const _ as usize))
.expect("group id not found")
.group_id;
let rel_node_dbg = rel_node.clone();
let result = match &rel_node.typ {
let bare = match &rel_node.typ {
OptRelNodeTyp::PhysicalScan => {
self.conv_from_optd_table_scan(PhysicalScan::from_rel_node(rel_node).unwrap())
.await
.await?
}
OptRelNodeTyp::PhysicalProjection => {
self.conv_from_optd_projection(PhysicalProjection::from_rel_node(rel_node).unwrap())
.await
self.conv_from_optd_projection(
PhysicalProjection::from_rel_node(rel_node).unwrap(),
meta,
)
.await?
}
OptRelNodeTyp::PhysicalFilter => {
self.conv_from_optd_filter(PhysicalFilter::from_rel_node(rel_node).unwrap())
.await
self.conv_from_optd_filter(PhysicalFilter::from_rel_node(rel_node).unwrap(), meta)
.await?
}
OptRelNodeTyp::PhysicalSort => {
self.conv_from_optd_sort(PhysicalSort::from_rel_node(rel_node).unwrap())
.await
self.conv_from_optd_sort(PhysicalSort::from_rel_node(rel_node).unwrap(), meta)
.await?
}
OptRelNodeTyp::PhysicalAgg => {
self.conv_from_optd_agg(PhysicalAgg::from_rel_node(rel_node).unwrap())
.await
self.conv_from_optd_agg(PhysicalAgg::from_rel_node(rel_node).unwrap(), meta)
.await?
}
OptRelNodeTyp::PhysicalNestedLoopJoin(_) => {
self.conv_from_optd_nested_loop_join(
PhysicalNestedLoopJoin::from_rel_node(rel_node).unwrap(),
meta,
)
.await
.await?
}
OptRelNodeTyp::PhysicalHashJoin(_) => {
self.conv_from_optd_hash_join(PhysicalHashJoin::from_rel_node(rel_node).unwrap())
.await
}
OptRelNodeTyp::PhysicalCollector(_) => {
let node = PhysicalCollector::from_rel_node(rel_node).unwrap();
let child = self.conv_from_optd_plan_node(node.child()).await?;
Ok(Arc::new(CollectorExec::new(
child,
node.group_id(),
self.optimizer.as_ref().unwrap().runtime_statistics.clone(),
)) as Arc<dyn ExecutionPlan>)
self.conv_from_optd_hash_join(
PhysicalHashJoin::from_rel_node(rel_node).unwrap(),
meta,
)
.await?
}
OptRelNodeTyp::PhysicalEmptyRelation => {
let physical_node = PhysicalEmptyRelation::from_rel_node(rel_node).unwrap();
let datafusion_schema: Schema = from_optd_schema(schema);
Ok(Arc::new(datafusion::physical_plan::empty::EmptyExec::new(
Arc::new(datafusion::physical_plan::empty::EmptyExec::new(
physical_node.produce_one_row(),
Arc::new(datafusion_schema),
)) as Arc<dyn ExecutionPlan>)
)) as Arc<dyn ExecutionPlan>
}
OptRelNodeTyp::PhysicalLimit => {
self.conv_from_optd_limit(PhysicalLimit::from_rel_node(rel_node).unwrap())
.await
self.conv_from_optd_limit(PhysicalLimit::from_rel_node(rel_node).unwrap(), meta)
.await?
}
typ => unimplemented!("{}", typ),
};
result.with_context(|| format!("when processing {}", rel_node_dbg))

let optimizer = self.optimizer.as_ref().unwrap();
if optimizer.adaptive_enabled() {
let bare_with_collector: Result<Arc<dyn ExecutionPlan>> =
Ok(Arc::new(CollectorExec::new(
bare,
group_id,
self.optimizer.as_ref().unwrap().runtime_statistics.clone(),
)) as Arc<dyn ExecutionPlan>);
bare_with_collector.with_context(|| format!("when processing {}", rel_node_dbg))
} else {
Ok(bare)
}
}

pub async fn conv_from_optd(
&mut self,
root_rel: OptRelNodeRef,
meta: RelNodeMetaMap,
) -> Result<Arc<dyn ExecutionPlan>> {
self.conv_from_optd_plan_node(PlanNode::from_rel_node(root_rel).unwrap())
self.conv_from_optd_plan_node(PlanNode::from_rel_node(root_rel).unwrap(), &meta)
.await
}
}
5 changes: 3 additions & 2 deletions optd-datafusion-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ impl OptdQueryPlanner {
));
}
let mut optimizer = self.optimizer.lock().unwrap().take().unwrap();
let (group_id, optimized_rel) = optimizer.optimize(optd_rel)?;
let (group_id, optimized_rel, meta) = optimizer.optimize(optd_rel)?;

if let Some(explains) = &mut explains {
explains.push(StringifiedPlan::new(
PlanType::OptimizedPhysicalPlan {
Expand Down Expand Up @@ -280,7 +281,7 @@ impl OptdQueryPlanner {
// );
// optimizer.dump(Some(group_id));
ctx.optimizer = Some(&optimizer);
let physical_plan = ctx.conv_from_optd(optimized_rel).await?;
let physical_plan = ctx.conv_from_optd(optimized_rel, meta).await?;
if let Some(explains) = &mut explains {
explains.push(
displayable(&*physical_plan)
Expand Down
Loading
Loading