Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TPCH-DS planning benchmark #9907

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 57 additions & 110 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use arrow::datatypes::{DataType, Field, Fields, Schema};
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use std::sync::Arc;
use test_utils::tpcds::tpcds_schemas;
use test_utils::tpch::tpch_schemas;
use test_utils::TableDef;
use tokio::runtime::Runtime;

/// Create a logical plan from the specified sql
Expand All @@ -48,116 +51,18 @@ fn physical_plan(ctx: &SessionContext, sql: &str) {
}

/// Create schema with the specified number of columns
pub fn create_schema(column_prefix: &str, num_columns: usize) -> Schema {
fn create_schema(column_prefix: &str, num_columns: usize) -> Schema {
let fields: Fields = (0..num_columns)
.map(|i| Field::new(format!("{column_prefix}{i}"), DataType::Int32, true))
.collect();
Schema::new(fields)
}

pub fn create_table_provider(column_prefix: &str, num_columns: usize) -> Arc<MemTable> {
fn create_table_provider(column_prefix: &str, num_columns: usize) -> Arc<MemTable> {
let schema = Arc::new(create_schema(column_prefix, num_columns));
MemTable::try_new(schema, vec![]).map(Arc::new).unwrap()
}

pub fn create_tpch_schemas() -> [(String, Schema); 8] {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just moved this code into test_utils.

let lineitem_schema = Schema::new(vec![
Field::new("l_orderkey", DataType::Int64, false),
Field::new("l_partkey", DataType::Int64, false),
Field::new("l_suppkey", DataType::Int64, false),
Field::new("l_linenumber", DataType::Int32, false),
Field::new("l_quantity", DataType::Decimal128(15, 2), false),
Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
Field::new("l_discount", DataType::Decimal128(15, 2), false),
Field::new("l_tax", DataType::Decimal128(15, 2), false),
Field::new("l_returnflag", DataType::Utf8, false),
Field::new("l_linestatus", DataType::Utf8, false),
Field::new("l_shipdate", DataType::Date32, false),
Field::new("l_commitdate", DataType::Date32, false),
Field::new("l_receiptdate", DataType::Date32, false),
Field::new("l_shipinstruct", DataType::Utf8, false),
Field::new("l_shipmode", DataType::Utf8, false),
Field::new("l_comment", DataType::Utf8, false),
]);

let orders_schema = Schema::new(vec![
Field::new("o_orderkey", DataType::Int64, false),
Field::new("o_custkey", DataType::Int64, false),
Field::new("o_orderstatus", DataType::Utf8, false),
Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
Field::new("o_orderdate", DataType::Date32, false),
Field::new("o_orderpriority", DataType::Utf8, false),
Field::new("o_clerk", DataType::Utf8, false),
Field::new("o_shippriority", DataType::Int32, false),
Field::new("o_comment", DataType::Utf8, false),
]);

let part_schema = Schema::new(vec![
Field::new("p_partkey", DataType::Int64, false),
Field::new("p_name", DataType::Utf8, false),
Field::new("p_mfgr", DataType::Utf8, false),
Field::new("p_brand", DataType::Utf8, false),
Field::new("p_type", DataType::Utf8, false),
Field::new("p_size", DataType::Int32, false),
Field::new("p_container", DataType::Utf8, false),
Field::new("p_retailprice", DataType::Decimal128(15, 2), false),
Field::new("p_comment", DataType::Utf8, false),
]);

let supplier_schema = Schema::new(vec![
Field::new("s_suppkey", DataType::Int64, false),
Field::new("s_name", DataType::Utf8, false),
Field::new("s_address", DataType::Utf8, false),
Field::new("s_nationkey", DataType::Int64, false),
Field::new("s_phone", DataType::Utf8, false),
Field::new("s_acctbal", DataType::Decimal128(15, 2), false),
Field::new("s_comment", DataType::Utf8, false),
]);

let partsupp_schema = Schema::new(vec![
Field::new("ps_partkey", DataType::Int64, false),
Field::new("ps_suppkey", DataType::Int64, false),
Field::new("ps_availqty", DataType::Int32, false),
Field::new("ps_supplycost", DataType::Decimal128(15, 2), false),
Field::new("ps_comment", DataType::Utf8, false),
]);

let customer_schema = Schema::new(vec![
Field::new("c_custkey", DataType::Int64, false),
Field::new("c_name", DataType::Utf8, false),
Field::new("c_address", DataType::Utf8, false),
Field::new("c_nationkey", DataType::Int64, false),
Field::new("c_phone", DataType::Utf8, false),
Field::new("c_acctbal", DataType::Decimal128(15, 2), false),
Field::new("c_mktsegment", DataType::Utf8, false),
Field::new("c_comment", DataType::Utf8, false),
]);

let nation_schema = Schema::new(vec![
Field::new("n_nationkey", DataType::Int64, false),
Field::new("n_name", DataType::Utf8, false),
Field::new("n_regionkey", DataType::Int64, false),
Field::new("n_comment", DataType::Utf8, false),
]);

let region_schema = Schema::new(vec![
Field::new("r_regionkey", DataType::Int64, false),
Field::new("r_name", DataType::Utf8, false),
Field::new("r_comment", DataType::Utf8, false),
]);

[
("lineitem".to_string(), lineitem_schema),
("orders".to_string(), orders_schema),
("part".to_string(), part_schema),
("supplier".to_string(), supplier_schema),
("partsupp".to_string(), partsupp_schema),
("customer".to_string(), customer_schema),
("nation".to_string(), nation_schema),
("region".to_string(), region_schema),
]
}

fn create_context() -> SessionContext {
let ctx = SessionContext::new();
ctx.register_table("t1", create_table_provider("a", 200))
Expand All @@ -168,16 +73,19 @@ fn create_context() -> SessionContext {
.unwrap();
ctx.register_table("t1000", create_table_provider("d", 1000))
.unwrap();
ctx
}

let tpch_schemas = create_tpch_schemas();
tpch_schemas.iter().for_each(|(name, schema)| {
/// Register the table definitions as a MemTable with the context and return the
/// context
fn register_defs(ctx: SessionContext, defs: Vec<TableDef>) -> SessionContext {
defs.iter().for_each(|TableDef { name, schema }| {
ctx.register_table(
name,
Arc::new(MemTable::try_new(Arc::new(schema.clone()), vec![]).unwrap()),
)
.unwrap();
});

ctx
}

Expand Down Expand Up @@ -236,40 +144,79 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

// --- TPC-H ---

let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than using the same context for all tests, I made separate contexts for TPCH and TPC-DS


let tpch_queries = [
"q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10", "q11", "q12", "q13",
"q14", // "q15", q15 has multiple SQL statements which is not supported
"q16", "q17", "q18", "q19", "q20", "q21", "q22",
];

for q in tpch_queries {
let sql = std::fs::read_to_string(format!("../../benchmarks/queries/{}.sql", q))
.unwrap();
let sql =
std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap();
c.bench_function(&format!("physical_plan_tpch_{}", q), |b| {
b.iter(|| physical_plan(&ctx, &sql))
b.iter(|| physical_plan(&tpch_ctx, &sql))
});
}

let all_tpch_sql_queries = tpch_queries
.iter()
.map(|q| {
std::fs::read_to_string(format!("../../benchmarks/queries/{}.sql", q))
.unwrap()
std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap()
})
.collect::<Vec<_>>();

c.bench_function("physical_plan_tpch_all", |b| {
b.iter(|| {
for sql in &all_tpch_sql_queries {
physical_plan(&ctx, sql)
physical_plan(&tpch_ctx, sql)
}
})
});

c.bench_function("logical_plan_tpch_all", |b| {
b.iter(|| {
for sql in &all_tpch_sql_queries {
logical_plan(&ctx, sql)
logical_plan(&tpch_ctx, sql)
}
})
});

// --- TPC-DS ---

let tpcds_ctx = register_defs(SessionContext::new(), tpcds_schemas());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the new code


// 10, 35: Physical plan does not support logical expression Exists(<subquery>)
// 45: Physical plan does not support logical expression (<subquery>)
// 41: Optimizing disjunctions not supported
let ignored = [10, 35, 41, 45];

let raw_tpcds_sql_queries = (1..100)
.filter(|q| !ignored.contains(q))
.map(|q| std::fs::read_to_string(format!("./tests/tpc-ds/{q}.sql")).unwrap())
.collect::<Vec<_>>();

// some queries have multiple statements
let all_tpcds_sql_queries = raw_tpcds_sql_queries
.iter()
.flat_map(|sql| sql.split(';').filter(|s| !s.trim().is_empty()))
.collect::<Vec<_>>();

c.bench_function("physical_plan_tpcds_all", |b| {
b.iter(|| {
for sql in &all_tpcds_sql_queries {
physical_plan(&tpcds_ctx, sql)
}
})
});

c.bench_function("logical_plan_tpcds_all", |b| {
b.iter(|| {
for sql in &all_tpcds_sql_queries {
logical_plan(&tpcds_ctx, sql)
}
})
});
Expand Down
Loading
Loading