From 5314ee4348d7c5a6a6039ed088ab400ce8a8f4de Mon Sep 17 00:00:00 2001 From: "jiangzhenxing@analysys.com.cn" <1983915Jzx> Date: Tue, 18 Jan 2022 16:11:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=97=A0=E7=94=A8example?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmarks/examples/sql_parser_example.rs | 28 ---- benchmarks/examples/ssb_memtable_example.rs | 168 -------------------- benchmarks/examples/ssb_query_example.rs | 113 ------------- benchmarks/examples/ssb_sql_example.rs | 108 ------------- 4 files changed, 417 deletions(-) delete mode 100644 benchmarks/examples/sql_parser_example.rs delete mode 100644 benchmarks/examples/ssb_memtable_example.rs delete mode 100644 benchmarks/examples/ssb_query_example.rs delete mode 100644 benchmarks/examples/ssb_sql_example.rs diff --git a/benchmarks/examples/sql_parser_example.rs b/benchmarks/examples/sql_parser_example.rs deleted file mode 100644 index e15a79f55d297..0000000000000 --- a/benchmarks/examples/sql_parser_example.rs +++ /dev/null @@ -1,28 +0,0 @@ -use chrono::prelude::*; - -#[cfg(feature = "snmalloc")] -#[global_allocator] -static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; - -#[cfg(feature = "mimalloc")] -#[global_allocator] -static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; - -#[tokio::main] -async fn main() -> datafusion::error::Result<()> { - use sqlparser::dialect::GenericDialect; - use sqlparser::parser::Parser; - - let dialect = GenericDialect {}; // or AnsiDialect - - let sql = "SELECT a, b, 123, myfunc(b) \ - FROM table_1 \ - WHERE a > b AND b < 100 \ - ORDER BY a DESC, b"; - - let ast = Parser::parse_sql(&dialect, sql).unwrap(); - - println!("AST: {:?}", ast); - - Ok(()) -} diff --git a/benchmarks/examples/ssb_memtable_example.rs b/benchmarks/examples/ssb_memtable_example.rs deleted file mode 100644 index 747429524891c..0000000000000 --- a/benchmarks/examples/ssb_memtable_example.rs +++ /dev/null @@ -1,168 +0,0 @@ -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::arrow::util::pretty::print_batches; - -use chrono::prelude::*; - -use datafusion::prelude::*; - -use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::object_store::local::LocalFileSystem; - -use datafusion::datasource::{listing, MemTable, TableProvider}; - -use datafusion::datasource::file_format::FileFormat; -use datafusion::datasource::listing::ListingOptions; - -use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::physical_plan::{collect, common, displayable}; -use futures::StreamExt; -use std::sync::Arc; -use utils::test_util; - -#[cfg(feature = "snmalloc")] -#[global_allocator] -static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; - -#[cfg(feature = "mimalloc")] -#[global_allocator] -static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; - -#[tokio::main] -async fn main() -> datafusion::error::Result<()> { - let lineorder_flat_schema = Schema::new(vec![ - Field::new("LO_ORDERKEY", DataType::Int64, false), - Field::new("LO_LINENUMBER", DataType::Int64, false), - Field::new("LO_CUSTKEY", DataType::Int64, false), - Field::new("LO_PARTKEY", DataType::Int64, false), - Field::new("LO_SUPPKEY", DataType::Int64, false), - Field::new("LO_ORDERDATE", DataType::Int64, false), - Field::new("LO_ORDERPRIORITY", DataType::Utf8, false), - Field::new("LO_SHIPPRIOTITY", DataType::Int64, false), - Field::new("LO_QUANTITY", DataType::Int64, false), - Field::new("LO_EXTENDEDPRICE", DataType::Int64, false), - Field::new("LO_ORDTOTALPRICE", DataType::Int64, false), - Field::new("LO_DISCOUNT", DataType::Int64, false), - Field::new("LO_REVENUE", DataType::Int64, false), - Field::new("LO_SUPPLYCOST", DataType::Int64, false), - Field::new("LO_TAX", DataType::Int64, false), - Field::new("LO_COMMITDATE", DataType::Int64, false), - Field::new("LO_SHIPMODE", DataType::Utf8, false), - Field::new("C_NAME", DataType::Utf8, false), - Field::new("C_ADDRESS", DataType::Utf8, false), - Field::new("C_CITY", DataType::Utf8, false), - Field::new("C_NATION", DataType::Utf8, false), - Field::new("C_REGION", DataType::Utf8, false), - Field::new("C_PHONE", DataType::Utf8, false), - Field::new("C_MKTSEGMENT", DataType::Utf8, false), - Field::new("S_NAME", DataType::Utf8, false), - Field::new("S_ADDRESS", DataType::Utf8, false), - Field::new("S_CITY", DataType::Utf8, false), - Field::new("S_NATION", DataType::Utf8, false), - Field::new("S_REGION", DataType::Utf8, false), - Field::new("S_PHONE", DataType::Utf8, false), - Field::new("P_NAME", DataType::Utf8, false), - Field::new("P_MFGR", DataType::Utf8, false), - Field::new("P_CATEGORY", DataType::Utf8, false), - Field::new("P_BRAND", DataType::Utf8, false), - Field::new("P_COLOR", DataType::Utf8, false), - Field::new("P_TYPE", DataType::Utf8, false), - Field::new("P_SIZE", DataType::Int64, false), - Field::new("P_CONTAINER", DataType::Utf8, false), - ]); - - let project_schema = Schema::new(vec![Field::new("S_ADDRESS", DataType::Utf8, false)]); - - let dt = Local::now(); - - let parquet_format = ParquetFormat::default(); - - let table_provider = listing::ListingTable::new( - Arc::new(LocalFileSystem {}), - test_util::parquet_lineorder_flat(), - Arc::new(lineorder_flat_schema), - ListingOptions::new(Arc::new(parquet_format)), - ); - - let exec = table_provider - .scan(&Option::Some(vec![25]), 8192, &[], None) - .await?; - let partition_count = exec.output_partitioning().partition_count(); - let tasks = (0..partition_count) - .map(|part_i| { - let exec = exec.clone(); - tokio::spawn(async move { - let stream = exec - .execute(part_i, Arc::new(RuntimeEnv::default())) - .await?; - common::collect(stream).await - }) - }) - // this collect *is needed* so that the join below can - // switch between tasks - .collect::>(); - - let mut data: Vec> = Vec::new(); - // Vec::with_capacity(exec.output_partitioning().partition_count()); - for task in tasks { - let result = task.await.expect("MemTable::load could not join task")?; - - let chunks = result.chunks(100); - for chunk in chunks { - data.push(chunk.into()); - } - - // data.push(result); - } - - println!("memtable total partitions: {}", data.len()); - - let memtable = MemTable::try_new(SchemaRef::new(project_schema).clone(), data).unwrap(); - - let execution_config = ExecutionConfig::new() - // .with_target_partitions(80) - ; - let mut ctx = ExecutionContext::with_config(execution_config); - ctx.register_table("lineorder_flat", Arc::new(memtable))?; - - println!( - "prepare memtable usage millis: {}", - Local::now().timestamp_millis() - dt.timestamp_millis() - ); - - let target_partitions = 32; - let project = "S_ADDRESS"; - let dt = Local::now(); - - let df = ctx - .table("lineorder_flat") - .unwrap() - .repartition(Partitioning::Hash(vec![col(project)], target_partitions))? - .select_columns(&[project])? - .aggregate( - vec![col(project)], - // vec![Expr::Literal(ScalarValue::Int8(Some(1)))], - vec![count(col(project))], - )? - .aggregate(vec![], vec![count(col(project))])?; - - let logic_plan = df.to_logical_plan(); - - let logic_plan = ctx.optimize(&logic_plan)?; - let physical_plan = ctx.create_physical_plan(&logic_plan).await?; - let result = collect(physical_plan.clone(), Arc::new(RuntimeEnv::default())).await?; - - print_batches(&result)?; - - println!( - "group by usage millis: {}", - Local::now().timestamp_millis() - dt.timestamp_millis() - ); - - println!( - "=== Physical plan with optimize ===\n{}\n", - displayable(physical_plan.as_ref()).indent().to_string() - ); - - Ok(()) -} diff --git a/benchmarks/examples/ssb_query_example.rs b/benchmarks/examples/ssb_query_example.rs deleted file mode 100644 index 62bd694a2b654..0000000000000 --- a/benchmarks/examples/ssb_query_example.rs +++ /dev/null @@ -1,113 +0,0 @@ -use chrono::prelude::*; -use datafusion::arrow::util::pretty::print_batches; -use datafusion::datasource::listing::ListingTable; -use datafusion::logical_plan::{count_distinct, Expr}; -// use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; -// use datafusion::optimizer::constant_folding::ConstantFolding; -// use datafusion::optimizer::eliminate_limit::EliminateLimit; -// use datafusion::optimizer::filter_push_down::FilterPushDown; -// use datafusion::optimizer::limit_push_down::LimitPushDown; -// use datafusion::optimizer::projection_push_down::ProjectionPushDown; -// use datafusion::optimizer::simplify_expressions::SimplifyExpressions; -// use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics; -// use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches; -// use datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder; -// use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec; -use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::physical_optimizer::repartition::Repartition; -use datafusion::physical_plan::{collect, displayable}; -use datafusion::prelude::*; -use datafusion::scalar::ScalarValue; -use std::sync::Arc; -use utils::test_util; - -#[cfg(feature = "snmalloc")] -#[global_allocator] -static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; - -#[cfg(feature = "mimalloc")] -#[global_allocator] -static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; - -#[tokio::main] -async fn main() -> datafusion::error::Result<()> { - let target_partitions = 32; - let execution_config = ExecutionConfig::new() - .with_optimizer_rules( - vec![ - // Arc::new(ConstantFolding::new()), - // Arc::new(CommonSubexprEliminate::new()), - // Arc::new(EliminateLimit::new()), - // Arc::new(ProjectionPushDown::new()), - // Arc::new(FilterPushDown::new()), - // Arc::new(SimplifyExpressions::new()), - // Arc::new(LimitPushDown::new()), - ] - ) - .with_physical_optimizer_rules( - vec![ - // Arc::new(AggregateStatistics::new()), - // Arc::new(HashBuildProbeOrder::new()), - // Arc::new(CoalesceBatches::new()), - // Arc::new(Repartition::new()), - // Arc::new(AddCoalescePartitionsExec::new()), - ] - ) - //.with_target_partitions(8) - // .with_repartition_aggregations(false) - //.with_batch_size(8192/2) - ; - - let mut ctx = ExecutionContext::with_config(execution_config); - //LO_SUPPKEY || S_ADDRESS - let project = "S_ADDRESS"; - // let project = "LO_SUPPKEY"; - let parquet_datapath = &format!("{}", test_util::parquet_lineorder_flat()); - - let dt = Local::now(); - - // let df = ctx.read_parquet(parquet_datapath); - - ctx.register_parquet("lineorder_flat", parquet_datapath) - .await?; - - let df = ctx.table("lineorder_flat").unwrap(); - - //select count(1) from (select 1 from lineorder_flat group by S_ADDRESS); - let df = df - .select_columns(&[project])? - .repartition(Partitioning::Hash(vec![col(project)], target_partitions))? - .aggregate( - vec![col(project)], - vec![Expr::Literal(ScalarValue::Int8(Some(1)))], - )? - .aggregate( - vec![], - vec![count(Expr::Literal(ScalarValue::Int8(Some(1))))], - )?; - - // select count(distinct S_ADDRESS) from lineorder_flat - // let df = df - // .select_columns(&[project])? - // .repartition(Partitioning::Hash(vec![col(project)], target_partitions))? - // .aggregate(vec![], vec![count_distinct(col(project))])?; - - let logic_plan = df.to_logical_plan(); - let logic_plan = ctx.optimize(&logic_plan)?; - let physical_plan = ctx.create_physical_plan(&logic_plan).await?; - let result = collect(physical_plan.clone(), Arc::new(RuntimeEnv::default())).await?; - - println!( - "usage millis: {}", - Local::now().timestamp_millis() - dt.timestamp_millis() - ); - - print_batches(&result)?; - - println!( - "=== Physical plan with optimize ===\n{}\n", - displayable(physical_plan.as_ref()).indent().to_string() - ); - - Ok(()) -} diff --git a/benchmarks/examples/ssb_sql_example.rs b/benchmarks/examples/ssb_sql_example.rs deleted file mode 100644 index 3778775cb1b00..0000000000000 --- a/benchmarks/examples/ssb_sql_example.rs +++ /dev/null @@ -1,108 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use chrono::Local; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::arrow::util::pretty; -use datafusion::arrow::util::pretty::print_batches; -use std::sync::Arc; - -use datafusion::error::Result; -// use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; -// use datafusion::optimizer::constant_folding::ConstantFolding; -// use datafusion::optimizer::eliminate_limit::EliminateLimit; -// use datafusion::optimizer::filter_push_down::FilterPushDown; -// use datafusion::optimizer::limit_push_down::LimitPushDown; -// use datafusion::optimizer::projection_push_down::ProjectionPushDown; -// use datafusion::optimizer::simplify_expressions::SimplifyExpressions; -// use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics; -// use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches; -// use datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder; -// use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec; -use datafusion::prelude::*; -use utils::test_util; - -#[cfg(feature = "snmalloc")] -#[global_allocator] -static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; - -#[cfg(feature = "mimalloc")] -#[global_allocator] -static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -/// This example demonstrates executing a simple query against an Arrow data source (CSV) and -/// fetching results -#[tokio::main] -async fn main() -> Result<()> { - let parquet_datapath = &format!("{}", test_util::parquet_lineorder_flat()); - let execution_config = ExecutionConfig::new() - .with_optimizer_rules( - vec![ - // Arc::new(ConstantFolding::new()), - // Arc::new(CommonSubexprEliminate::new()), - // Arc::new(EliminateLimit::new()), - // Arc::new(ProjectionPushDown::new()), - // Arc::new(FilterPushDown::new()), - // Arc::new(SimplifyExpressions::new()), - // Arc::new(LimitPushDown::new()), - //Arc::new(SingleDistinctAggregationToGroupBy::new()), - ] - ) - .with_physical_optimizer_rules( - vec![ - // Arc::new(AggregateStatistics::new()), - // Arc::new(HashBuildProbeOrder::new()), - // Arc::new(CoalesceBatches::new()), - // // Arc::new(Repartition::new()), - // Arc::new(AddCoalescePartitionsExec::new()), - ] - ) - //.with_target_partitions(8) - .with_repartition_aggregations(false) - //.with_batch_size(8192/2) - ; - - let mut ctx = ExecutionContext::with_config(execution_config); - let dt = Local::now(); - let result = ctx - .register_parquet("lineorder_flat", parquet_datapath) - .await?; - //let sql = r#"select count(distinct S_ADDRESS) from lineorder_flat;"#; - // let sql = r#"select S_ADDRESS from lineorder_flat group by S_ADDRESS"#; - let sql = r#"SELECT sum(LO_EXTENDEDPRICE) FROM lineorder_flat group by LO_ORDERPRIORITY;"#; - let sql = r#"SELECT sum(LO_EXTENDEDPRICE) FROM lineorder_flat group by S_ADDRESS"#; - - //LO_ORDERPRIORITY - //S_ADDRESS - - let df = ctx.sql(sql).await?; - let results: Vec = df.collect().await?; - - let usage = Local::now().timestamp_millis() - dt.timestamp_millis(); - - // print_batches(&results)?; - - println!( - "response lines: {}; usage millis: {}", - &results.get(0).unwrap().num_rows(), - usage - ); - - // let logic_plan = df.to_logical_plan(); - // println!("Display: {}", logic_plan.display_indent_schema()); - - Ok(()) -}