diff --git a/Cargo.toml b/Cargo.toml index ec4b7858..abe46c8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,8 @@ [workspace] -members = ["datafusion-optd-cli", "optd-core", "datafusion-optd-bridge"] +members = [ + "datafusion-optd-cli", + "optd-core", + "optd-datafusion-bridge", + "optd-datafusion-repr", +] resolver = "2" diff --git a/README.md b/README.md index 052a5724..b67ba260 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,10 @@ # optd -A query optimizer. Currently aimed for a cascades optimizer for Apache datafusion. +A query optimizer. Currently aimed for a cascades optimizer for Apache Arrow Datafusion. + +## Structure + +* `datafusion-optd-cli`: patched Apache Arrow Datafusion cli that calls into optd +* `datafusion-optd-bridge`: implementation of Apache Arrow Datafusion query planner as a bridge between optd and Apache Arrow Datafusion. +* `optd-core`: the core framework of optd. +* `optd-datafusion-repr`: representation of Apache Arrow Datafusion plan nodes in optd. diff --git a/datafusion-optd-cli/Cargo.toml b/datafusion-optd-cli/Cargo.toml index d7462a9b..5a9273e2 100644 --- a/datafusion-optd-cli/Cargo.toml +++ b/datafusion-optd-cli/Cargo.toml @@ -57,7 +57,7 @@ tokio = { version = "1.24", features = [ "parking_lot", ] } url = "2.2" -datafusion-optd-bridge = { path = "../datafusion-optd-bridge" } +optd-datafusion-bridge = { path = "../optd-datafusion-bridge" } [dev-dependencies] assert_cmd = "2.0" diff --git a/datafusion-optd-cli/src/main.rs b/datafusion-optd-cli/src/main.rs index 482442e8..e4622555 100644 --- a/datafusion-optd-cli/src/main.rs +++ b/datafusion-optd-cli/src/main.rs @@ -21,7 +21,6 @@ use datafusion::execution::context::{SessionConfig, SessionState}; use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::prelude::SessionContext; -use datafusion_optd_bridge::OptdQueryPlanner; use datafusion_optd_cli::catalog::DynamicFileCatalog; use datafusion_optd_cli::{ exec, @@ -30,6 +29,7 @@ use datafusion_optd_cli::{ DATAFUSION_CLI_VERSION, }; use mimalloc::MiMalloc; +use optd_datafusion_bridge::OptdQueryPlanner; use std::collections::HashMap; use std::env; use std::path::Path; diff --git a/optd-core/src/cascades/tasks/apply_rule.rs b/optd-core/src/cascades/tasks/apply_rule.rs index d03fef0a..e5290582 100644 --- a/optd-core/src/cascades/tasks/apply_rule.rs +++ b/optd-core/src/cascades/tasks/apply_rule.rs @@ -11,7 +11,7 @@ use crate::{ tasks::{OptimizeExpressionTask, OptimizeInputsTask}, GroupId, }, - rel_node::{RelNode, RelNodeRef, RelNodeTyp}, + rel_node::{RelNode, RelNodeTyp}, rules::{OneOrMany, RuleMatcher}, }; diff --git a/optd-core/src/cost.rs b/optd-core/src/cost.rs index c7d29f35..a60595dc 100644 --- a/optd-core/src/cost.rs +++ b/optd-core/src/cost.rs @@ -1,26 +1,6 @@ -use std::collections::HashMap; - use itertools::Itertools; -use crate::{ - plan_nodes::OptRelNodeTyp, - rel_node::{RelNode, RelNodeTyp, Value}, -}; - -fn compute_plan_node_cost>( - model: &C, - node: &RelNode, - total_cost: &mut Cost, -) -> Cost { - let children = node - .children - .iter() - .map(|child| compute_plan_node_cost(model, child, total_cost)) - .collect_vec(); - let cost = model.compute_cost(&node.typ, &node.data, &children); - model.accumulate(total_cost, &cost); - cost -} +use crate::rel_node::{RelNode, RelNodeTyp, Value}; #[derive(Default, Clone, Debug, PartialOrd, PartialEq)] pub struct Cost(pub Vec); @@ -36,108 +16,3 @@ pub trait CostModel: 'static + Send + Sync { fn zero(&self) -> Cost; } - -pub struct OptCostModel { - table_stat: HashMap, -} - -pub const ROW_COUNT: usize = 1; -pub const COMPUTE_COST: usize = 2; -pub const IO_COST: usize = 3; - -impl OptCostModel { - fn row_cnt(Cost(cost): &Cost) -> f64 { - cost[ROW_COUNT] - } - - fn compute_cost(Cost(cost): &Cost) -> f64 { - cost[COMPUTE_COST] - } - - fn io_cost(Cost(cost): &Cost) -> f64 { - cost[IO_COST] - } - - fn cost_tuple(Cost(cost): &Cost) -> (f64, f64, f64) { - (cost[ROW_COUNT], cost[COMPUTE_COST], cost[IO_COST]) - } - - fn weighted_cost(row_cnt: f64, compute_cost: f64, io_cost: f64) -> f64 { - let _ = row_cnt; - compute_cost + io_cost * 10.0 - } - - pub fn cost(row_cnt: f64, compute_cost: f64, io_cost: f64) -> Cost { - Cost(vec![ - Self::weighted_cost(row_cnt, compute_cost, io_cost), - row_cnt, - compute_cost, - io_cost, - ]) - } -} - -impl CostModel for OptCostModel { - fn explain(&self, cost: &Cost) -> String { - format!( - "weighted={},row_cnt={},compute={},io={}", - cost.0[0], - Self::row_cnt(cost), - Self::compute_cost(cost), - Self::io_cost(cost) - ) - } - - fn accumulate(&self, total_cost: &mut Cost, cost: &Cost) { - total_cost.0[ROW_COUNT] += Self::row_cnt(cost); - total_cost.0[COMPUTE_COST] += Self::compute_cost(cost); - total_cost.0[IO_COST] += Self::io_cost(cost); - total_cost.0[0] = Self::weighted_cost( - total_cost.0[ROW_COUNT], - total_cost.0[COMPUTE_COST], - total_cost.0[IO_COST], - ); - } - - fn zero(&self) -> Cost { - Self::cost(0.0, 0.0, 0.0) - } - - fn compute_cost(&self, node: &OptRelNodeTyp, data: &Option, children: &[Cost]) -> Cost { - match node { - OptRelNodeTyp::PhysicalScan => { - let table_name = data.as_ref().unwrap().as_str(); - let row_cnt = self.table_stat.get(table_name.as_ref()).copied().unwrap() as f64; - Self::cost(row_cnt, 0.0, row_cnt) - } - OptRelNodeTyp::PhysicalFilter => { - let (row_cnt, _, _) = Self::cost_tuple(&children[0]); - let selectivity = 0.1; - Self::cost(row_cnt * selectivity, row_cnt, 0.0) - } - OptRelNodeTyp::PhysicalNestedLoopJoin(_) => { - let (row_cnt_1, _, _) = Self::cost_tuple(&children[0]); - let (row_cnt_2, _, _) = Self::cost_tuple(&children[1]); - let selectivity = 0.1; - Self::cost( - row_cnt_1 * row_cnt_2 * selectivity, - row_cnt_1 * row_cnt_2, - 0.0, - ) - } - _ => Self::cost(1.0, 0.0, 0.0), - } - } - - fn compute_plan_node_cost(&self, node: &RelNode) -> Cost { - let mut cost = self.zero(); - compute_plan_node_cost(self, node, &mut cost); - cost - } -} - -impl OptCostModel { - pub fn new(table_stat: HashMap) -> Self { - Self { table_stat } - } -} diff --git a/optd-core/src/lib.rs b/optd-core/src/lib.rs index 11ad6aa3..41967282 100644 --- a/optd-core/src/lib.rs +++ b/optd-core/src/lib.rs @@ -1,5 +1,4 @@ pub mod cascades; pub mod cost; -pub mod plan_nodes; pub mod rel_node; pub mod rules; diff --git a/optd-core/src/rules.rs b/optd-core/src/rules.rs index 39b6300a..c17d185b 100644 --- a/optd-core/src/rules.rs +++ b/optd-core/src/rules.rs @@ -1,18 +1,10 @@ -mod filter_join; mod ir; -mod join_assoc; -mod join_commute; -mod physical; use std::collections::HashMap; use crate::rel_node::{RelNode, RelNodeTyp}; -pub use filter_join::FilterJoinPullUpRule; pub use ir::{OneOrMany, RuleMatcher}; -pub use join_assoc::{JoinAssocLeftRule, JoinAssocRightRule}; -pub use join_commute::JoinCommuteRule; -pub use physical::PhysicalConversionRule; pub trait Rule { fn matcher(&self) -> &RuleMatcher; diff --git a/datafusion-optd-bridge/Cargo.toml b/optd-datafusion-bridge/Cargo.toml similarity index 86% rename from datafusion-optd-bridge/Cargo.toml rename to optd-datafusion-bridge/Cargo.toml index 77cd69e2..cf487bad 100644 --- a/datafusion-optd-bridge/Cargo.toml +++ b/optd-datafusion-bridge/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "datafusion-optd-bridge" +name = "optd-datafusion-bridge" version = "0.1.0" edition = "2021" diff --git a/datafusion-optd-bridge/src/lib.rs b/optd-datafusion-bridge/src/lib.rs similarity index 100% rename from datafusion-optd-bridge/src/lib.rs rename to optd-datafusion-bridge/src/lib.rs diff --git a/optd-datafusion-repr/Cargo.toml b/optd-datafusion-repr/Cargo.toml new file mode 100644 index 00000000..8c005a6d --- /dev/null +++ b/optd-datafusion-repr/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "optd-datafusion-repr" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1" +num-traits = "0.2" +num-derive = "0.2" +tracing = "0.1" +ordered-float = "4" +tracing-subscriber = "0.3" +pretty-xmlish = "0.1" +itertools = "0.11" +optd-core = { path = "../optd-core" } diff --git a/optd-core/src/bin/test_optimize.rs b/optd-datafusion-repr/src/bin/test_optimize.rs similarity index 96% rename from optd-core/src/bin/test_optimize.rs rename to optd-datafusion-repr/src/bin/test_optimize.rs index ab96dcaf..216ccdc3 100644 --- a/optd-core/src/bin/test_optimize.rs +++ b/optd-datafusion-repr/src/bin/test_optimize.rs @@ -1,13 +1,12 @@ use std::sync::Arc; -use optd_core::{ - cascades::CascadesOptimizer, +use optd_core::{cascades::CascadesOptimizer, rel_node::Value}; +use optd_datafusion_repr::{ cost::OptCostModel, plan_nodes::{ BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, JoinType, LogicalFilter, LogicalJoin, LogicalScan, OptRelNode, OptRelNodeTyp, PlanNode, }, - rel_node::Value, rules::{ FilterJoinPullUpRule, JoinAssocLeftRule, JoinAssocRightRule, JoinCommuteRule, PhysicalConversionRule, diff --git a/optd-datafusion-repr/src/cost.rs b/optd-datafusion-repr/src/cost.rs new file mode 100644 index 00000000..548e2540 --- /dev/null +++ b/optd-datafusion-repr/src/cost.rs @@ -0,0 +1,128 @@ +use std::collections::HashMap; + +use crate::plan_nodes::OptRelNodeTyp; +use itertools::Itertools; +use optd_core::{ + cost::{Cost, CostModel}, + rel_node::{RelNode, RelNodeTyp, Value}, +}; + +fn compute_plan_node_cost>( + model: &C, + node: &RelNode, + total_cost: &mut Cost, +) -> Cost { + let children = node + .children + .iter() + .map(|child| compute_plan_node_cost(model, child, total_cost)) + .collect_vec(); + let cost = model.compute_cost(&node.typ, &node.data, &children); + model.accumulate(total_cost, &cost); + cost +} + +pub struct OptCostModel { + table_stat: HashMap, +} + +pub const ROW_COUNT: usize = 1; +pub const COMPUTE_COST: usize = 2; +pub const IO_COST: usize = 3; + +impl OptCostModel { + fn row_cnt(Cost(cost): &Cost) -> f64 { + cost[ROW_COUNT] + } + + fn compute_cost(Cost(cost): &Cost) -> f64 { + cost[COMPUTE_COST] + } + + fn io_cost(Cost(cost): &Cost) -> f64 { + cost[IO_COST] + } + + fn cost_tuple(Cost(cost): &Cost) -> (f64, f64, f64) { + (cost[ROW_COUNT], cost[COMPUTE_COST], cost[IO_COST]) + } + + fn weighted_cost(row_cnt: f64, compute_cost: f64, io_cost: f64) -> f64 { + let _ = row_cnt; + compute_cost + io_cost * 10.0 + } + + pub fn cost(row_cnt: f64, compute_cost: f64, io_cost: f64) -> Cost { + Cost(vec![ + Self::weighted_cost(row_cnt, compute_cost, io_cost), + row_cnt, + compute_cost, + io_cost, + ]) + } +} + +impl CostModel for OptCostModel { + fn explain(&self, cost: &Cost) -> String { + format!( + "weighted={},row_cnt={},compute={},io={}", + cost.0[0], + Self::row_cnt(cost), + Self::compute_cost(cost), + Self::io_cost(cost) + ) + } + + fn accumulate(&self, total_cost: &mut Cost, cost: &Cost) { + total_cost.0[ROW_COUNT] += Self::row_cnt(cost); + total_cost.0[COMPUTE_COST] += Self::compute_cost(cost); + total_cost.0[IO_COST] += Self::io_cost(cost); + total_cost.0[0] = Self::weighted_cost( + total_cost.0[ROW_COUNT], + total_cost.0[COMPUTE_COST], + total_cost.0[IO_COST], + ); + } + + fn zero(&self) -> Cost { + Self::cost(0.0, 0.0, 0.0) + } + + fn compute_cost(&self, node: &OptRelNodeTyp, data: &Option, children: &[Cost]) -> Cost { + match node { + OptRelNodeTyp::PhysicalScan => { + let table_name = data.as_ref().unwrap().as_str(); + let row_cnt = self.table_stat.get(table_name.as_ref()).copied().unwrap() as f64; + Self::cost(row_cnt, 0.0, row_cnt) + } + OptRelNodeTyp::PhysicalFilter => { + let (row_cnt, _, _) = Self::cost_tuple(&children[0]); + let selectivity = 0.1; + Self::cost(row_cnt * selectivity, row_cnt, 0.0) + } + OptRelNodeTyp::PhysicalNestedLoopJoin(_) => { + let (row_cnt_1, _, _) = Self::cost_tuple(&children[0]); + let (row_cnt_2, _, _) = Self::cost_tuple(&children[1]); + let selectivity = 0.1; + Self::cost( + row_cnt_1 * row_cnt_2 * selectivity, + row_cnt_1 * row_cnt_2, + 0.0, + ) + } + _ => Self::cost(1.0, 0.0, 0.0), + } + } + + fn compute_plan_node_cost(&self, node: &RelNode) -> Cost { + let mut cost = self.zero(); + compute_plan_node_cost(self, node, &mut cost); + cost + } +} + +impl OptCostModel { + pub fn new(table_stat: HashMap) -> Self { + Self { table_stat } + } +} diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs new file mode 100644 index 00000000..0f5ee789 --- /dev/null +++ b/optd-datafusion-repr/src/lib.rs @@ -0,0 +1,3 @@ +pub mod cost; +pub mod plan_nodes; +pub mod rules; diff --git a/optd-core/src/plan_nodes.rs b/optd-datafusion-repr/src/plan_nodes.rs similarity index 99% rename from optd-core/src/plan_nodes.rs rename to optd-datafusion-repr/src/plan_nodes.rs index 705f907e..edb11dbd 100644 --- a/optd-core/src/plan_nodes.rs +++ b/optd-datafusion-repr/src/plan_nodes.rs @@ -8,7 +8,7 @@ mod scan; use std::sync::Arc; -use crate::{ +use optd_core::{ cascades::GroupId, rel_node::{RelNode, RelNodeRef, RelNodeTyp}, }; diff --git a/optd-core/src/plan_nodes/apply.rs b/optd-datafusion-repr/src/plan_nodes/apply.rs similarity index 98% rename from optd-core/src/plan_nodes/apply.rs rename to optd-datafusion-repr/src/plan_nodes/apply.rs index 4a3368df..5789ae56 100644 --- a/optd-core/src/plan_nodes/apply.rs +++ b/optd-datafusion-repr/src/plan_nodes/apply.rs @@ -3,7 +3,7 @@ use std::fmt::Display; use pretty_xmlish::Pretty; -use crate::rel_node::RelNode; +use optd_core::rel_node::RelNode; use super::{Expr, JoinType, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode}; diff --git a/optd-core/src/plan_nodes/expr.rs b/optd-datafusion-repr/src/plan_nodes/expr.rs similarity index 99% rename from optd-core/src/plan_nodes/expr.rs rename to optd-datafusion-repr/src/plan_nodes/expr.rs index bc99bba6..78203c0d 100644 --- a/optd-core/src/plan_nodes/expr.rs +++ b/optd-datafusion-repr/src/plan_nodes/expr.rs @@ -3,7 +3,7 @@ use std::fmt::Display; use itertools::Itertools; use pretty_xmlish::Pretty; -use crate::rel_node::{RelNode, Value}; +use optd_core::rel_node::{RelNode, Value}; use super::{Expr, OptRelNode, OptRelNodeRef, OptRelNodeTyp}; diff --git a/optd-core/src/plan_nodes/filter.rs b/optd-datafusion-repr/src/plan_nodes/filter.rs similarity index 98% rename from optd-core/src/plan_nodes/filter.rs rename to optd-datafusion-repr/src/plan_nodes/filter.rs index 49abebf2..25ffe628 100644 --- a/optd-core/src/plan_nodes/filter.rs +++ b/optd-datafusion-repr/src/plan_nodes/filter.rs @@ -1,6 +1,6 @@ use pretty_xmlish::Pretty; -use crate::rel_node::RelNode; +use optd_core::rel_node::RelNode; use super::{replace_typ, Expr, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode}; diff --git a/optd-core/src/plan_nodes/join.rs b/optd-datafusion-repr/src/plan_nodes/join.rs similarity index 99% rename from optd-core/src/plan_nodes/join.rs rename to optd-datafusion-repr/src/plan_nodes/join.rs index 5bdd25fa..45ee0ea7 100644 --- a/optd-core/src/plan_nodes/join.rs +++ b/optd-datafusion-repr/src/plan_nodes/join.rs @@ -3,7 +3,7 @@ use std::fmt::Display; use pretty_xmlish::Pretty; -use crate::rel_node::RelNode; +use optd_core::rel_node::RelNode; use super::{replace_typ, Expr, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode}; diff --git a/optd-core/src/plan_nodes/scan.rs b/optd-datafusion-repr/src/plan_nodes/scan.rs similarity index 97% rename from optd-core/src/plan_nodes/scan.rs rename to optd-datafusion-repr/src/plan_nodes/scan.rs index a4d78930..b8c03dfb 100644 --- a/optd-core/src/plan_nodes/scan.rs +++ b/optd-datafusion-repr/src/plan_nodes/scan.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use pretty_xmlish::Pretty; -use crate::rel_node::{RelNode, Value}; +use optd_core::rel_node::{RelNode, Value}; use super::{replace_typ, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode}; diff --git a/optd-datafusion-repr/src/rules.rs b/optd-datafusion-repr/src/rules.rs new file mode 100644 index 00000000..ec5b2436 --- /dev/null +++ b/optd-datafusion-repr/src/rules.rs @@ -0,0 +1,9 @@ +mod filter_join; +mod join_assoc; +mod join_commute; +mod physical; + +pub use filter_join::FilterJoinPullUpRule; +pub use join_assoc::{JoinAssocLeftRule, JoinAssocRightRule}; +pub use join_commute::JoinCommuteRule; +pub use physical::PhysicalConversionRule; diff --git a/optd-core/src/rules/filter_join.rs b/optd-datafusion-repr/src/rules/filter_join.rs similarity index 93% rename from optd-core/src/rules/filter_join.rs rename to optd-datafusion-repr/src/rules/filter_join.rs index 67a18f92..5efc1db8 100644 --- a/optd-core/src/rules/filter_join.rs +++ b/optd-datafusion-repr/src/rules/filter_join.rs @@ -1,14 +1,9 @@ use std::collections::HashMap; -use crate::{ - plan_nodes::{JoinType, OptRelNodeTyp}, - rel_node::RelNode, -}; +use optd_core::rel_node::RelNode; +use optd_core::rules::{OneOrMany, Rule, RuleMatcher}; -use super::{ - ir::{OneOrMany, RuleMatcher}, - Rule, -}; +use crate::plan_nodes::{JoinType, OptRelNodeTyp}; pub struct FilterJoinPullUpRule { matcher: RuleMatcher, diff --git a/optd-core/src/rules/join_assoc.rs b/optd-datafusion-repr/src/rules/join_assoc.rs similarity index 97% rename from optd-core/src/rules/join_assoc.rs rename to optd-datafusion-repr/src/rules/join_assoc.rs index bb769bd3..e83940cf 100644 --- a/optd-core/src/rules/join_assoc.rs +++ b/optd-datafusion-repr/src/rules/join_assoc.rs @@ -1,14 +1,9 @@ use std::collections::HashMap; -use crate::{ - plan_nodes::{JoinType, OptRelNodeTyp}, - rel_node::RelNode, -}; +use optd_core::rel_node::RelNode; +use optd_core::rules::{OneOrMany, Rule, RuleMatcher}; -use super::{ - ir::{OneOrMany, RuleMatcher}, - Rule, -}; +use crate::plan_nodes::{JoinType, OptRelNodeTyp}; /// Implements A join (B join C) = (A join B) join C pub struct JoinAssocLeftRule { diff --git a/optd-core/src/rules/join_commute.rs b/optd-datafusion-repr/src/rules/join_commute.rs similarity index 92% rename from optd-core/src/rules/join_commute.rs rename to optd-datafusion-repr/src/rules/join_commute.rs index e791aaad..2ceccd03 100644 --- a/optd-core/src/rules/join_commute.rs +++ b/optd-datafusion-repr/src/rules/join_commute.rs @@ -1,14 +1,9 @@ use std::collections::HashMap; -use crate::{ - plan_nodes::{JoinType, OptRelNodeTyp}, - rel_node::RelNode, -}; +use optd_core::rel_node::RelNode; +use optd_core::rules::{OneOrMany, Rule, RuleMatcher}; -use super::{ - ir::{OneOrMany, RuleMatcher}, - Rule, -}; +use crate::plan_nodes::{JoinType, OptRelNodeTyp}; /// Implements A join B = B join A /// TODO: should insert a projection to reorder the columns diff --git a/optd-core/src/rules/physical.rs b/optd-datafusion-repr/src/rules/physical.rs similarity index 93% rename from optd-core/src/rules/physical.rs rename to optd-datafusion-repr/src/rules/physical.rs index 088b5262..db429510 100644 --- a/optd-core/src/rules/physical.rs +++ b/optd-datafusion-repr/src/rules/physical.rs @@ -1,11 +1,9 @@ use std::collections::HashMap; -use crate::{plan_nodes::OptRelNodeTyp, rel_node::RelNode}; +use optd_core::rel_node::RelNode; +use optd_core::rules::{OneOrMany, Rule, RuleMatcher}; -use super::{ - ir::{OneOrMany, RuleMatcher}, - Rule, -}; +use crate::plan_nodes::{JoinType, OptRelNodeTyp}; pub struct PhysicalConversionRule { matcher: RuleMatcher,