Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Rayon-based Scheduler #6169

Merged
merged 4 commits into from
Apr 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docs_pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ jobs:
rust-version: stable
# Note: this does not include dictionary_expressions to reduce codegen
- name: Run doctests
run: cargo test --doc --features avro,scheduler,json
run: cargo test --doc --features avro,json
- name: Verify Working Directory Clean
run: git diff --exit-code
11 changes: 5 additions & 6 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ jobs:

# Note: this does not include dictionary_expressions to reduce codegen
- name: Check workspace with all features
run: cargo check --workspace --benches --features avro,scheduler,json

run: cargo check --workspace --benches --features avro,json
- name: Check Cargo.lock for datafusion-cli
run: |
# If this test fails, try running `cargo update` in the `datafusion-cli` directory
Expand Down Expand Up @@ -97,7 +96,7 @@ jobs:
with:
rust-version: stable
- name: Run tests (excluding doctests)
run: cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions
run: cargo test --lib --tests --bins --features avro,json,dictionary_expressions
- name: Verify Working Directory Clean
run: git diff --exit-code

Expand Down Expand Up @@ -153,7 +152,7 @@ jobs:
rust-version: stable
# Note: this does not include dictionary_expressions to reduce codegen
- name: Run doctests
run: cargo test --doc --features avro,scheduler,json
run: cargo test --doc --features avro,json
- name: Verify Working Directory Clean
run: git diff --exit-code

Expand Down Expand Up @@ -272,7 +271,7 @@ jobs:
shell: bash
run: |
export PATH=$PATH:$HOME/d/protoc/bin
cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions
cargo test --lib --tests --bins --features avro,json,dictionary_expressions
env:
# do not produce debug symbols to keep memory usage down
RUSTFLAGS: "-C debuginfo=0"
Expand Down Expand Up @@ -305,7 +304,7 @@ jobs:
- name: Run tests (excluding doctests)
shell: bash
run: |
cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions
cargo test --lib --tests --bins --features avro,json,dictionary_expressions
env:
# do not produce debug symbols to keep memory usage down
RUSTFLAGS: "-C debuginfo=0"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ snmalloc = ["snmalloc-rs"]

