Skip to content

Commit

Permalink
remove physical collector
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Liang <[email protected]>
  • Loading branch information
yliang412 committed Feb 22, 2024
1 parent 5208e32 commit 637a7f2
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 78 deletions.
10 changes: 3 additions & 7 deletions optd-core/src/cascades/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,26 +471,22 @@ impl<T: RelNodeTyp> Memo<T> {
bail!("no best group binding for group {}", group_id)
}

pub fn get_best_group_binding(
&self,
group_id: GroupId,
on_produce: &mut impl FnMut(RelNodeRef<T>, GroupId) -> RelNodeRef<T>,
) -> Result<RelNodeRef<T>> {
pub fn get_best_group_binding(&self, group_id: GroupId) -> Result<RelNodeRef<T>> {
let info = self.get_group_info(group_id);
if let Some(winner) = info.winner {
if !winner.impossible {
let expr_id = winner.expr_id;
let expr = self.get_expr_memoed(expr_id);
let mut children = Vec::with_capacity(expr.children.len());
for child in &expr.children {
children.push(self.get_best_group_binding(*child, on_produce)?);
children.push(self.get_best_group_binding(*child)?);
}
let node = Arc::new(RelNode {
typ: expr.typ.clone(),
children,
data: expr.data.clone(),
});
return Ok(on_produce(node, group_id));
return Ok(node);
}
}
bail!("no best group binding for group {}", group_id)
Expand Down
10 changes: 3 additions & 7 deletions optd-core/src/cascades/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,8 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
}

/// Get the group binding.
pub fn step_get_optimize_rel(
&self,
group_id: GroupId,
mut on_produce: impl FnMut(RelNodeRef<T>, GroupId) -> RelNodeRef<T>,
) -> Result<RelNodeRef<T>> {
self.memo.get_best_group_binding(group_id, &mut on_produce)
pub fn step_get_optimize_rel(&self, group_id: GroupId) -> Result<RelNodeRef<T>> {
self.memo.get_best_group_binding(group_id)
}

fn fire_optimize_tasks(&mut self, group_id: GroupId) -> Result<()> {
Expand Down Expand Up @@ -269,7 +265,7 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
fn optimize_inner(&mut self, root_rel: RelNodeRef<T>) -> Result<RelNodeRef<T>> {
let (group_id, _) = self.add_group_expr(root_rel, None);
self.fire_optimize_tasks(group_id)?;
self.memo.get_best_group_binding(group_id, &mut |x, _| x)
self.memo.get_best_group_binding(group_id)
}

pub fn resolve_group_id(&self, root_rel: RelNodeRef<T>) -> GroupId {
Expand Down
114 changes: 57 additions & 57 deletions optd-datafusion-repr/src/adaptive.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,59 @@
// TODO: move this to a separate crate

use optd_core::cascades::GroupId;

use pretty_xmlish::Pretty;

use optd_core::rel_node::RelNode;

use crate::plan_nodes::{OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};

#[derive(Clone, Debug)]
pub struct PhysicalCollector(pub PlanNode);

impl OptRelNode for PhysicalCollector {
fn into_rel_node(self) -> OptRelNodeRef {
self.0.into_rel_node()
}

fn from_rel_node(rel_node: OptRelNodeRef) -> Option<Self> {
if !matches!(rel_node.typ, OptRelNodeTyp::PhysicalCollector(_)) {
return None;
}
PlanNode::from_rel_node(rel_node).map(Self)
}

fn dispatch_explain(&self) -> Pretty<'static> {
// Pretty::simple_record(
// "PhysicalCollector",
// vec![("group_id", self.group_id().to_string().into())],
// vec![self.child().explain()],
// )
self.child().explain()
}
}

impl PhysicalCollector {
pub fn new(child: PlanNode, group_id: GroupId) -> PhysicalCollector {
PhysicalCollector(PlanNode(
RelNode {
typ: OptRelNodeTyp::PhysicalCollector(group_id),
children: vec![child.into_rel_node()],
data: None,
}
.into(),
))
}

pub fn group_id(&self) -> GroupId {
if let OptRelNodeTyp::PhysicalCollector(group_id) = self.clone().into_rel_node().typ {
group_id
} else {
panic!("not a physical collector")
}
}

pub fn child(&self) -> PlanNode {
PlanNode::from_rel_node(self.clone().into_rel_node().child(0)).unwrap()
}
}
// use optd_core::cascades::GroupId;

// use pretty_xmlish::Pretty;

// use optd_core::rel_node::RelNode;

// use crate::plan_nodes::{OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};

// #[derive(Clone, Debug)]
// pub struct PhysicalCollector(pub PlanNode);

// impl OptRelNode for PhysicalCollector {
// fn into_rel_node(self) -> OptRelNodeRef {
// self.0.into_rel_node()
// }

// fn from_rel_node(rel_node: OptRelNodeRef) -> Option<Self> {
// if !matches!(rel_node.typ, OptRelNodeTyp::PhysicalCollector(_)) {
// return None;
// }
// PlanNode::from_rel_node(rel_node).map(Self)
// }

// fn dispatch_explain(&self) -> Pretty<'static> {
// // Pretty::simple_record(
// // "PhysicalCollector",
// // vec![("group_id", self.group_id().to_string().into())],
// // vec![self.child().explain()],
// // )
// self.child().explain()
// }
// }

// impl PhysicalCollector {
// pub fn new(child: PlanNode, group_id: GroupId) -> PhysicalCollector {
// PhysicalCollector(PlanNode(
// RelNode {
// typ: OptRelNodeTyp::PhysicalCollector(group_id),
// children: vec![child.into_rel_node()],
// data: None,
// }
// .into(),
// ))
// }

// pub fn group_id(&self) -> GroupId {
// if let OptRelNodeTyp::PhysicalCollector(group_id) = self.clone().into_rel_node().typ {
// group_id
// } else {
// panic!("not a physical collector")
// }
// }

// pub fn child(&self) -> PlanNode {
// PlanNode::from_rel_node(self.clone().into_rel_node().child(0)).unwrap()
// }
// }
2 changes: 1 addition & 1 deletion optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use rules::{
PhysicalConversionRule, ProjectionPullUpJoin,
};

pub use adaptive::PhysicalCollector;
// pub use adaptive::PhysicalCollector;
pub use optd_core::rel_node::Value;

mod adaptive;
Expand Down
12 changes: 6 additions & 6 deletions optd-datafusion-repr/src/plan_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use scan::{LogicalScan, PhysicalScan};
pub use sort::{LogicalSort, PhysicalSort};

use crate::{
adaptive::PhysicalCollector,
// adaptive::PhysicalCollector,
properties::schema::{Schema, SchemaPropertyBuilder},
};

Expand Down Expand Up @@ -66,7 +66,7 @@ pub enum OptRelNodeTyp {
PhysicalNestedLoopJoin(JoinType),
PhysicalEmptyRelation,
PhysicalLimit,
PhysicalCollector(GroupId), // only produced after optimization is done
// PhysicalCollector(GroupId), // only produced after optimization is done
// Expressions
Constant(ConstantType),
ColumnRef,
Expand Down Expand Up @@ -100,7 +100,7 @@ impl OptRelNodeTyp {
| Self::PhysicalSort
| Self::PhysicalAgg
| Self::PhysicalHashJoin(_)
| Self::PhysicalCollector(_)
// | Self::PhysicalCollector(_)
| Self::PhysicalLimit
| Self::PhysicalEmptyRelation
)
Expand Down Expand Up @@ -367,9 +367,9 @@ pub fn explain(rel_node: OptRelNodeRef) -> Pretty<'static> {
OptRelNodeTyp::SortOrder(_) => SortOrderExpr::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(),
OptRelNodeTyp::PhysicalCollector(_) => PhysicalCollector::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(),
// OptRelNodeTyp::PhysicalCollector(_) => PhysicalCollector::from_rel_node(rel_node)
// .unwrap()
// .dispatch_explain(),
OptRelNodeTyp::PhysicalEmptyRelation => PhysicalEmptyRelation::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(),
Expand Down

0 comments on commit 637a7f2

Please sign in to comment.