[dependencies]
arrow = { workspace = true }
datafusion = { path = "../datafusion/core", version = "23.0.0", features = ["scheduler"] }
datafusion = { path = "../datafusion/core", version = "23.0.0" }
env_logger = "0.10"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
Expand Down
34 changes: 7 additions & 27 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ use std::{iter::Iterator, path::PathBuf, sync::Arc, time::Instant};
use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::scheduler::Scheduler;
use futures::TryStreamExt;
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
Expand Down Expand Up @@ -90,10 +88,6 @@ struct DataFusionBenchmarkOpt {
/// Whether to disable collection of statistics (and cost based optimizations) or not.
#[structopt(short = "S", long = "disable-statistics")]
disable_statistics: bool,

/// Enable scheduler
#[structopt(short = "e", long = "enable-scheduler")]
enable_scheduler: bool,
}

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -227,16 +221,14 @@ async fn benchmark_query(
if query_id == 15 {
for (n, query) in sql.iter().enumerate() {
if n == 1 {
result = execute_query(&ctx, query, opt.debug, opt.enable_scheduler)
.await?;
result = execute_query(&ctx, query, opt.debug).await?;
} else {
execute_query(&ctx, query, opt.debug, opt.enable_scheduler).await?;
execute_query(&ctx, query, opt.debug).await?;
}
}
} else {
for query in sql {
result =
execute_query(&ctx, query, opt.debug, opt.enable_scheduler).await?;
result = execute_query(&ctx, query, opt.debug).await?;
}
}

Expand Down Expand Up @@ -295,7 +287,6 @@ async fn execute_query(
ctx: &SessionContext,
sql: &str,
debug: bool,
enable_scheduler: bool,
) -> Result<Vec<RecordBatch>> {
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();
Expand All @@ -315,15 +306,7 @@ async fn execute_query(
displayable(physical_plan.as_ref()).indent()
);
}
let result = if enable_scheduler {
let scheduler = Scheduler::new(num_cpus::get());
let results = scheduler
.schedule(physical_plan.clone(), state.task_ctx())
.unwrap();
results.stream().try_collect().await?
} else {
collect(physical_plan.clone(), state.task_ctx()).await?
};
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
Expand Down Expand Up @@ -529,8 +512,7 @@ mod tests {
// handle special q15 which contains "create view" sql statement
if sql.starts_with("select") {
let explain = "explain ".to_string() + sql;
let result_batch =
execute_query(&ctx, explain.as_str(), false, false).await?;
let result_batch = execute_query(&ctx, explain.as_str(), false).await?;
if !actual.is_empty() {
actual += "\n";
}
Expand All @@ -542,7 +524,7 @@ mod tests {
// let mut file = File::create(format!("expected-plans/q{}.txt", query))?;
// file.write_all(actual.as_bytes())?;
} else {
execute_query(&ctx, sql.as_str(), false, false).await?;
execute_query(&ctx, sql.as_str(), false).await?;
}
}

Expand Down Expand Up @@ -726,7 +708,7 @@ mod tests {

let sql = &get_query_sql(n)?;
for query in sql {
execute_query(&ctx, query, false, false).await?;
execute_query(&ctx, query, false).await?;
}

Ok(())
Expand Down Expand Up @@ -757,7 +739,6 @@ mod ci {
mem_table: false,
output_path: None,
disable_statistics: false,
enable_scheduler: false,
};
register_tables(&opt, &ctx).await?;
let queries = get_query_sql(query)?;
Expand Down Expand Up @@ -1064,7 +1045,6 @@ mod ci {
mem_table: false,
output_path: None,
disable_statistics: false,
enable_scheduler: false,
};
let mut results = benchmark_datafusion(opt).await?;
assert_eq!(results.len(), 1);
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/rust_clippy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
# under the License.

set -ex
cargo clippy --all-targets --workspace --features avro,pyarrow,scheduler -- -D warnings
cargo clippy --all-targets --workspace --features avro,pyarrow -- -D warnings
4 changes: 0 additions & 4 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ dictionary_expressions = ["datafusion-physical-expr/dictionary_expressions", "da
force_hash_collisions = []
pyarrow = ["datafusion-common/pyarrow"]
regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"]
# Used to enable scheduler
scheduler = ["rayon"]
simd = ["arrow/simd"]
unicode_expressions = ["datafusion-physical-expr/unicode_expressions", "datafusion-optimizer/unicode_expressions", "datafusion-sql/unicode_expressions"]

Expand Down Expand Up @@ -86,7 +84,6 @@ parquet = { workspace = true }
percent-encoding = "2.2.0"
pin-project-lite = "^0.2.7"
rand = "0.8"
rayon = { version = "1.5", optional = true }
smallvec = { version = "1.6", features = ["union"] }
sqlparser = { version = "0.33", features = ["visitor"] }
tempfile = "3"
Expand Down Expand Up @@ -150,7 +147,6 @@ name = "physical_plan"
[[bench]]
harness = false
name = "parquet_query_sql"
required-features = ["scheduler"]

[[bench]]
harness = false
Expand Down
19 changes: 0 additions & 19 deletions datafusion/core/benches/parquet_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use arrow::datatypes::{
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::scheduler::Scheduler;
use futures::stream::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
Expand Down Expand Up @@ -196,8 +195,6 @@ fn criterion_benchmark(c: &mut Criterion) {
let config = SessionConfig::new().with_target_partitions(partitions);
let context = SessionContext::with_config(config);

let scheduler = Scheduler::new(partitions);

let local_rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
Expand Down Expand Up @@ -249,22 +246,6 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});
});

c.bench_function(&format!("scheduled: {query}"), |b| {
b.iter(|| {
let query = query.clone();
let context = context.clone();

local_rt.block_on(async {
let query = context.sql(&query).await.unwrap();
let plan = query.create_physical_plan().await.unwrap();
let results = scheduler.schedule(plan, context.task_ctx()).unwrap();

let mut stream = results.stream();
while stream.next().await.transpose().unwrap().is_some() {}
});
});
});
}

// Temporary file must outlive the benchmarks, it is deleted when dropped
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,6 @@ pub mod physical_optimizer;
pub mod physical_plan;
pub mod prelude;
pub mod scalar;
#[cfg(feature = "scheduler")]
pub mod scheduler;
pub mod variable;

// re-export dependencies from arrow-rs to minimise version maintenance for crate users
Expand Down
Loading